讲了一些Netty的组件,来聊一聊大家最关心的事情吧,他能够做什么?毕竟,我们学习就是拿来用的嘛。我可以简单的概括一下,凡是牵扯到网络相关的,都可以使用Neety去实现!
构建高性能、低时延的各种 Java 中间件,例如 MQ、分布式服务框架、ESB 消息总线等,Netty 主要作为基础通信框架提供高性能、低时延的通信服务;
公有或者私有协议栈的基础通信框架,例如可以基于 Netty 构建异步、高性能的 WebSocket 协议栈;
各领域应用,例如大数据、游戏等,Netty 作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信。
1、MultipartRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import io.netty.handler.codec.http.multipart.FileUpload; import org.json.simple.JSONObject; import java.util.Map; /** * <p>请求对象</p> * * @author DarkKing */ public class MultipartRequest { private Map<String, FileUpload> fileUploads; private JSONObject params; public Map<String, FileUpload> getFileUploads() { return fileUploads; } public void setFileUploads(Map<String, FileUpload> fileUploads) { this.fileUploads = fileUploads; } public JSONObject getParams() { return params; } public void setParams(JSONObject params) { this.params = params; } } |
定义了一个http封装的对象。保存对应的传参数。
2、FileServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class FileServer { private final int port; public FileServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { int port = 9999; FileServer fileServer = new FileServer(port); System.out.println("服务器即将启动"); fileServer.start(); System.out.println("服务器关闭"); } public void start() throws InterruptedException { final FileServerHandle serverHandler = new FileServerHandle(); /*线程组*/ EventLoopGroup group = new NioEventLoopGroup(); Pipeline pipeline = new Pipeline(); try { /*服务端启动必须*/ ServerBootstrap b = new ServerBootstrap(); b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(pipeline); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("Netty server start,port is " + port); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } } |
使用netty实现服文件服务器端。
3、Pipeline
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; public class Pipeline extends ChannelInitializer<SocketChannel> { private EventExecutorGroup businessEventExecutorGroup = new DefaultEventExecutorGroup(10); @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); /** * http服务器端对response编码 */ pipeline.addLast("encoder", new HttpResponseEncoder()); /** * http服务器端对request解码3. */ pipeline.addLast("decoder", new HttpRequestDecoder()); /** * 合并请求 */ pipeline.addLast("aggregator", new HttpObjectAggregator(655300000)); /** * 正常业务逻辑处理 */ pipeline.addLast(businessEventExecutorGroup, new FileServerHandle()); } } |
编写职责链,请求会从入栈以次从上到下经过编解码,请求和秉承HTTPObject,最后执行业务类FileServerHandle。
4、FileServerHandle
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.multipart.*; import io.netty.util.CharsetUtil; import org.json.simple.JSONObject; import java.io.*; import java.net.URLEncoder; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @Sharable public class FileServerHandle extends SimpleChannelInboundHandler<FullHttpRequest> { /*客户端读到数据以后,就会执行*/ @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { //打印请求url System.out.println(request.uri()); //下载任务处理 if (request.uri().equals("/downFile")) { responseExportFile(ctx, "D://model.txt", "model.txt"); } //上传接口处理 if (request.uri().equals("/upLoadFile")) { MultipartRequest MultipartBody = getMultipartBody(request); Map<String, FileUpload> fileUploads = MultipartBody.getFileUploads(); //输出文件信息 for (String key : fileUploads.keySet()) { //获取文件对象 FileUpload file = fileUploads.get(key); System.out.println("fileName is" + file.getFile().getPath()); //获取文件流 InputStream in = new FileInputStream(file.getFile()); BufferedReader bf = new BufferedReader(new InputStreamReader(in)); String content = bf.lines().collect(Collectors.joining("\n")); //打印文件 System.out.println("content is \n" + content); } //输出参数信息 JSONObject params = MultipartBody.getParams(); //输出文件信息 System.out.println(JSONObject.toJSONString(params)); } } /*连接建立以后*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer( "Hello Netty", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * <p> * 返回下载内容 * </p> * * @param ctx * @author DarkKing 2019-12-17 */ public static void responseExportFile(ChannelHandlerContext ctx, String path, String name) { File file = new File(path); try { //随机读取文件 final RandomAccessFile raf = new RandomAccessFile(file, "r"); long fileLength = raf.length(); //定义response对象 HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); //设置请求头部 response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=UTF-8"); response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=\"" + URLEncoder.encode(file.getName(), "UTF-8") + "\";"); ctx.write(response); //设置事件通知对象 ChannelFuture sendFileFuture = ctx .write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); sendFileFuture.addListener(new ChannelProgressiveFutureListener() { //文件传输完成执行监听器 @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { System.out.println("file {} transfer complete."); } //文件传输进度监听器 @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception { if (total < 0) { System.out.println("file {} transfer progress: {}"); } else { System.out.println("file {} transfer progress: {}/{}"); } } }); //刷新缓冲区数据,文件结束标志符 ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 功能描述 * <p>解析文件上传</p> * * @author DarkKing 2019/10/9 15:24 * @params [ctx, httpDecode] */ private static MultipartRequest getMultipartBody(FullHttpRequest request) { try { //创建HTTP对象工厂 HttpDataFactory factory = new DefaultHttpDataFactory(true); //使用HTTP POST解码器 HttpPostRequestDecoder httpDecoder = new HttpPostRequestDecoder(factory, request); httpDecoder.setDiscardThreshold(0); if (httpDecoder != null) { //获取HTTP请求对象 final HttpContent chunk = (HttpContent) request; //加载对象到加吗器。 httpDecoder.offer(chunk); if (chunk instanceof LastHttpContent) { //自定义对象bean MultipartRequest multipartRequest = new MultipartRequest(); //存放文件对象 Map<String, FileUpload> fileUploads = new HashMap<>(); //存放参数对象 JSONObject body = new JSONObject(); //通过迭代器获取HTTP的内容 java.util.List<InterfaceHttpData> InterfaceHttpDataList = httpDecoder.getBodyHttpDatas(); for (InterfaceHttpData data : InterfaceHttpDataList) { //如果数据类型为文件类型,则保存到fileUploads对象中 if (data != null && InterfaceHttpData.HttpDataType.FileUpload.equals(data.getHttpDataType())) { FileUpload fileUpload = (FileUpload) data; fileUploads.put(data.getName(), fileUpload); } //如果数据类型为参数类型,则保存到body对象中 if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { Attribute attribute = (Attribute) data; body.put(attribute.getName(), attribute.getValue()); } } //存放文件信息 multipartRequest.setFileUploads(fileUploads); //存放参数信息 multipartRequest.setParams(body); return multipartRequest; } } } catch (IOException e) { e.printStackTrace(); } return null; } } |
业务执行类,实现了文件上传和下载的接口。当请求为downFile则下载文件,请求为upLoadFile则为上传文件
1、下载演示
启动服务器端
浏览器执行下载文件,正确的下载到文件test.txt
2、上传演示
使用apipost或者postman执行文件上传操作
文件上传成功并成功读取文件内容
本文主要演示了如何不使用spring或者tomcat当做服务器,使用netty实现自己的文件上传和下载服务。并根据请求来实现对应的api接口操作。当然,如果想使用netty像spring那样简单并规范化的封装自己的api,那么就要靠自己去封装实现了。有兴趣的朋友可以自己尝试下
作者:Dark_King_
原文链接:https://blog.csdn.net/b37968539
from:https://zhuanlan.zhihu.com/p/152071762