一、Netty概述

1、Netty介绍

Netty官网:https://netty.io/

官方介绍:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API 实现。封装了比 NIO简单使用的API,同时修复了 NIO中的一些bug。

2、Netty 的优势

  • 1、API 使用简单,开发门槛低;
  • 2、功能强大,预置了多种编解码功能,支持多种主流协议;
  • 3、定制能力强,可以通过 ChannelHandler 对通信框架进行灵活地扩展;
  • 4、性能高,通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优;
  • 5、成熟、稳定,Netty 修复了已经发现的所有 JDK NIO BUG,业务开发人员不需要再为 NIO 的 BUG 而烦恼;
  • 6、社区活跃,版本迭代周期短,发现的 BUG 可以被及时修复,同时,更多的新功能会 加入;
  • 7、经历了大规模的商业应用考验,质量得到验证,比如:Dubbo、Elasticsearch都采用了Netty。

3、为什么 Netty 使用 NIO 而不是 AIO?

Netty 大多数都在 Linux 上的使用,在 Linux 系统上,AIO 的底层实现仍使用 EPOLL,没有很好实现AIO,因此在性能上没有明显的优势,而且被 JDK 封装了一层不容易深度优化。

AIO 还有个缺点是接收数据需要预先分配缓存,而不是 NIO 那种需要接收时才需要分配缓存,所以对连接数量非常大但流量小的情况, 内存浪费很多。

目前 Linux 上 AIO 不够成熟,处理回调结果速度跟不上处理需求。

作者原话:

Not faster than NIO (epoll) on unix systems (which is true) There is no daragram suppport Unnecessary threading model (too much abstraction without usage)

4、架构图:

在这里插入图片描述

二、第一个Hello Netty

首先搭建一个 maven工程,引入依赖,这里使用 Netty 4.1.42版本。

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

1、服务端

1.1 创建服务端启动类

public class MyServer {

    private int port;

    public MyServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        MyServer myServer = new MyServer(19091);
        System.out.println("服务器即将启动");
        myServer.start();
        System.out.println("服务器已关闭");

    }

    private void start() throws InterruptedException {
        //创建线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();

        try {
            //创建服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup)
                    //设置服务端通道实现类型
                    .channel(NioServerSocketChannel.class)
                    //使用本地地址,绑定端口号
                    .localAddress(new InetSocketAddress(port))
                    //使用匿名内部类的形式初始化通道对象
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });
            System.out.println("MyServer 服务端已经准备就绪...");
            // 异步绑定,启动服务端, sysc()会阻塞到完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            System.out.println("服务器启动完成");
            //阻塞当前线程,对关闭通道进行监听(直到服务器 channel被关闭)
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
        }

    }
}    

1.2 创建服务端处理器

public class MyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("MyServerHandler 连接已建立...");
		super.channelActive(ctx);
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端发送过来的消息
		ByteBuf in = (ByteBuf) msg;
		System.out.println("Server Accept Client Context ("+ ctx.channel().remoteAddress() +")消息 ->" + in.toString(CharsetUtil.UTF_8));
		ctx.writeAndFlush(in);

		// ctx.close();
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//发送消息给客户端
		ByteBuf byteBuf = Unpooled.copiedBuffer("Server Receive Client msg", CharsetUtil.UTF_8);
		ctx.writeAndFlush(byteBuf);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//发生异常,关闭通道
		//cause.printStackTrace();
		ctx.close();
	}
}

2、客户端

2.1 创建客户端启动类

public class MyClient {

    private String host;
    private int port;

    public MyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        MyClient myClient = new MyClient("127.0.0.1", 19091);
        System.out.println("客户端即将启动");
        myClient.start();
        System.out.println("客户端已关闭");

    }

    private void start() throws InterruptedException {
        //创建线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();

        try {
            //创建客户端的启动对象,设置参数
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup)
                    //设置客户端的通道实现类型
                    .channel(NioSocketChannel.class)
                    //设置连接服务端的地址和端口号
                    .remoteAddress(new InetSocketAddress(host, port))
                    //使用匿名内部类初始化通道
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //添加客户端通道的处理器
                            socketChannel.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪,随时可以连接服务端");
            // 异步连接到服务端, sysc()会阻塞到完成,和服务器的不同点
            ChannelFuture channelFuture = bootstrap.connect().sync();
            System.out.println("客户端启动连接完成");
            //阻塞当前线程,对关闭通道进行监听(直到客户端的 channel被关闭)
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            bossGroup.shutdownGracefully().sync();
        }
    }
}    

2.2 创建客户端处理器

SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter的子类。继承这两个都可以。

public class MyClientHandler extends SimpleChannelInboundHandler {

    /**
     * channel活跃后,做业务处理
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ByteBuf byteBuf = Unpooled.copiedBuffer(" Hello, Netty", CharsetUtil.UTF_8);
        ctx.writeAndFlush(byteBuf);

    }

    /**
     * 读取到网络数据后进行业务处理。 这里处理完直接关闭连接
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Client Accept Server("+ ctx.channel().remoteAddress() +")消息 ->" + in.toString(CharsetUtil.UTF_8));

        //关闭连接
        //ctx.close();
    }
}

3、测试

先启动服务端,再启动客户端,结果如下:

MyServer打印结果:
在这里插入图片描述

MyClient打印结果:
在这里插入图片描述
Netty入门使用到此完成,更多了解后面接着玩。

参考文章:

– 求知若饥,虚心若愚。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