EventLoop组件
1. EventLoop介绍
也就是事件循环对象,EventLoop本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理 Channel上源源不断的io事件。
它的继承关系比较复杂:
- 一条线是继承自j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法
- 另一条线是继承自netty自己的OrderedEventExecutor,
- 提供了
boolean inEventLoop(Thread thread)
方法判断一个线程是否属于此EventLoop - 提供了
parent()
方法来看看自己属于哪个EventLoopGroup
2. EventLoopGroup介绍
事件循环组,EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)
3. 使用EventLoopGroup
3.1 创建EventLoopGroup
EventLoopGroup是一个接口,使用需要通过实现类来创建:
public static void main(String[] args) throws IOException {
// DefaultEventLoopGroup能处理普通任务,定时任务
// 内部创建了两个EventLoop, 每个EventLoop维护一个线程
EventLoopGroup group1 = new DefaultEventLoopGroup(2);
// NioEventLoopGroup能处理IO事件,普通任务,定时任务
// 不设置线程数大小,默认使用CPU核数*2
EventLoopGroup group2 = new NioEventLoopGroup();
// 获取下一个事件循环对象
System.out.println(group1.next());
System.out.println(group1.next());
// 由于只有两个线程,这时打印又是第一个线程
System.out.println(group1.next());
for (EventExecutor eventExecutor : group2) {
System.out.println(eventExecutor);
}
}
输出:
3.2 使用EventLoopGroup
EventLoopGroup group2 = new NioEventLoopGroup();
// 执行普通任务
group2.submit(()->{
log.info(Thread.currentThread().getName()+"执行任务1。。。。");
});
group2.execute(()->{
log.info(Thread.currentThread().getName()+"执行任务2。。。。");
});
// 执行定时任务,周期为2秒,初次执行延迟1秒
group2.scheduleAtFixedRate(()->{
log.info("延时任务执行");
}, 1, 2, TimeUnit.SECONDS);
💡优雅关闭
优雅关闭shutdownGracefully
方法。该方法会首先切换EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。
3.3 NioEventLoop处理io事件
public static void main(String[] args) throws IOException, InterruptedException {
new ServerBootstrap()
// boss和worker的线程池,boss线程池处理连接,worker线程池处理读写
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
if (byteBuf != null) {
log.info(byteBuf.toString(Charset.defaultCharset()));
}
}
});
}
}).bind(8080).sync();
}
Netty的线程组可以细分,group()
方法可以传递进去两个NioEventLoopGroup,分别用来作为boss线程和worker线程的。ServerSocketChannel只会有一个,所以从NioEventLoopGroup只会拿一个线程处理连接。 启动一个客户端,为了方便,这里使用网络助手: 运行输出如下:
可以看出客户端连接Netty,一旦建立连接后,后续每次连接都是固定的线程负责处理channel,这样的好处就是避免接收的数据出现跨线程通信问题,比如出现半包情况下还是同一个线程处理,保证线程安全。
4. 多线程优化
4.1 数据分工优化
目前接收数据处理都是使用IO线程进行处理,但如果某些数据处理比较耗时则会影响整体的Netty数据吞吐量,有必要对耗时操作使用单独线程处理。可以使用DefaultEventLoopGroup处理普通任务:
public static void main(String[] args) throws IOException, InterruptedException {
// 使用DefaultEventLoopGroup线程池,用于处理普通任务
DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 将数据传递给下一个handler
}
}).addLast(normalWorkers, "handler2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
}
});
}
}).bind("localhost", 8080).sync();
}
运行效果: 可以看出handler2中数据处理使用的是DefaultEventLoopGroup线程组处理的。并且客户端Channel和对应的线程处理在DefaultEventLoopGroup线程组依然有效。
4.2 源码分析线程切换
深入源码分析:Netty在不同的handler中如何实现不同的线程切换的?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 返回下一个handler的eventloop
EventExecutor executor = next.executor();
// 当前handler的线程是否和eventloop是同一个线程
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}