一、核心编排组件:ChannelPipeLine
ChannelPipeLine是Netty的核心编排组件,负责调度各类ChannelHandler,实际的加工处理由ChannelHandler完成。
ChannelPipeLine可以看做是ChannelHandler的容器,包含一组ChannelHandler实例,内部通过双向链表将ChannelHandler链接在一起。当有I/O读写事件时,依次调用ChannelHandler列表对Channel的数据进行拦截和处理。
每个Channel绑定一个ChannelPipeLine,每个ChannelPipeLine包含多个ChannelHandlerContext,所有ChannelHandlerCoontext组成双向链表。每个ChannelHandler对应一个ChannelHandlerContext,ChannelHandlerContext可以保存ChannelHandler上下文,同时包含ChannelHandler生命周期的所有事件(如 connect、bind、read、flush、write、close 等)。基于ChannelHandlerContext的封装,可以提取事件传递的前置和后置通用逻辑,降低耦合性。
ChannelPipeLine中包含两大类处理器:InboudHandler入站处理器和OutboundHandler出站处理器,内部的双向链表维护了HeadContext和TailContext的头尾节点,自定义的ChannelHandler会插入两个节点之间。
HeadContext既是Inbound处理器,又是Outbound处理器,分别实现了ChannelInboudHandler和ChannelOutboudHandler。网络数据的写入操作入口就是由HeadContext完成。HeadContext作为头结点负责读取数据并传递InBound事件,当数据处理完后,数据反方向经过Outbound处理器,最终又传到HeadContext,所有HeadContext又是处理OutBound事件的最后一站。此外,HeadContext在传递时间之前还会执行一些前置操作。
TailContext只实现了ChannelOutboudHandler,在ChannelInboundHandler调用链路的最后一步执行,用于终止InBound事件的传播。作为OutBound事件传播的第一站,仅仅是将OutBound事件传递给下一个节点。
Netty支持由Channel直接触发事件,这样调用链路将会贯穿整个ChannelPipeLine。同时,也可以在某一个ChannelHandlerContext触发事件传播,这样只会从当前ChannelHandler开始事件传播,不会从头贯穿到尾。
二、事件处理器:ChannelHandler
ChannelHandler是围绕I/O事件的生命周期(建立连接、读数据、写数据、连接销毁)设计的,包含两个重要子接口:ChannelIInboudHandler和ChannelOutboundHandler,分别拦截入站和出站的各种I/O事件。
ChannelInboudHandler的事件回调方法
事件 | 方法 |
---|---|
新的客户端连接事件 | handlerAdded |
通道注册事件 | channelRegistered |
通道处于活动状态事件 | channelActive |
通道数据可读取事件 | channelRead0 |
通道数据读取完毕事件 | channelReadComplete |
通道进入非活动状态事件 | channelInactive |
通道移除事件 | channelUnregistered |
处理器移除事件(断开连接) | handlerRemoved |
异常发生事件 | exceptionCaught |
ChannelOutboundHandler的事件回调方法
三、事件传播机制
ChannelPipeLine中的处理器分为InboundHandler和OutboundHandler两种处理器。InBound事件的传播方向为:Head –> Tail,而OutBound事件的传播方向为:Tail –> Head。
通过以下的代码演示ChannelPipeLine的时间传播机制:
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false)).addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false)).addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));ch.pipeline().addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA")).addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB")).addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));}}public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InBoundHandler: " + name);if (flush) {ctx.channel().writeAndFlush(msg);} else {super.channelRead(ctx, msg);}}}public class SampleOutBoundHandler extends ChannelOutboundHandlerAdapter {private final String name;public SampleOutBoundHandler(String name) {this.name = name;}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler: " + name);super.write(ctx, msg, promise);}}
最终的控制台输出为:
四、异常传播机制
ChannelPipeLine中的事件传播采用了经典的责任链模式,调用链路环环相扣。但是,如果有一个节点处理逻辑出现异常会怎么样?当用户在自定义的ChannelHandler中对异常没有进行拦截,最终会由TailContext进行拦截。
通过以下代码演示,第一个A节点会抛出RunTimeException。同时重写ChannelInboundHandlerAdapter的exceptionCaught方法,直在开头加上控制台输出
public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("InBoundHandler: " + name);if (flush) {ctx.channel().writeAndFlush(msg);} else {throw new RuntimeException("InBoundHandler: " + name);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.out.println("InBoundHandlerException: " + name);ctx.fireExceptionCaught(cause);}}
最终的控制台输出结果为:
实际在用Netty进行开发时,推荐对异常进行同一拦截,然后根据实际业务场景进行更加完善的异常处理机制,参考如下方式:
具体的代码如下:
public class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof RuntimeException) {System.out.println("Handle Business Exception Success.");}}}
最终的控制台输出结果为: