文章目录
- 一、创建Configuration对象
- 二、创建FileSystem对象
- 三、打开hdfs文件
- 四、FileSystem的一些方法
- 五、完整示例
- 1、拉取文件
- 2、上传文件
一、创建Configuration对象
org.apache.hadoop.conf.Configuration
Configuration conf = new Configuration();
设置部分属性
//使用数据节点(DataNode)的主机名(hostname)而不是IP地址。
conf.set("dfs.client.use.datanode.hostname", "true");
//用于指定HDFS文件系统的实现类。
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
//System.setProperty("hadoop.user", "root");
//System.setProperty("HADOOP_USER_NAME", "root");
二、创建FileSystem对象
org.apache.hadoop.fs.FileSystem
可以使用get 或 newInstance方法
FileSystem fs = FileSystem.newInstance(conf);
//FileSystem fs = FileSystem.get(conf);
三、打开hdfs文件
Path hdfsPath = new Path("hdfs://<namenode>:<port>/path/to/hdfs/file")
FSDataInputStream fsStream = fs.open(hdfsPath);
四、FileSystem的一些方法
方法 | 作用 |
---|---|
mkdirs(Path p) | 创建目录 |
create(Path p,boolean overwrite) | 指定路径创建FSDataOutputStream |
delete(Path p,boolean b) | 删除指定路径目录 |
exists(Path p) | 判断文件或目录是否存在 |
FileStatus[] listStatus(Path p) | 返回所有文件信息 |
open(Path p) | 指定路径创建FSDataInputStream |
copyToLocalFile(Path src, Path dst) | 复制文件至本地 |
copyFromLocalFile(Path src, Path dst) | 上传本地文件至hdfs |
五、完整示例
以下示例都需要先获取到FileSystem对象fs
即
Configuration conf = new Configuration();
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fs = FileSystem.newInstance(conf);
如上代码在后续示例中不再重复展示
1、拉取文件
直接复制文件至本地,使用copyToLocalFile:
int totalFileSize = 0;
int fileSize = 0;
FileStatus[] fileStatusArray = fs.listStatus(new Path(hdfsPath));if (fileStatusArray == null || fileStatusArray.length == 0) {throw new RuntimeException("download file from hdfs error,hdfsPath=" + hdfsPath + "targetPath=" + targetPath);
}
totalFileSize = fileStatusArray.length;for (FileStatus fileStatus : fileStatusArray) {String[] path = fileStatus.getPath().toString().split("/");String fileName = path[path.length - 1];try {fs.copyToLocalFile(fileStatus.getPath(), new Path(targetPath + fileName));} catch (Exception e) {LOGGER.error(String.format("hadoop download data error,hdfsPath=%s,targetPath=%s", hdfsPath, targetPath), e);throw new RuntimeException(e);}fileSize++;
}
如果需对文件内容做处理,则用BufferedReader读取:
InputStream inputStream;
BufferedReader reader = null;
try {inputStream = fs.open(fileStatus.getPath());reader = new BufferedReader(new InputStreamReader(inputStream));while (!StringUtils.isBlank(str = reader.readLine())) {ExampleMessage example;try {example = JacksonTool.jackson2Bean(str, ExampleMessage.class);//省略后续处理逻辑…} catch(Exception e){//省略打印日志等操作…continue;}}
}catch (Exception e) {//省略打印日志等操作…
} finally {try {//org.apache.hadoop.io.IOUtilsIOUtils.closeStream(inputStream);IOUtils.closeStream(reader);} catch (Exception e) {//省略打印日志等操作…}
}
2、上传文件
public boolean uploadFile2HDFS(String localFile, String hdfsPath, String hdfsFile) {try {hdfsFile = hdfsPath + "/" + hdfsFile;Path src = new Path(localFile);Path dst = new Path(hdfsFile);fs.copyFromLocalFile(src, dst);return true;} catch (Exception e) {LOGGER.error(String.format("hadoop upload data error, localFile=%s,path=%s", localFile, hdfsPath), e);throw new RuntimeException(e);}
}
或
public boolean uploadFile2HDFS(InputStream inputStream, String hdfsPath) {FSDataOutputStream outputStream = null;try {outputStream = fs.create(new Path(hdfsPath), true);IOUtils.copy(inputStream, outputStream);} catch (Exception e) {LOGGER.error(String.format("hadoop upload data error,path=%s", hdfsPath), e);throw new RuntimeException(e);} finally {try {if (outputStream != null) {outputStream.close();}} catch (Exception e) {LOGGER.error(String.format("hadoop upload data error,close,path=%s", hdfsPath), e);}}return true;
}