пятница, марта 11, 2016

NIO2.0 Asynchronous I/O. WebSocket chat server

Для лучшего понимания NIO2.0 есть очень подходящая задача. А именно попробуем реализовать WebSocket chat server. В качестве клиента будет выступать браузер. Если кратко с помощью WebSocket можно создать двунаправленый канал связи, в частности между браузером и сервером. Клиент в общем может быть любым, главное чтобы он поддерживал нужный протокол.

При установлении соединения браузер и сервер обмениваются так называемым рукопожатием.
По сути клиент посылает обычный GET запрос примерного вида

GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: 127.0.0.1
Origin: null
Sec-WebSocket-Key: tR+4GeaXR7PuLsQtvlsMTw==
Sec-WebSocket-Version: 13
Sec-WebSocket-Extensions: x-webkit-deflate-frame
Если сервер поддерживает websocket ответит следующим образом
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: ELY6l2B3+S/6vjfrkzFMurz/hhQ=
Браузер при этом оставляет открытым соединение. Что позволяет отправлять и получать данные в любое время.
Важную часть при установлении рукопожатия составляет обмен ключами. Особой магии тут нет.

  • Клиент посылает ключ в заголовке Sec-WebSocket-Key закодированный в base64.
  • Серверу нужно взять этот ключ добавить "магическую строку" 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 и построить по итоговой строке SHA-1 хеш, и передать его в заголовке ответа Sec-WebSocket-Accept



После этого сервер и клиент могут обмениваться сообщениями в специфическом формате, так называемыми фреймами.

0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+

На самом деле полностью для нашего примера реализовывать парсинг не обязательно. На просторах интернета я нашел готовую реализацию парсера фреймов, которая нам подойдет. В любом случае в продакшен такую поделку тащить ни в коем случае нельзя. Для этого лучше подойдут уже готовые имплементации.(Jetty, Spring и тп).

Итак для реализации чата на websocket нам понадобятся следующие CompletedHandler

  • AcceptCompletionHandler - принимающий подключение от браузера
  • HandShakeReadCompletionHandler - читающий handshake от клиента
  • HandShakeWriteCompletionHandler - handshake ответ сервера 
  • ReadCompletionHandler - принимающий сообщение от сервера
  • WriteCompletionHandler - отправляющий сообщение клиентам
  • WebSocketServer - main класс 

WebSocketServer - запускает основной поток на прием подключенией через AcceptCompletionHandler, хранит все подключения клиентов и позволяет отправить сообщение всем подключеным клиентам.

