文章目录
- 概述
- ChannelHandler
- 方法
- ChannelPipeline
- 特点
- 总结
- 代码示例
- 服务器端
- 客户端
- EmbeddedChannel
- EmbeddedChannel 的方法
概述
ChannelHandler
ChannelHandler 是 Netty 中的一个接口,它定义了处理 I/O 事件的方法。ChannelHandler 可以处理各种类型的事件,包括连接事件、读写事件、异常事件等。
方法
入站(Inbound)方法:
- channelRead(ChannelHandlerContext ctx, Object msg):当从通道接收到数据时调用。
- channelReadComplete(ChannelHandlerContext ctx):当所有数据都已读取完毕时调用。
- exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当发生异常时调用。
出站(Outbound)方法:
- write(ChannelHandlerContext ctx, Object msg, Promise promise):当有数据需要写出时调用。
- flush(ChannelHandlerContext ctx):当需要刷新写出缓冲区时调用。
ChannelPipeline
ChannelPipeline 是 Netty 中的核心组件之一,它管理了一系列的 ChannelHandler。ChannelPipeline 可以理解为一个责任链模式的实现,它按照顺序处理事件,每个 ChannelHandler 负责处理特定类型的事件,并可以选择将事件传递给下一个 ChannelHandler 或者停止处理。
特点
- 有序性:ChannelPipeline 中的 ChannelHandler 是按顺序排列的,可以插入、删除或替换。
- 双向性:ChannelPipeline 支持双向处理,即可以处理入站(Inbound)事件,也可以处理出站(Outbound)事件。
- 灵活性:可以动态地添加、删除或替换 ChannelHandler,而无需重新启动应用程序。
总结
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。
代码示例
服务器端
在服务器端中添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.Charset;@Slf4j
public class PipelineTest {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 1. 通过 channel 拿到 pipelineChannelPipeline pipeline = ch.pipeline();// 2. 添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail//addLast()会加在tail之前,而不是最后。底层是双向链表//入站处理器pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");//处理加工msgByteBuf buf = (ByteBuf) msg;String name = buf.toString(Charset.defaultCharset());//将处理好的字符串传递给h2super.channelRead(ctx, name);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {log.debug("2");Student student = new Student(name.toString());super.channelRead(ctx, student);// 调用super.channelRead()将数据传递给下个 handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);}});//出战处理器。注意:ctx.writeAndFlush()和ch.writeAndFlush()都会走该出站处理器pipeline.addLast("h2.5", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("2.5");super.write(ctx, msg, promise);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3,h2传递的数据为{},class为{}",msg,msg.getClass());//super.channelRead(ctx, msg);//channelRead()是将控制权交给pipeline中下一个入站处理器//这里h3后面已经没有入站处理器了,所以可以不用调用channelRead()//向channel中写入数据触发出站处理器(ch.writeAndFlush()会从尾节点向前找出站处理器)ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));//ctx.writeAndFlush()是从当前处理器向前寻找出站处理器
// ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});//出站处理器 p2.25//注意:出站处理器只有向channel中写入数据才会触发pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}@Data@AllArgsConstructorstatic class Student {private String name;}
}
客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;/*** @author qf* @since 2024/09/12 19:44*/
public class Client {public static void main(String[] args) {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080).addListener((ChannelFutureListener) future -> {future.channel().writeAndFlush("hello,world");});}
}
服务器端输出
18:45:18.287 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 1
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 3,h2传递的数据为PipelineTest.Student(name=hello,world),class为class com.qf.netty.Pipeline.PipelineTest$Student
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 6
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 5
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 4
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2.5
以上代码可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表。
-
入站处理器中,super.channelRead(ctx, name)或使用ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 h1中 super.channelRead(ctx, name)代码,则仅会打印 1
- 如果注释掉 h2中 super.channelRead(ctx, student)代码,则仅会打印 1 2
-
h3 处的 ch.writeAndFlush() 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 h3 处 ch.writeAndFlush() 代码,则仅会打印 1 2 3
-
类似的,出站处理器中,super.write(ctx, msg, promise)或ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 h6 处 super.write(ctx, msg, promise) 代码,则仅会打印 1 2 3 6
-
ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己(死循环)
服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
EmbeddedChannel
EmbeddedChannel 是 netty提供的专门用来测试的channel。
使用EmbeddedChannel进行测试可以不启动服务器和客户端了。
EmbeddedChannel 的方法
- writeInbound(Object msg):将入站消息写到 EmbeddedChannel 中。
- writeOutbound(Object msg):将出站消息写到 EmbeddedChannel 中。
- readInbound():从 EmbeddedChannel 中读取入站消息。
- readOutbound():从 EmbeddedChannel 中读取出站消息。
- finish():完成所有未完成的写操作,并关闭 EmbeddedChannel
@Slf4j
public class Test06EmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}};EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 模拟入站操作
// channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));/*[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 1[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 2*/// 模拟出站操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));/*[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 4[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 3*/}
}