当前位置: 首页 > news >正文

JAVA代码操作HDFS

1、客户端环境准备

(1)将Hadoop-2.9.2安装包解压到非中文路径(例如:E:\hadoop-2.9.2)

(2) 配置HADOOP_HOME环境变量

(3) 配置Path环境变量。

(4) 创建一个Maven工程ClientDemo

(5)导入相应的依赖坐标+日志配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.lagou.hdfs</groupId>
    <artifactId>client_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

        为了便于控制程序运行打印的日志数量,需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,文件内容:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(6)创建包名:com.lagou.hdfs

(7)创建HdfsClient类

public class HdfsClientDemo {
  @Test
    public void testMkdirs() throws Exception {
        // 1、获取Hadoop 集群的configuration对象
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://hadoop1:9000"); // 设置这个属性以后,获取FileSystem对象时,就不在需要创建URI连接对象了。使用这种方式,不能指定对象,可能会引发权限不足问题,解决办法参考下面文章


        // 2、根据configuration获取FileSystem对象
        // FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        FileSystem fileSystem = FileSystem.get(configuration);

        // 3、使用FileSystem对象创建一个测试目录
        fileSystem.mkdirs(new Path("/api_test11"));

        // 4、释放FileSystem对象(类似数据库连接)
        fileSystem.close();
    }
}

注意:

(1)windows解压安装Hadoop后,在调用相关API操作HDFS集群时可能会报错,这是由于Hadoop安装缺少windows操作系统相关文件所致,如下图:

解决方案: 

        从资料文件夹中找到winutils.exe拷贝放到windows系统Hadoop安装目录的bin目录下即可!!

链接:文件地址 

(2)HDFS文件系统权限问题

        如果不指定操作HDFS集群的用户信息,默认是获取当前操作系统的用户信息,出现权限被拒绝的问题,报错如下:

 

  • hdfs的文件权限机制与linux系统的文件权限机制类似!!
  • r:read w:write x:execute 权限x对于文件表示忽略,对于文件夹表示是否有权限访问其内容
  • 如果linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS当中的owner就是zhangsan
  • HDFS文件权限的目的,防止好人做错事,而不是阻止坏人做坏事。HDFS相信你告诉我你是谁,你就是谁!!

解决方案:

  • 指定用户信息获取FileSystem对象
  • 关闭HDFS集群权限校验

    vim hdfs-site.xml

    #添加如下属性
    <property>
        <name>dfs.permissions</name>
        <value>true</value>
    </property>

    修改完成之后要分发到其它节点,同时要重启HDFS集群

  • 基于HDFS权限本身比较鸡肋的特点,我们可以彻底放弃HDFS的权限校验,如果生产环境中我们可以考虑借助kerberos以及sentry等安全框架来管理大数据集群安全。所以我们直接修改HDFS的根目录权限为777

    hadoop fs -chmod -R 777 /

2、HDFS的API操作

        注:为了后续的使用方便,我对代码进行了改造,将创建Configuration以及FileSystem对象的代码移到了了@Before注解上,关闭流的操作移到了@After注解上,使我们后续的操作重点关注于HDFS api 的使用上。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsClientDemo {

    FileSystem fileSystem = null;

    @Before
    public void init() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://hadoop1:9000");
        // configuration.set("dfs.replication","2");
        fileSystem = FileSystem.get(configuration);
    }

// 现在就只写我们使用FileSystem操作api了

 @After
    public void destroy() throws Exception {
        fileSystem.close();
    }

}

(1)上传文件

 // 上传文件
    @Test
    public void copyFromLocalToHdfs() throws Exception {
        /**
         * src:源文件目录,本地路径
         * dst:目标文件目录,hdfs路径
         */
        fileSystem.copyFromLocalFile(new Path("C:/Users/小不点/Desktop/VPN.txt"), new Path("/VPN.txt"));
        // 上传文件到hdfs默认的是3个副本
        /*
         *如何改变上传文件的副本数量
         * 1、configuration对象中指定新的副本数量,就是@Before注解下面代码块,注释掉的那一行代码
         * 2、创建xml文件,在里面添加属性
         */

    }

将hdfs-site.xml拷贝到项目的根目录下

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