package com.bssys.ws_nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WebSocketServer {

 private final List<AsynchronousSocketChannel> connections = Collections
   .synchronizedList(new ArrayList<AsynchronousSocketChannel>());

 public void run() throws IOException {
  ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
  AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(newFixedThreadPool);

  final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
  InetSocketAddress address = new InetSocketAddress("localhost", 3333);
  listener.bind(address);

  AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener, this);

  listener.accept(null, acceptCompletionHandler);
 }

 public void addClient(AsynchronousSocketChannel client) {
  connections.add(client);
 }

 public void removeClient(AsynchronousSocketChannel channel) {
  connections.remove(channel);
 }

 public void writeMessageToClients(AsynchronousSocketChannel channel, String message) {
  synchronized (connections) {
   for (AsynchronousSocketChannel clientConnection : connections) {
    if (clientConnection != channel) {
     try {
      byte[] doWrite = WriteFrame.doWrite(message);
      ByteBuffer outputBuffer = ByteBuffer.wrap(doWrite);
      SessionState sessionState = new SessionState();
      clientConnection.write(outputBuffer, sessionState, new WriteCompletionHandler());
     } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
   }
  }
 }

 public static void main(String[] args) throws IOException {
  System.out.println("WebSocket server start...");
  new WebSocketServer().run();
  try {
   Thread.currentThread().join();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
}
AcceptCompletionHandler - при успешном подключении клиента, запускает асинхронный обмен рукопожатием через HandShakeReadCompletionHandler

package com.bssys.ws_nio2;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {

 private AsynchronousServerSocketChannel listener;
 private WebSocketServer server;

 public AcceptCompletionHandler(AsynchronousServerSocketChannel listener, WebSocketServer server) {
  this.listener = listener;
  this.server = server;
 }

 public void completed(AsynchronousSocketChannel socketChannel, Void arg1) {
  System.out.println("client connected: " + socketChannel);

  listener.accept(null, this);

  ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
  HandShakeReadCompletionHandler readCompletionHandler = new HandShakeReadCompletionHandler(socketChannel,
    inputBuffer, server);
  SessionState sessionState = new SessionState();
  socketChannel.read(inputBuffer, sessionState, readCompletionHandler);
 }

 public void failed(Throwable arg0, Void arg1) {
 }
}

HandShakeReadCompletionHandler - читает переданное сообщение от клиента на установление рукопожатия. Генерирует ключ, подготавливает ответное сообщение handshake и оправляет его асинхронно с помощью HandShakeWriteCompletionHandler

package com.bssys.ws_nio2;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

public class HandShakeReadCompletionHandler implements CompletionHandler<Integer, SessionState> {

 private AsynchronousSocketChannel socketChannel;
 private ByteBuffer inputBuffer;
 private WebSocketServer server;

 public HandShakeReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer inputBuffer,
   WebSocketServer server) {
  this.socketChannel = socketChannel;
  this.inputBuffer = inputBuffer;
  this.server = server;
 }

 public void completed(Integer bytesRead, SessionState sessionState) {

  byte[] buffer = new byte[bytesRead];
  inputBuffer.rewind();

  inputBuffer.get(buffer);
  String message = new String(buffer);

  System.out.println("Received message from client : " + message);

  HttpRequest httpRequest = new HttpRequest(message);
  Map<String, String> headers = httpRequest.getHeaders();

  inputBuffer.clear();

  String websocketKey = headers.get("Sec-WebSocket-Key").trim();
  try {
   String websocketAcceptKey = KeyGeneration.getKey(websocketKey);
   String response = String.format("HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n"
     + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: %s\r\n\r\n", websocketAcceptKey);
   ByteBuffer wrap = ByteBuffer.wrap(response.getBytes());
   HandShakeWriteCompletionHandler handShakeWriteCompletionHandler = new HandShakeWriteCompletionHandler(
     socketChannel, server);
   socketChannel.write(wrap, sessionState, handShakeWriteCompletionHandler);

  } catch (NoSuchAlgorithmException e) {
   e.printStackTrace();
  }
 }

 public void failed(Throwable exc, SessionState sessionState) {
 }
}

HandShakeWriteCompletionHandler -при успешной отправке handshake, сохраняет поключение, и запускает асинхронное чтение канала клиента. через ReadCompletionHandler

package com.bssys.ws_nio2;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class HandShakeWriteCompletionHandler implements CompletionHandler<Integer, SessionState> {

 private AsynchronousSocketChannel socketChannel;
 private WebSocketServer server;

 public HandShakeWriteCompletionHandler(AsynchronousSocketChannel socketChannel, WebSocketServer server) {
  this.socketChannel = socketChannel;
  this.server = server;
 }

 public void completed(Integer result, SessionState sessionState) {
  server.addClient(socketChannel);
  ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
  socketChannel.read(inputBuffer, sessionState, new ReadCompletionHandler(socketChannel, inputBuffer, server));
 }

 public void failed(Throwable exc, SessionState sessionState) {
 }
}
ReadCompletionHandler - Читает сообщение, декодирует его, отправляет его всем подключеным клиентам, и снова запускает чтение из канала, используя самого себя в качестве CompletionHandler. Там образом организуется цикл чтения сообщения от клиента. Все работает асинхронно.

package com.bssys.ws_nio2;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class ReadCompletionHandler implements CompletionHandler<Integer, SessionState> {

 private AsynchronousSocketChannel channel;
 private ByteBuffer inputBuffer;
 private WebSocketServer server;

 public ReadCompletionHandler(AsynchronousSocketChannel channel, ByteBuffer inputBuffer, WebSocketServer server) {
  this.channel = channel;
  this.inputBuffer = inputBuffer;
  this.server = server;
 }

 public void completed(Integer bytesRead, SessionState session) {
  if (bytesRead < 1) {
   System.out.println("Closing connection to " + channel);
   server.removeClient(channel);
  } else {

   byte[] buffer = new byte[bytesRead];
   inputBuffer.rewind();
   inputBuffer.get(buffer);

   byte[] decodeFrame = RawParse.parse(buffer).payload;
   String message = new String(decodeFrame);

   System.out.println("Received message from " + ":" + message);
   inputBuffer.clear();

   channel.read(inputBuffer, session, this);

   server.writeMessageToClients(channel, message);
  }
 }

 public void failed(Throwable exc, SessionState attachment) {
  server.removeClient(channel);
 }
}


Весь код вместе с клиентом на js на gihub
https://github.com/Hibernate2009/nio-websocket-server.git

Ссылки которые мне очень помогли самому разобраться с websocket'ами.

https://websockets.wordpress.com/the-lightweight-websockets-java-server/
http://stackoverflow.com/questions/18368130/how-to-parse-and-validate-a-websocket-frame-in-java
http://learn.javascript.ru/websockets

Комментариев нет: