четверг, марта 03, 2016

Understanding NIO2.0 Asynchronous I/O. EchoServer

Посмотрим внимательно на NIO2.0 асинхронную сокетную модель появившуюся еще в Java 7 и напишем полностью асинхронный сервер и клиент по типу request/reply.
Асинхронный IO для сокетного взаимодействия представлен следующими классами:
  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
Есть два подхода работы с ними. Используя:
  • Future
  • CompletionHandler
Использование Future предполагает следующие действия
  • Использование блокирующего вызова get(), который сразу вернет результат
    AsynchronousSocketChannel worker = future.get();
  • Проверка результата выполнения
    while(!future.isDone()) {
     // do something
            Thread.sleep(100);
    }
Работа с Future моделью в любом случае предполагает блокировку потока выполнения. Я вообще не вижу смысла в таком случае в использовании асинхронного IO. Какой в смысл, если в результате у нас все равно возникают блокировки, и при этом нужно будет выносить процессинг в отдельный поток. Такое может понадобиться только в самых простых случаях.

Очевидно что для полного асинхронного взаимодействия надо использовать CompletionHandler. При этом надо действовать аккуратно чтобы не свалиться calback hell.


Интерфейс CompletionHandler следующий:
interface CompletionHandler<V,A> {
 void completed(V result, A attachment);
 void failed(Throwable exc, A attachment);
}
V =  тип результирующего значения
A = тип объекта вложения при выполнении операции. Используется для того чтобы передавать контекст. Фактически это сессия текущего запроса.

Для реализации сервера нам в общем случае понадобится 3 реализации CompletionHandler

  • для обработки подключения клиента
    CompletionHandler<AsynchronousSocketChannel, ? super A>
  • для чтения запроса клиента
    CompletionHandler<Integer, ? super A>
  • для отправки ответа клиенту
    CompletionHandler<Integer, ? super A>
Для клиента аналогично
  • для подключения к серверу
    CompletionHandler<Void, ? super A>
  • для отправки запроса на сервер
    CompletionHandler<Integer, ? super A>
  • для чтения ответа серверу
    CompletionHandler<Integer, ? super A>
Итак вот шаблон реализации асинхронного сервера calback hell исполнении. Инициализацию и поднятие сервера опустим, ее вы можете увидеть в итоговой реализации.
listener.accept(state, new CompletionHandler<AsynchronousSocketChannel, SessionState>() {
 @Override
 public void completed(final AsynchronousSocketChannel socketChannel, SessionState attachment) {
  ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
  socketChannel.read(inputBuffer, attachment, new CompletionHandler<Integer, SessionState>(){
   @Override
   public void completed(Integer result, SessionState attachment) {
    ByteBuffer outputBuffer = ByteBuffer.allocate(1024);;
    socketChannel.write(outputBuffer, attachment,new CompletionHandler<Integer, SessionState>(){
     @Override
     public void completed(Integer result, SessionState attachment) {
      
     }
     @Override
     public void failed(Throwable exc, SessionState attachment) {
      
     }
    });
   }
   @Override
   public void failed(Throwable exc, SessionState attachment) {
    
   }
  });
 }
 @Override
 public void failed(Throwable exc, SessionState attachment) {
  
 }
});
Конечно код по такому шаблону неудобно сопровождать. Лучше всего разнести реализацию CompletionHandler в отдельные классы. Итак вот итоговый результат EchoServer. Исходный код https://github.com/Hibernate2009/nio-echoserver.git
EchoServer
package com.bssys.nio2.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.bssys.nio2.SessionState;

public class EchoServer {

 public void run() throws IOException {
  ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
  AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(newFixedThreadPool);

  final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
  InetSocketAddress address = new InetSocketAddress("localhost", 3333);
  listener.bind(address);

  AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener);

  SessionState state = new SessionState();
  listener.accept(state, acceptCompletionHandler);
  
 }

 public static void main(String[] args) throws IOException {
  System.out.println("Async server started");
  new EchoServer().run();
  try {
   Thread.currentThread().join();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
}
AcceptCompletionHandler
package com.bssys.nio2.server;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import com.bssys.nio2.SessionState;

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SessionState> {

 private AsynchronousServerSocketChannel listener;

 public AcceptCompletionHandler(AsynchronousServerSocketChannel listener) {
  this.listener = listener;
 }

 @Override
 public void completed(AsynchronousSocketChannel socketChannel, SessionState sessionState) {
  // accept the next connection
  SessionState newSessionState = new SessionState();
  listener.accept(newSessionState, this);

  // handle this connection
  ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
  ServerReadCompletionHandler readCompletionHandler = new ServerReadCompletionHandler(socketChannel, inputBuffer);
  socketChannel.read(inputBuffer, sessionState, readCompletionHandler);
 }

 @Override
 public void failed(Throwable exc, SessionState sessionState) {
  // Handle connection failure...
 }
}
ServerReadCompletionHandler
package com.bssys.nio2.server;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import com.bssys.nio2.SessionState;

public class ServerReadCompletionHandler implements CompletionHandler<Integer, SessionState> {

 private AsynchronousSocketChannel socketChannel;
 private ByteBuffer inputBuffer;

 public ServerReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer inputBuffer) {
  this.socketChannel = socketChannel;
  this.inputBuffer = inputBuffer;
 }

 @Override
 public void completed(Integer bytesRead, SessionState sessionState) {

  byte[] buffer = new byte[bytesRead];
  inputBuffer.rewind();

  inputBuffer.get(buffer);
  String message = new String(buffer);

  System.out.println("Received message from client : " + message);
  
  ServerWriteCompletionHandler writeCompletionHandler = new ServerWriteCompletionHandler(socketChannel);
  
  ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);
  socketChannel.write(outputBuffer, sessionState, writeCompletionHandler);
 }

 @Override
 public void failed(Throwable exc, SessionState attachment) {
  // Handle read failure.....
 }
}
ServerWriteCompletionHandler
package com.bssys.nio2.server;

import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import com.bssys.nio2.SessionState;

public class ServerWriteCompletionHandler implements CompletionHandler<Integer, SessionState> {

 private AsynchronousSocketChannel socketChannel;
 

 public ServerWriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
  this.socketChannel = socketChannel;
 }

 @Override
 public void completed(Integer bytesWritten, SessionState sessionState) {
  try {
   socketChannel.close();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }

 @Override
 public void failed(Throwable exc, SessionState attachment) {
  // Handle write failure.....
 }
}
Неплохая презентация по работе с буферами http://www.kgeorgiy.info/courses/java-advanced/slides/nio.xhtml#(1)

Комментариев нет: