Skip to content

EventLoop组件

1. EventLoop介绍

也就是事件循环对象,EventLoop本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理 Channel上源源不断的io事件。
它的继承关系比较复杂:

  1. 一条线是继承自j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法
  2. 另一条线是继承自netty自己的OrderedEventExecutor,
  • 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
  • 提供了parent()方法来看看自己属于哪个EventLoopGroup

2. EventLoopGroup介绍

事件循环组,EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)

3. 使用EventLoopGroup

3.1 创建EventLoopGroup

EventLoopGroup是一个接口,使用需要通过实现类来创建:

java
public static void main(String[] args) throws IOException {
    // DefaultEventLoopGroup能处理普通任务,定时任务
    // 内部创建了两个EventLoop, 每个EventLoop维护一个线程
    EventLoopGroup group1 = new DefaultEventLoopGroup(2);
    // NioEventLoopGroup能处理IO事件,普通任务,定时任务
    // 不设置线程数大小,默认使用CPU核数*2
    EventLoopGroup group2 = new NioEventLoopGroup();
    // 获取下一个事件循环对象
    System.out.println(group1.next());
    System.out.println(group1.next());
    // 由于只有两个线程,这时打印又是第一个线程
    System.out.println(group1.next());
    for (EventExecutor eventExecutor : group2) {
        System.out.println(eventExecutor);
    }
}

输出:
Alt text

3.2 使用EventLoopGroup

java
EventLoopGroup group2 = new NioEventLoopGroup();
// 执行普通任务
group2.submit(()->{
    log.info(Thread.currentThread().getName()+"执行任务1。。。。");
});
group2.execute(()->{
    log.info(Thread.currentThread().getName()+"执行任务2。。。。");
});
// 执行定时任务,周期为2秒,初次执行延迟1秒
group2.scheduleAtFixedRate(()->{
    log.info("延时任务执行");
}, 1, 2, TimeUnit.SECONDS);

Alt text

💡优雅关闭

优雅关闭shutdownGracefully方法。该方法会首先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

3.3 NioEventLoop处理io事件

java
public static void main(String[] args) throws IOException, InterruptedException {

    new ServerBootstrap()
            // boss和worker的线程池,boss线程池处理连接,worker线程池处理读写
            .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            ByteBuf byteBuf = (ByteBuf) msg;
                            if (byteBuf != null) {
                                log.info(byteBuf.toString(Charset.defaultCharset()));
                            }
                        }
                    });
                }
            }).bind(8080).sync();
}

Netty的线程组可以细分,group()方法可以传递进去两个NioEventLoopGroup,分别用来作为boss线程和worker线程的。ServerSocketChannel只会有一个,所以从NioEventLoopGroup只会拿一个线程处理连接。 启动一个客户端,为了方便,这里使用网络助手: Alt text 运行输出如下:
Alt text 可以看出客户端连接Netty,一旦建立连接后,后续每次连接都是固定的线程负责处理channel,这样的好处就是避免接收的数据出现跨线程通信问题,比如出现半包情况下还是同一个线程处理,保证线程安全。 Alt text

4. 多线程优化

4.1 数据分工优化

目前接收数据处理都是使用IO线程进行处理,但如果某些数据处理比较耗时则会影响整体的Netty数据吞吐量,有必要对耗时操作使用单独线程处理。可以使用DefaultEventLoopGroup处理普通任务:

java
public static void main(String[] args) throws IOException, InterruptedException {
    // 使用DefaultEventLoopGroup线程池,用于处理普通任务
    DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
    new ServerBootstrap()
            .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch)  {
                    ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ByteBuf byteBuf = (ByteBuf) msg;
                                    log.info(byteBuf.toString(Charset.defaultCharset()));
                                    ctx.fireChannelRead(msg); // 将数据传递给下一个handler
                                }
                            }).addLast(normalWorkers, "handler2", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ByteBuf byteBuf = (ByteBuf) msg;
                                    log.info(byteBuf.toString(Charset.defaultCharset()));
                                }
                            });
                }
            }).bind("localhost", 8080).sync();
}

运行效果:
Alt text 可以看出handler2中数据处理使用的是DefaultEventLoopGroup线程组处理的。并且客户端Channel和对应的线程处理在DefaultEventLoopGroup线程组依然有效。

4.2 源码分析线程切换

深入源码分析:Netty在不同的handler中如何实现不同的线程切换的?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 返回下一个handler的eventloop
        EventExecutor executor = next.executor();
        // 当前handler的线程是否和eventloop是同一个线程
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人) 
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }