当前位置: 首页> 财经> 金融 > Netty笔记05-组件Handler Pipeline

Netty笔记05-组件Handler Pipeline

时间:2025/7/12 14:37:28来源:https://blog.csdn.net/weixin_46425661/article/details/142177575 浏览次数:0次

文章目录

  • 概述
    • ChannelHandler
      • 方法
    • ChannelPipeline
      • 特点
    • 总结
  • 代码示例
    • 服务器端
    • 客户端
  • EmbeddedChannel
    • EmbeddedChannel 的方法


概述

ChannelHandler

ChannelHandler 是 Netty 中的一个接口,它定义了处理 I/O 事件的方法。ChannelHandler 可以处理各种类型的事件,包括连接事件、读写事件、异常事件等。

方法

入站(Inbound)方法:

  • channelRead(ChannelHandlerContext ctx, Object msg):当从通道接收到数据时调用。
  • channelReadComplete(ChannelHandlerContext ctx):当所有数据都已读取完毕时调用。
  • exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当发生异常时调用。

出站(Outbound)方法:

  • write(ChannelHandlerContext ctx, Object msg, Promise promise):当有数据需要写出时调用。
  • flush(ChannelHandlerContext ctx):当需要刷新写出缓冲区时调用。

ChannelPipeline

ChannelPipeline 是 Netty 中的核心组件之一,它管理了一系列的 ChannelHandler。ChannelPipeline 可以理解为一个责任链模式的实现,它按照顺序处理事件,每个 ChannelHandler 负责处理特定类型的事件,并可以选择将事件传递给下一个 ChannelHandler 或者停止处理。

特点

  • 有序性:ChannelPipeline 中的 ChannelHandler 是按顺序排列的,可以插入、删除或替换。
  • 双向性:ChannelPipeline 支持双向处理,即可以处理入站(Inbound)事件,也可以处理出站(Outbound)事件。
  • 灵活性:可以动态地添加、删除或替换 ChannelHandler,而无需重新启动应用程序。

总结

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。

代码示例

服务器端

在服务器端中添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.Charset;@Slf4j
public class PipelineTest {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 1. 通过 channel 拿到 pipelineChannelPipeline pipeline = ch.pipeline();// 2. 添加处理器 head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail//addLast()会加在tail之前,而不是最后。底层是双向链表//入站处理器pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");//处理加工msgByteBuf buf = (ByteBuf) msg;String name = buf.toString(Charset.defaultCharset());//将处理好的字符串传递给h2super.channelRead(ctx, name);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {log.debug("2");Student student = new Student(name.toString());super.channelRead(ctx, student);// 调用super.channelRead()将数据传递给下个 handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);}});//出战处理器。注意:ctx.writeAndFlush()和ch.writeAndFlush()都会走该出站处理器pipeline.addLast("h2.5", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("2.5");super.write(ctx, msg, promise);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3,h2传递的数据为{},class为{}",msg,msg.getClass());//super.channelRead(ctx, msg);//channelRead()是将控制权交给pipeline中下一个入站处理器//这里h3后面已经没有入站处理器了,所以可以不用调用channelRead()//向channel中写入数据触发出站处理器(ch.writeAndFlush()会从尾节点向前找出站处理器)ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));//ctx.writeAndFlush()是从当前处理器向前寻找出站处理器
//                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});//出站处理器 p2.25//注意:出站处理器只有向channel中写入数据才会触发pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}@Data@AllArgsConstructorstatic class Student {private String name;}
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;/*** @author qf* @since 2024/09/12 19:44*/
public class Client {public static void main(String[] args) {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080).addListener((ChannelFutureListener) future -> {future.channel().writeAndFlush("hello,world");});}
}

服务器端输出

18:45:18.287 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 1
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 3,h2传递的数据为PipelineTest.Student(name=hello,world),classclass com.qf.netty.Pipeline.PipelineTest$Student
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 6
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 5
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 4
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2.5

以上代码可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表。
在这里插入图片描述

  • 入站处理器中,super.channelRead(ctx, name)或使用ctx.fireChannelRead(msg) 是 调用下一个入站处理器

    • 如果注释掉 h1中 super.channelRead(ctx, name)代码,则仅会打印 1
    • 如果注释掉 h2中 super.channelRead(ctx, student)代码,则仅会打印 1 2
  • h3 处的 ch.writeAndFlush() 会 从尾部开始触发 后续出站处理器的执行

    • 如果注释掉 h3 处 ch.writeAndFlush() 代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,super.write(ctx, msg, promise)或ctx.write(msg, promise) 的调用也会 触发上一个出站处理器

    • 如果注释掉 h6 处 super.write(ctx, msg, promise) 代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)

    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己(死循环)
      在这里插入图片描述
      服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

EmbeddedChannel

EmbeddedChannel 是 netty提供的专门用来测试的channel。
使用EmbeddedChannel进行测试可以不启动服务器和客户端了。

EmbeddedChannel 的方法

  • writeInbound(Object msg):将入站消息写到 EmbeddedChannel 中。
  • writeOutbound(Object msg):将出站消息写到 EmbeddedChannel 中。
  • readInbound():从 EmbeddedChannel 中读取入站消息。
  • readOutbound():从 EmbeddedChannel 中读取出站消息。
  • finish():完成所有未完成的写操作,并关闭 EmbeddedChannel

@Slf4j
public class Test06EmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}};EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 模拟入站操作
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));/*[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 1[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 2*/// 模拟出站操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));/*[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 4[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 3*/}
}

关键字:Netty笔记05-组件Handler Pipeline

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: