1. 首页
  2. >
  3. 编程技术
  4. >
  5. Java

springboot整合netty替代websocket

步骤

1 先写好基本的Netty客户端和Netty服务的代码。参考文章【netty技术基础入门】
2.搭建好基本的Springboot项目。
3.将Netty服务端代码的启动代码和关闭代码分离,服务端加上@Component注解,交由Spring管理实例。
4.Springboot启动时,将Netty服务给启动;同时Springboot停止时,将Netty服务销毁。

实现

Netty服务端

主要工作:
将Netty服务端代码的启动代码和关闭代码分离,服务端加上@Component注解,交由Spring管理实例

/**
 * 服务端
 * 1.创建一个ServerBootstrap的实例引导和绑定服务器。
 * 2.创建并分配一个NioEventLoopGroup实例以进行事件的处理,比如接受连接以及读写数据。
 * 3.指定服务器绑定的本地的InetSocketAddress。
 * 4.使用一个EchoServerHandler的实例初始化每一个新的Channel。
 * 5.调用ServerBootstrap.bind()方法以绑定服务器。
 */ @Slf4j @Component public class EchoServer { /**
     * NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O的读写之外
     * 创建了两个NioEventLoopGroup,
     * 它们实际是两个独立的Reactor线程池。
     * 一个用于接收客户端的TCP连接,
     * 另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。
     */ private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private Channel channel; /**
     * 启动服务
     * @param hostname
     * @param port
     * @return
     * @throws Exception
     */ public ChannelFuture start(String hostname,int port) throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); ChannelFuture f = null; try { //ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(hostname,port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //                            为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler socketChannel.pipeline().addLast(serverHandler); } }); f = b.bind().sync(); channel = f.channel(); log.info("======EchoServer启动成功!!!========="); } catch (Exception e) { e.printStackTrace(); } finally { if (f != null && f.isSuccess()) { log.info("Netty server listening " + hostname + " on port " + port + " and ready for connections..."); } else { log.error("Netty server start up Error!"); } } return f; } /**
     * 停止服务
     */ public void destroy() { log.info("Shutdown Netty Server..."); if(channel != null) { channel.close();} workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.info("Shutdown Netty Server Success!"); } } 

服务端业务处理handler

服务端的生命周期以及接收客户端的信息收发和处理。这里不建议使用阻塞的操作,容易影响netty的性能。

/***
 * 服务端自定义业务处理handler
 */ public class EchoServerHandler extends ChannelInboundHandlerAdapter { /**
     * 对每一个传入的消息都要调用;
     * @param ctx
     * @param msg
     * @throws Exception
     */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("server received: "+in.toString(CharsetUtil.UTF_8)); ctx.write(in); } /**
     * 通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的的最后一条消息。
     * @param ctx
     * @throws Exception
     */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } /**
     * 在读取操作期间,有异常抛出时会调用。
     * @param ctx
     * @param cause
     * @throws Exception
     */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

Springboot启动服务端代码

CommandLineRunner #run()
这里主要是通过CommandLineRunner 接口的run方法,实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中。

addShutdownHook()
而 Runtime.getRuntime().addShutdownHook(shutdownHook); 这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。

详细代码如下:

@SpringBootApplication public class SpringNettyApplication implements CommandLineRunner { @Value("${netty.port}") private int port; @Value("${netty.url}") private String url; @Autowired private EchoServer echoServer; public static void main(String[] args) { SpringApplication.run(SpringNettyApplication.class, args); } @Override public void run(String... args) throws Exception { ChannelFuture future = echoServer.start(url,port); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { echoServer.destroy(); } }); //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程 future.channel().closeFuture().syncUninterruptibly(); } } 

Netty客户端

它在本文中的作用主要是为了测试服务端是否正常运行,通过客户端连接Netty的服务端,看到消息是否正常通信。

/**
 * 客户端
 * 1.为初始化客户端,创建一个Bootstrap实例
 * 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;
 * 3.当连接被建立时,一个EchoClientHandler实例会被安装到(该Channel的一个ChannelPipeline中;
 * 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。
 */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } /**
     * 运行流程:
     * @param args
     * @throws Exception
     */ public static void main(String[] args) throws Exception { new EchoClient("127.0.0.1",10010).start(); } private void start() throws Exception { /**
         * Netty用于接收客户端请求的线程池职责如下。
         * (1)接收客户端TCP连接,初始化Channel参数;
         * (2)将链路状态变更事件通知给ChannelPipeline
         */ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); //绑定端口 ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } catch (Exception e) { group.shutdownGracefully().sync(); } } } 

Netty客户端业务处理类

主要是监控Netty客户端的生命周期以及接收服务端的消息,往服务端发送消息等。

 /**
 * 客户端处理类
 */ public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /**
     * 在到服务器的连接已经建立之后将被调用
     * @param ctx
     * @throws Exception
     */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks !", CharsetUtil.UTF_8)); } /**
     * 当从服务器接收到一个消息时被调用
     * @param channelHandlerContext
     * @param byteBuf
     * @throws Exception
     */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Client received: "+ byteBuf.toString(CharsetUtil.UTF_8)); } /**
     * 在处理过程中引发异常时被调用
     * @param ctx
     * @param cause
     * @throws Exception
     */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

总结

从整体来看,只不过是将Netty服务端从main函数启动方式改为交给Spring来管理启动和销毁的工作,也就说以后你有个什么代码要交给Spring管理的也是可以这样子处理。