参数优先级排序:(1)代码中设置的值 >(2)用户自定义配置文件 >(3)服务器的默认配置

(2)下载文件

   // 下载文件
    @Test
    public void copyFromHdfsToLocal() throws Exception {
            
        /**
         * 三个参数
         * boolean:是否删除源文件
         * src:源文件目录,hdfs路径
         * dst:目标文件目录,本地路径
         */
        fileSystem.copyToLocalFile(true, new Path("/VPN.txt"), new Path("D:/下载/VPN.txt"));

      //  四个参数
        // boolean delSrc 指是否将原文件删除
        // Path src 指要下载的文件路径
        // Path dst 指将文件下载到的路径
        // boolean useRawLocalFileSystem 是否开启文件校验
    }

(3)删除文件/文件夹

 // 删除文件或文件夹
    @Test
    public void deleteFile() throws IOException {
        // boolean值代表是否递归删除文件
        fileSystem.delete(new Path("/api_test11"), true);
    }

(4)查看文件名称、权限、长度、块信息

  // 遍历hdfs的根目录得到文件夹以及文件夹的信息:名称、权限、大小
    @Test
    public void listFiles() throws Exception {
        // 得到一个迭代器:装有指定目录下所有的文件信息
        RemoteIterator<LocatedFileStatus> remoteIterator = fileSystem.listFiles(new Path("/"), true);
        // 遍历迭代器
        while (remoteIterator.hasNext()) {
            LocatedFileStatus fileStatus = remoteIterator.next();
            // 文件名称
            String fileName = fileStatus.getPath().getName();
            // 长度
            long len = fileStatus.getLen();
            // 权限
            FsPermission permission = fileStatus.getPermission();
            // 所属组
            String group = fileStatus.getGroup();
            // 所属用户
            String owner = fileStatus.getOwner();
            System.out.println(fileName + "\t" + len + "\t" + permission + "\t" + group + "\t" + owner);
            System.out.println("================================");

            // 块信息
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println("主机名称:" + host);
                }
            }
        }
    }

(5)文件夹判断

// 文件夹的判断
    @Test
    public void isFile() throws Exception {
        FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            boolean flag = fileStatus.isFile();
            if (flag) {
                System.out.println("文件:" + fileStatus.getPath().getName());
            } else {
                System.out.println("文件夹:" + fileStatus.getPath().getName());
            }
        }
    }

(6)I/O流操作HDFS

        以上我们使用的API操作都是HDFS系统框架封装好的。我们自己也可以采用IO流的方式实现文件的上传和下载。

  // 使用IO流操作HDFS
    // 上传文件:准备输入流读取本地文件,使用hdfs的输出流写数据到hdfs
    @Test
    public void uploadFileIO() throws Exception {
        // 读取本地文件的输入流
        FileInputStream inputStream = new FileInputStream(new File("C:/Users/小不点/Desktop/test.txt"));
        // 准备写数据到hdfs的输出流
        FSDataOutputStream outputStream = fileSystem.create(new Path("/Java.txt"));
        // 输入流数据拷贝到输出流
        IOUtils.copyBytes(inputStream, outputStream, 4096, true); // 最后一个参数代表是否关闭流,true为关闭
    }


    // 下载文件
    @Test
    public void downloadFile() throws Exception {
        // 读取hdfs文件的输入流
        FSDataInputStream inputStream = fileSystem.open(new Path("/lagou.txt"));
        // 准备写数据到本地的输出流
        FileOutputStream outputStream = new FileOutputStream("C:/Users/小不点/Desktop/张三.txt");
        // 输入流数据拷贝到输出流
        IOUtils.copyBytes(inputStream, outputStream, 4096, true); // 最后一个参数代表是否关闭流,true为关闭
    }

(7)seek 定位读取

 // seek定位读取hdfs文件:使用IO流读取/lagopu.txt文件内容输出两次,本质就是读取文件内容两次并输出
    @Test
    public void seekReadFile() throws Exception {
        // 创建读取hdfs文件的输入流
        FSDataInputStream inputStream = fileSystem.open(new Path("/Java.txt"));
        // 控制台输出System.out
        // 实现流拷贝
        IOUtils.copyBytes(inputStream, System.out, 4096, false);
        // 再次读取文件
        inputStream.seek(0);// 定位从0偏移量(文件头部)再次读取
        IOUtils.copyBytes(inputStream, System.out, 4096, false); // false代表不关闭流

        // 关闭输入流
        IOUtils.closeStream(inputStream);
    }

