1. 首页
  2. >
  3. 编程技术
  4. >
  5. Java

基于netty的构建一个群聊系统

要求

1.群聊系统可以实现服务器端和客户端之间的数据简单通讯(非阻塞)

2.通过系统可以实现多人群聊

3.服务器端:可以监控用户上线,离线,并实现消息转发功能

4.客户端:通过channel可以无阻塞发送消息给其它所有用户,同时可以接受到其它用户发送的消息(由服务端转发得到)

代码实现(服务端)

 import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.*;  import java.util.Iterator;    public class GroupChatServer {      private Selector selector;      private ServerSocketChannel listenChannel;      private static final int PORT=6667;            public GroupChatServer(){          try{              //得到选择器              selector=Selector.open();              listenChannel=ServerSocketChannel.open();              //绑定端口              listenChannel.socket().bind(new InetSocketAddress(PORT));              //设置非阻塞模式              listenChannel.configureBlocking(false);              //将该ListenChannel注册到selector              listenChannel.register(selector, SelectionKey.OP_ACCEPT);            }catch (IOException e){              e.printStackTrace();          }      }        //读取客户端的消息      public void readData(SelectionKey key){          //定义一个SocketChannel          SocketChannel channel=null;          try{              channel = (SocketChannel) key.channel();              //创建buffer              ByteBuffer buffer= ByteBuffer.allocate(1024);              int count=channel.read(buffer);              //根据count的值可以知道有没有督导数据              if(count >0){                  //把缓冲区的数据转变为字符串                  String msg=new String(buffer.array());                  //输出该消息                  System.out.println("客户端"+msg);                  //向其他的客户端转发给其他的客户端                  sendInfoToOtherClients(msg,channel);              }          }catch (Exception e){              //一旦读取不到客户端的消息,那么表示这个客户端下线了              try{                  System.out.println(channel.getRemoteAddress()+"离线了");                  //取消注册                  key.channel();                  //关闭通道                  channel.close();              }catch(IOException ex){                  ex.printStackTrace();              }            }      }      //给所有,除了自己channel发送消息      public  void sendInfoToOtherClients(String msg,SocketChannel channel) throws IOException {          System.out.println("服务器消息发送中");          //遍历所有          for(SelectionKey key:selector.keys()){              Channel targetChannel=key.channel();              //排除自己              if(targetChannel instanceof SocketChannel && targetChannel!=channel){                  SocketChannel dest=(SocketChannel) targetChannel;                  ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());                    //写入到通道                  dest.write(buffer);              }          }      }      //监听      public void listen(){          try{              while(true){                  int count =selector.select(2000);                  if(count>0){//有事件处理                      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();                      while(iterator.hasNext()){                          SelectionKey  key= iterator.next();                            //判断不同的事件                          //监听到accept                          if(key.isAcceptable()) {                              SocketChannel sc=listenChannel.accept();                              sc.configureBlocking(false);                              //将该sc注册到Seletor                              sc.register(selector,SelectionKey.OP_READ);                              //提示某某上线                              System.out.println(sc.getRemoteAddress()+"上线");                          }                          //监听到发送过来数据                          if(key.isReadable()){                              //专门处理客户端的消息                              }                            //当前的key删除,防止重复处理                          iterator.remove();                      }                    }              }          }catch (Exception e){              e.printStackTrace();          }finally {            }      }        public static void main(String[] args) {          //创建一个服务器对象          GroupChatServer groupChatServer=new GroupChatServer();          groupChatServer.listen();      }  }  

代码实现(客户端)

 import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.SocketChannel;  import java.util.Iterator;  import java.util.Scanner;  import java.util.Set;    public class GroupChatClient {        //定义相关的属性      private final String HOST="127.0.0.1";      private final int POST=6667;      private Selector selector;      private SocketChannel socketChannel;      private String username;        public static void main(String[] args) throws Exception {          //启动我们的客户端          GroupChatClient chatClient=new GroupChatClient();          //启动一个线程          //每3秒读取从服务器发送数据          new Thread(){              public void run(){                  while(true){                      chatClient.readInfo();                      try{                          Thread.currentThread().sleep(3000);                      }catch (InterruptedException e){                          e.printStackTrace();                      }                  }              }          }.start();            Scanner scanner = new Scanner(System.in);            while(scanner.hasNextLine()){              String s=scanner.nextLine();              chatClient.sendInfo(s);          }      }        public GroupChatClient()throws Exception{          selector=Selector.open();          //连接服务器          socketChannel=socketChannel.open(new InetSocketAddress("127.0.0.1",POST));          //设置非阻塞          socketChannel.configureBlocking(false);          socketChannel.register(selector, SelectionKey.OP_READ);          username=socketChannel.getLocalAddress().toString().substring(1);          System.out.println(username+"is OK");      }        public void sendInfo(String info){          info=username+"说"+info;          try{              socketChannel.write(ByteBuffer.wrap(info.getBytes()));          }catch (IOException e){              e.printStackTrace();          }      }        //读取从服务器端发送过来的消息      public void readInfo(){          try{              int readChannels=selector.select();              if(readChannels >0){//有事件发生的通道                  Set<SelectionKey> selectionKeys = selector.selectedKeys();                  Iterator<SelectionKey> iterator = selectionKeys.iterator();                  while(iterator.hasNext()){                      SelectionKey key=iterator.next();                      if(key.isAcceptable()){                          SocketChannel sc= (SocketChannel) key.channel();                          ByteBuffer buffer=ByteBuffer.allocate(1024);                          //读取                          sc.read(buffer);                          String msg=new String(buffer.array());                          System.out.println(msg.trim());                          //删除当前的selectKey                          iterator.remove();                      }else{                        }                  }                }          }catch (Exception e){              e.printStackTrace();          }      }  }  

如上所示,客户端也使用了一个selector,这个selector只管理客户端自己,这样就可以通过它来监控了客户端的这个channel了。

这样就可以实现客户端和服务端的信息通信了。