记录springboot集成hadoop3.2.4版本,并且调用HDFS的相关接口,这里就不展示springboot工程的建立了,这个你们自己去建工程很多教程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
<!-- hadoop依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.1.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.10</version> </dependency> |
完整pom配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.3</version> </parent> <groupId>com.hadoop</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>hadoop</name> <description>Demo project for Spring Boot</description> <properties> <java.version>8</java.version> <hadoop.version>3.2.4</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--Lombok简化代码--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- hadoop依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.1.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.10</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.5.3</version> </plugin> </plugins> </build> </project> |
加入以下配置
1 2 |
hadoop.name-node: hdfs://192.168.184.129:8020 hadoop.namespace: /mydir |
name-node是这个服务的地址,可以在hadoop的配置文件中找,或者直接看hadoop集群namenode网页也可以看到端口号。
我的集群的地址是以下这个:
1 |
http://192.168.184.129:9870/ |
namespace是在hdfs上文件的地址,就是写文件要写到这个目录下面去。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
package com.hadoop.demo.config; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.FileSystem; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.net.URI; @Configuration @ConditionalOnProperty(name="hadoop.name-node") @Slf4j public class HadoopConfig { @Value("${hadoop.name-node}") private String nameNode; /** * Configuration conf=new Configuration(); * 创建一个Configuration对象时,其构造方法会默认加载hadoop中的两个配置文件, * 分别是hdfs-site.xml以及core-site.xml,这两个文件中会有访问hdfs所需的参数值, * 主要是fs.default.name,指定了hdfs的地址,有了这个地址客户端就可以通过这个地址访问hdfs了。 * 即可理解为configuration就是hadoop中的配置信息。 * @return */ @Bean("fileSystem") public FileSystem createFs() throws Exception{ //读取配置文件 org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); conf.set("fs.defalutFS", nameNode); conf.set("dfs.replication", "1"); FileSystem fs = null; //conf.set("fs.defaultFS","hdfs://ns1"); //指定访问hdfs的客户端身份 //fs = FileSystem.get(new URI(nameNode), conf, "root"); // 文件系统 // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统 try { URI uri = new URI(nameNode.trim()); fs = FileSystem.get(uri,conf,"root"); } catch (Exception e) { log.error("", e); } System.out.println("fs.defaultFS: "+conf.get("fs.defaultFS")); return fs; } } |
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
package com.hadoop.demo.config; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; @Component @ConditionalOnBean(FileSystem.class) @Slf4j public class HadoopTemplate { @Autowired private FileSystem fileSystem; @Value("${hadoop.name-node}") private String nameNode; @Value("${hadoop.namespace:/}") private String nameSpace; @PostConstruct public void init(){ existDir(nameSpace,true); } public void uploadFile(String srcFile){ copyFileToHDFS(false,true,srcFile,nameSpace); } public void uploadFile(boolean del,String srcFile){ copyFileToHDFS(del,true,srcFile,nameSpace); } public void uploadFile(String srcFile,String destPath){ copyFileToHDFS(false,true,srcFile,destPath); } public void uploadFile(boolean del,String srcFile,String destPath){ copyFileToHDFS(del,true,srcFile,destPath); } public void delFile(String fileName){ rmdir(nameSpace,fileName) ; } public void delDir(String path){ nameSpace = nameSpace + "/" +path; rmdir(path,null) ; } public void download(String fileName,String savePath){ getFile(nameSpace+"/"+fileName,savePath); } /** * 创建目录 * @param filePath * @param create * @return */ public boolean existDir(String filePath, boolean create){ boolean flag = false; if(StringUtils.isEmpty(filePath)){ throw new IllegalArgumentException("filePath不能为空"); } try{ Path path = new Path(filePath); if (create){ if (!fileSystem.exists(path)){ fileSystem.mkdirs(path); } } if (fileSystem.isDirectory(path)){ flag = true; } }catch (Exception e){ log.error("", e); } return flag; } /** * 文件上传至 HDFS * @param delSrc 指是否删除源文件,true为删除,默认为false * @param overwrite * @param srcFile 源文件,上传文件路径 * @param destPath hdfs的目的路径 */ public void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) { // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt Path srcPath = new Path(srcFile); // 目的路径 if(StringUtils.isNotBlank(nameNode)){ destPath = nameNode + destPath; } Path dstPath = new Path(destPath); // 实现文件上传 try { // 获取FileSystem对象 fileSystem.copyFromLocalFile(srcPath, dstPath); fileSystem.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath); //释放资源 // fileSystem.close(); } catch (IOException e) { log.error("", e); } } /** * 删除文件或者文件目录 * * @param path */ public void rmdir(String path,String fileName) { try { // 返回FileSystem对象 if(StringUtils.isNotBlank(nameNode)){ path = nameNode + path; } if(StringUtils.isNotBlank(fileName)){ path = path + "/" +fileName; } // 删除文件或者文件目录 delete(Path f) 此方法已经弃用 fileSystem.delete(new Path(path),true); } catch (IllegalArgumentException | IOException e) { log.error("", e); } } /** * 从 HDFS 下载文件 * * @param hdfsFile * @param destPath 文件下载后,存放地址 */ public void getFile(String hdfsFile,String destPath) { // 源文件路径 if(StringUtils.isNotBlank(nameNode)){ hdfsFile = nameNode + hdfsFile; } Path hdfsPath = new Path(hdfsFile); Path dstPath = new Path(destPath); try { // 下载hdfs上的文件 fileSystem.copyToLocalFile(hdfsPath, dstPath); // 释放资源 // fs.close(); } catch (IOException e) { log.error("", e); } } public String getNameSpace(){ return nameSpace; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
package com.hadoop.demo.util; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import java.io.IOException; import java.net.URI; /** * hdfs基本操作 */ @Slf4j public class HdfsUtil { /** * 获取文件系统 * @param hdfsUri nameNode地址 如"hdfs://10.10.1.142:9000" * @return */ public static FileSystem getFileSystem(String hdfsUri) { //读取配置文件 Configuration conf = new Configuration(); // 文件系统 FileSystem fs = null; if(StringUtils.isBlank(hdfsUri)){ // 返回默认文件系统 如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统 try { fs = FileSystem.get(conf); } catch (IOException e) { log.error("", e); } }else{ // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统 try { URI uri = new URI(hdfsUri.trim()); fs = FileSystem.get(uri,conf); } catch (Exception e) { log.error("", e); } } return fs; } /** * 创建文件目录 * * @param hdfsUri * @param path */ public static void mkdir(String hdfsUri, String path) { try { // 获取文件系统 FileSystem fs = getFileSystem(hdfsUri); if(StringUtils.isNotBlank(hdfsUri)){ path = hdfsUri + path; } // 创建目录 fs.mkdirs(new Path(path)); //释放资源 fs.close(); } catch (IllegalArgumentException | IOException e) { log.error("", e); } } /** * 删除文件或者文件目录 * * @param path */ public static void rmdir(String hdfsUri,String path) { try { // 返回FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); if(StringUtils.isNotBlank(hdfsUri)){ path = hdfsUri + path; } // 删除文件或者文件目录 delete(Path f) 此方法已经弃用 fs.delete(new Path(path),true); // 释放资源 fs.close(); } catch (IllegalArgumentException | IOException e) { log.error("", e); } } /** * 根据filter获取目录下的文件 * * @param path * @param pathFilter * @return String[] */ public static String[] listFile(String hdfsUri, String path,PathFilter pathFilter) { String[] files = new String[0]; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); if(StringUtils.isNotBlank(hdfsUri)){ path = hdfsUri + path; } FileStatus[] status; if(pathFilter != null){ // 根据filter列出目录内容 status = fs.listStatus(new Path(path),pathFilter); }else{ // 列出目录内容 status = fs.listStatus(new Path(path)); } // 获取目录下的所有文件路径 Path[] listedPaths = FileUtil.stat2Paths(status); // 转换String[] if (listedPaths != null && listedPaths.length > 0){ files = new String[listedPaths.length]; for (int i = 0; i < files.length; i++){ files[i] = listedPaths[i].toString(); } } // 释放资源 fs.close(); } catch (IllegalArgumentException | IOException e) { log.error("", e); } return files; } /** * 文件上传至 HDFS * @param hdfsUri * @param delSrc 指是否删除源文件,true为删除,默认为false * @param overwrite * @param srcFile 源文件,上传文件路径 * @param destPath hdfs的目的路径 */ public static void copyFileToHDFS(String hdfsUri,boolean delSrc, boolean overwrite,String srcFile,String destPath) { // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt Path srcPath = new Path(srcFile); // 目的路径 if(StringUtils.isNotBlank(hdfsUri)){ destPath = hdfsUri + destPath; } Path dstPath = new Path(destPath); // 实现文件上传 try { // 获取FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); fs.copyFromLocalFile(srcPath, dstPath); fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath); //释放资源 fs.close(); } catch (IOException e) { log.error("", e); } } /** * 从 HDFS 下载文件 * * @param srcFile * @param destPath 文件下载后,存放地址 */ public static void getFile(String hdfsUri, String srcFile,String destPath) { // 源文件路径 if(StringUtils.isNotBlank(hdfsUri)){ srcFile = hdfsUri + srcFile; } Path srcPath = new Path(srcFile); Path dstPath = new Path(destPath); try { // 获取FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); // 下载hdfs上的文件 fs.copyToLocalFile(srcPath, dstPath); // 释放资源 fs.close(); } catch (IOException e) { log.error("", e); } } /** * 获取 HDFS 集群节点信息 * * @return DatanodeInfo[] */ public static DatanodeInfo[] getHDFSNodes(String hdfsUri) { // 获取所有节点 DatanodeInfo[] dataNodeStats = new DatanodeInfo[0]; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); // 获取分布式文件系统 DistributedFileSystem hdfs = (DistributedFileSystem)fs; dataNodeStats = hdfs.getDataNodeStats(); } catch (IOException e) { log.error("", e); } return dataNodeStats; } /** * 查找某个文件在 HDFS集群的位置 * * @param filePath * @return BlockLocation[] */ public static BlockLocation[] getFileBlockLocations(String hdfsUri, String filePath) { // 文件路径 if(StringUtils.isNotBlank(hdfsUri)){ filePath = hdfsUri + filePath; } Path path = new Path(filePath); // 文件块位置列表 BlockLocation[] blkLocations = new BlockLocation[0]; try { // 返回FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); // 获取文件目录 FileStatus filestatus = fs.getFileStatus(path); //获取文件块位置列表 blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen()); } catch (IOException e) { log.error("", e); } return blkLocations; } /** * 判断目录是否存在 * @param hdfsUri * @param filePath * @param create * @return */ public boolean existDir(String hdfsUri,String filePath, boolean create){ boolean flag = false; if (StringUtils.isEmpty(filePath)){ return flag; } try{ Path path = new Path(filePath); // FileSystem对象 FileSystem fs = getFileSystem(hdfsUri); if (create){ if (!fs.exists(path)){ fs.mkdirs(path); } } if (fs.isDirectory(path)){ flag = true; } }catch (Exception e){ log.error("", e); } return flag; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package com.hadoop.demo.control; import com.hadoop.demo.config.HadoopTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RequestMapping("/hdfs") @RestController public class HdfsController { @Autowired private HadoopTemplate hadoopTemplate; /** * 将本地文件srcFile,上传到hdfs * @param srcFile * @return */ @RequestMapping("/upload") public String upload(@RequestParam String srcFile){ hadoopTemplate.uploadFile(srcFile); return "upload"; } @RequestMapping("/delFile") public String del(@RequestParam String fileName){ hadoopTemplate.delFile(fileName); return "delFile"; } @RequestMapping("/download") public String download(@RequestParam String fileName,@RequestParam String savePath){ hadoopTemplate.download(fileName,savePath); return "download"; } } |
项目启动后是从windows调用linux集群,启动一定会报错,如果没有配置windows的环境。
报错如下
1 2 |
java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547) ~[hadoop-common-3.2.4.jar:na] |
报错的原因是缺少了hadoop的环境配置。要做以下的配置。
https://gitee.com/nkuhyx/winutils.git
下载地址在上面,我这里的hadoop版本是3.2.4,这里我选择的是版本接近的3.2.1
我本地下载后安装到
D:\javaTools\hadoopwindowsclient\hadoop-3.2.1
添加系统变量HADOOP_HOME
1 |
D:\javaTools\hadoopwindowsclient\hadoop-3.2.1 |
添加到path
1 |
%HADOOP_HOME%\bin |
配置好后重启电脑或者使用dos命令刷新环境变量,我这里直接重启电脑了,就懒得去弄命令了。
上传文件
本地D盘新建了一个测试文件,内容如下
调用上传接口
srcfile为你本地的文件路径。
1 |
http://localhost:8080/hdfs/upload?srcFile=D:\test.txt |
结果:
点击namenode进来可以看到文件路径。
点开这个文件
可以看到文件已经上传到hdfs了,这里需要注意一个细节。
文件的格式必须是utf-8的如果不是的话,上传中文里面的文件是乱码,这个需要注意下。
这里的filename是下载文件的路径。
1 |
http://localhost:8080/hdfs/download?fileName=test.txt&savePath=D:\Download |
下载到d盘下,结果如下
打开内容和上传的一致,说明下载成功。
1 |
http://localhost:8080/hdfs/delFile?fileName=test.txt |
删除后重新查看namenode网址
可以看到文件已经删除了。
这里展示了springboot集成hadoopHDFS的相关操作以及遇到的问题解决,如果对你有帮助点个赞吧。
from:https://blog.csdn.net/qq_34526237/article/details/130059360