全部完整代码

package com.lagou.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsClientDemo {

    FileSystem fileSystem = null;

    @Before
    public void init() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://hadoop1:9000");
        // configuration.set("dfs.replication","2");
        fileSystem = FileSystem.get(configuration);
    }


    @Test
    public void testMkdirs() throws Exception {
        // 1、获取Hadoop 集群的configuration对象
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://hadoop1:9000"); // 设置这个属性以后,获取FileSystem对象时,就不在需要创建URI连接对象了


        // 2、根据configuration获取FileSystem对象
        // FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        FileSystem fileSystem = FileSystem.get(configuration);

        // 3、使用FileSystem对象创建一个测试目录
        fileSystem.mkdirs(new Path("/api_test11"));

        // 4、释放FileSystem对象(类似数据库连接)
        fileSystem.close();
    }

    @After
    public void destroy() throws Exception {
        fileSystem.close();
    }

    // 上传文件
    @Test
    public void copyFromLocalToHdfs() throws Exception {
        /**
         * src:源文件目录,本地路径
         * dst:目标文件目录,hdfs路径
         */
        fileSystem.copyFromLocalFile(new Path("C:/Users/小不点/Desktop/VPN.txt"), new Path("/VPN.txt"));
        // 上传文件到hdfs默认的是3个副本
        /*
         *如何改变上传文件的副本数量
         * 1、configuration对象中指定新的副本数量
         * 2、创建xml文件,在里面添加属性
         */

    }

    // 下载文件
    @Test
    public void copyFromHdfsToLocal() throws Exception {
        /**
         * boolean:是否删除源文件
         * src:源文件目录,hdfs路径
         * dst:目标文件目录,本地路径
         */
        fileSystem.copyToLocalFile(true, new Path("/VPN.txt"), new Path("D:/下载/VPN.txt"));
    }

    // 删除文件或文件夹
    @Test
    public void deleteFile() throws IOException {
        // boolean值代表是否递归删除文件
        fileSystem.delete(new Path("/api_test11"), true);
    }

    // 遍历hdfs的根目录得到文件夹以及文件夹的信息:名称、权限、大小
    @Test
    public void listFiles() throws Exception {
        // 得到一个迭代器:装有指定目录下所有的文件信息
        RemoteIterator<LocatedFileStatus> remoteIterator = fileSystem.listFiles(new Path("/"), true);
        // 遍历迭代器
        while (remoteIterator.hasNext()) {
            LocatedFileStatus fileStatus = remoteIterator.next();
            // 文件名称
            String fileName = fileStatus.getPath().getName();
            // 长度
            long len = fileStatus.getLen();
            // 权限
            FsPermission permission = fileStatus.getPermission();
            // 所属组
            String group = fileStatus.getGroup();
            // 所属用户
            String owner = fileStatus.getOwner();
            System.out.println(fileName + "\t" + len + "\t" + permission + "\t" + group + "\t" + owner);
            System.out.println("================================");

            // 块信息
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println("主机名称:" + host);
                }
            }
        }
    }


    // 文件夹的判断
    @Test
    public void isFile() throws Exception {
        FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            boolean flag = fileStatus.isFile();
            if (flag) {
                System.out.println("文件:" + fileStatus.getPath().getName());
            } else {
                System.out.println("文件夹:" + fileStatus.getPath().getName());
            }
        }
    }

    // 使用IO流操作HDFS
    // 上传文件:准备输入流读取本地文件,使用hdfs的输出流写数据到hdfs
    @Test
    public void uploadFileIO() throws Exception {
        // 读取本地文件的输入流
        FileInputStream inputStream = new FileInputStream(new File("C:/Users/小不点/Desktop/test.txt"));
        // 准备写数据到hdfs的输出流
        FSDataOutputStream outputStream = fileSystem.create(new Path("/Java.txt"));
        // 输入流数据拷贝到输出流
        IOUtils.copyBytes(inputStream, outputStream, 4096, true);
    }


    // 下载文件
    @Test
    public void downloadFile() throws Exception {
        // 读取hdfs文件的输入流
        FSDataInputStream inputStream = fileSystem.open(new Path("/lagou.txt"));
        // 准备写数据到本地的输出流
        FileOutputStream outputStream = new FileOutputStream("C:/Users/小不点/Desktop/张三.txt");
        // 输入流数据拷贝到输出流
        IOUtils.copyBytes(inputStream, outputStream, 4096, true);
    }


    // seek定位读取hdfs文件:使用IO流读取/lagopu.txt文件内容输出两次,本质就是读取文件内容两次并输出
    @Test
    public void seekReadFile() throws Exception {
        // 创建读取hdfs文件的输入流
        FSDataInputStream inputStream = fileSystem.open(new Path("/Java.txt"));
        // 控制台输出System.out
        // 实现流拷贝
        IOUtils.copyBytes(inputStream, System.out, 4096, false);
        // 再次读取文件
        inputStream.seek(0);// 定位从0偏移量(文件头部)再次读取
        IOUtils.copyBytes(inputStream, System.out, 4096, false);

        // 关闭输入流
        IOUtils.closeStream(inputStream);
    }
}


