Skip to content

Channel

1. Channel的API

  • close() 可以用来关闭channel
  • closeFuture() 用来处理channel的关闭
    • sync()方法作用是同步等待channel关闭
    • addListener()方法是异步等待channel关闭
  • pipeline()方法添加处理器
  • write()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出

2. ChannelFuture

在客户端执行connect()方法后,会得到ChannelFuture对象,我们直接使用.channel()方法得到channel对象向服务器端发消息会发现消息发不出去:

java
public static void main(String[] args) throws InterruptedException, IOException {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(new NioEventLoopGroup());
    bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
        }
    });
    ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080));
    channelFuture.channel().writeAndFlush("hello");
}

原因是带有Future、Promise字样的对象在Netty中都是异步使用的,因此connect()方法是异步的,意味着并不马上建立连接,因此channelFuture得到的channel对象并不是正确能够使用的。

2.1 正确发送消息方式一

通过使用sync()方法可以让同步等待连接建立完成:

java
ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);

System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3

Alt text

2.3 正确发送消息方式二

使用ChannelFuture的回调方式,在连接建立后进行回调:

java
channelFuture.addListener(new ChannelFutureListener() {
    // 使用nio线程去执行这个方法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        log.info("channel:{}", channel);
        channel.writeAndFlush("hello world!");
    }
});

当我们正常发送消息完毕后,希望关闭退出客户端,但是会发现我们的程序还在运行没有结束:
Alt text 原因在于NioEventLoopGroup线程池里的线程并没有结束,导致整个程序都还在运行中,那么如果关闭退出程序呢?

2.4 正确退出方式一

首先是关闭通道channel,然后阻塞等待关闭线程组NioEventLoopGroup:

java
public static void main(String[] args) throws InterruptedException, IOException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(group);
    bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
        }
    });
    ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080));
    Channel channel = channelFuture.sync().channel();
    channel.writeAndFlush("hello");
    logger.info("发送数据成功");
    ChannelFuture closeFuture = channel.close();
    // 阻塞等待连接通道关闭
    closeFuture.sync();
    // 优雅关闭线程组, 也就是将现有的任务执行完, 才会关闭线程组并不接受新的任务
    group.shutdownGracefully();
}

close()方法类似也是异步非阻塞方法,返回一个ChannelFuture对象,此时需要执行sync()方法确保通道channel被关闭,防止主线程执行完毕优先退出。

2.5 正确退出方式二

首先还是先要关闭通道channel,使用channel的close()方法虽然立即返回,但主线程并不会退出,毕竟NioEventLoopGroup线程组还在运行。

java
public static void main(String[] args) throws InterruptedException, IOException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(group);
    bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
        }
    });
    ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080));
    Channel channel = channelFuture.sync().channel();
    channel.writeAndFlush("hello");
    logger.info("发送数据成功");
    ChannelFuture closeFuture = channel.close();
    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // 优雅关闭线程组, 也就是将现有的任务执行完, 才会关闭线程组并不接受新的任务
            group.shutdownGracefully();
        }
    });
}

为什么不在一个线程中去执行建立连接、去执行关闭 channel,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接?

很可能你会笼统地回答,因为netty异步方式用了多线程、多线程就效率高。其实这些认识都比较片面。

思考下面的场景,4个医生给人看病,每个病人花费20分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下每个医生工作1小时,处理的病人总数是:60/20=3
Alt text 经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要5分钟,如下:
Alt text 可以做如下图优化, 由于4个医生进行了分工,每个医生一个小时都能看12(60分钟/5)个病人, 可见效率一下子提高了4倍:
Alt text 只有一开始,医生2、3、4分别要等待5、10、15分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作。
可见必须配合多线程,合理进行拆分任务才是利用异步的关键。

3. 使用LoggingHandler

LoggingHandler是一个channelhandler,利用底层的日志框架记录事件日志。可以方便的查看整个Netty建立连接,收数和数据内容,最后关闭连接的顺序:

java
public static void main(String[] args) throws InterruptedException, IOException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(group);
    bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
            ch.pipeline().addLast(new StringEncoder());
        }
    });
    ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080));
    channelFuture.addListener(new ChannelFutureListener() {
        // 使用nio线程去执行这个方法
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Channel channel = future.channel();
            logger.info("channel:{}", channel);
            channel.writeAndFlush("hello world!");
            logger.info("发送数据成功");
            channel.close().sync();
            group.shutdownGracefully();
            logger.info("关闭连接");
        }
    });
}

运行日志打印:

sh
07:57:41.222 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1] REGISTERED
07:57:41.226 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1] CONNECT: localhost/127.0.0.1:8080
07:57:41.242 [nioEventLoopGroup-2-1] INFO  com.rocket.demo.test.NettyTcpClient - channel:[id: 0xeb1338c1, L:/127.0.0.1:13491 - R:localhost/127.0.0.1:8080]
07:57:41.265 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1, L:/127.0.0.1:13491 - R:localhost/127.0.0.1:8080] WRITE: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 21             |hello world!    |
+--------+-------------------------------------------------+----------------+
07:57:41.266 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1, L:/127.0.0.1:13491 - R:localhost/127.0.0.1:8080] FLUSH
07:57:41.268 [nioEventLoopGroup-2-1] INFO  com.rocket.demo.test.NettyTcpClient - 发送数据成功
07:57:41.268 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1, L:/127.0.0.1:13491 - R:localhost/127.0.0.1:8080] CLOSE
07:57:41.273 [nioEventLoopGroup-2-1] INFO  com.rocket.demo.test.NettyTcpClient - 关闭连接
07:57:41.275 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1, L:/127.0.0.1:13491 ! R:localhost/127.0.0.1:8080] ACTIVE
07:57:41.277 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1, L:/127.0.0.1:13491 ! R:localhost/127.0.0.1:8080] INACTIVE
07:57:41.278 [nioEventLoopGroup-2-1] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xeb1338c1, L:/127.0.0.1:13491 ! R:localhost/127.0.0.1:8080] UNREGISTERED

可以看到数据详细的通过十六进制的形式记录,清晰的展示连接的整个生命周期过程