相关文章:

  • web前端开发基础教程一
  • 原子尺度仿真对材料设计效率的提升,是未来材料研发的关键核心竞争力
  • CDH 10Cloudera Manager Kerberos安装配置CA配置(markdown新版三)
  • RedHat7无法安装Telnet
  • LeetCode刷题(二):前言
  • 网络套接字实现TCP机制通信
  • 一个非教条式的TDD例子
  • Spring 整合 MyBatis
  • Verilog 有符号数详解(含代码验证)
  • 今天是圣诞节, 要打印一个漂亮的圣诞树送给想象中的女朋友,请你帮助他实现梦想。
  • 同样是测试工程师,月薪8k的功能测试和月薪14k的自动化测试,差在了那里?
  • k8s 认证机制源码分析
  • Java-KoTime:接口耗时监测与邮件通知接口耗时情况
  • 【Linux】Linux系统编程(入门与系统编程)(一)(环境搭建、常见指令以及权限理解)
  • 【JavaScript高级】函数相关知识:函数、纯函数、柯里化、严格模式
  • 07.Android之多媒体问题
  • CSS魔法堂:Absolute Positioning就这个样
  • CSS中外联样式表代表的含义
  • C语言笔记(第一章:C语言编程)
  • Docker 1.12实践:Docker Service、Stack与分布式应用捆绑包
  • Git的一些常用操作
  • Java 实战开发之spring、logback配置及chrome开发神器(六)
  • JavaScript-Array类型
  • macOS 中 shell 创建文件夹及文件并 VS Code 打开
  • React的组件模式
  • Vue UI框架库开发介绍
  • WePY 在小程序性能调优上做出的探究
  • 测试如何在敏捷团队中工作?
  • 工作踩坑系列——https访问遇到“已阻止载入混合活动内容”
  • 记一次用 NodeJs 实现模拟登录的思路
  • 简单易用的leetcode开发测试工具(npm)
  • 聊聊flink的TableFactory
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 学习Vue.js的五个小例子
  • 一个普通的 5 年iOS开发者的自我总结,以及5年开发经历和感想!
  • 用Node EJS写一个爬虫脚本每天定时给心爱的她发一封暖心邮件
  • Mac 上flink的安装与启动
  • ​ubuntu下安装kvm虚拟机
  • # 数据结构
  • #define、const、typedef的差别
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • #QT(TCP网络编程-服务端)
  • #我与Java虚拟机的故事#连载08:书读百遍其义自见
  • $ is not function   和JQUERY 命名 冲突的解说 Jquer问题 (
  • (09)Hive——CTE 公共表达式
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (八十八)VFL语言初步 - 实现布局
  • (附源码)ssm高校运动会管理系统 毕业设计 020419
  • (附源码)计算机毕业设计SSM在线影视购票系统
  • (过滤器)Filter和(监听器)listener
  • (三)Honghu Cloud云架构一定时调度平台
  • (三)mysql_MYSQL(三)
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • (五)MySQL的备份及恢复
  • (一)spring cloud微服务分布式云架构 - Spring Cloud简介