Посмотрим внимательно на NIO2.0 асинхронную сокетную модель появившуюся еще в Java 7 и напишем полностью асинхронный сервер и клиент по типу request/reply.
Асинхронный IO для сокетного взаимодействия представлен следующими классами:
Очевидно что для полного асинхронного взаимодействия надо использовать CompletionHandler. При этом надо действовать аккуратно чтобы не свалиться calback hell.
Интерфейс CompletionHandler следующий:
A = тип объекта вложения при выполнении операции. Используется для того чтобы передавать контекст. Фактически это сессия текущего запроса.
Для реализации сервера нам в общем случае понадобится 3 реализации CompletionHandler
EchoServer
Асинхронный IO для сокетного взаимодействия представлен следующими классами:
- AsynchronousSocketChannel
- AsynchronousServerSocketChannel
- Future
- CompletionHandler
- Использование блокирующего вызова get(), который сразу вернет результат
AsynchronousSocketChannel worker = future.get();
- Проверка результата выполнения
while(!future.isDone()) { // do something Thread.sleep(100); }
Очевидно что для полного асинхронного взаимодействия надо использовать CompletionHandler. При этом надо действовать аккуратно чтобы не свалиться calback hell.
Интерфейс CompletionHandler следующий:
interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
}
V = тип результирующего значенияA = тип объекта вложения при выполнении операции. Используется для того чтобы передавать контекст. Фактически это сессия текущего запроса.
Для реализации сервера нам в общем случае понадобится 3 реализации CompletionHandler
- для обработки подключения клиента
CompletionHandler<AsynchronousSocketChannel, ? super A> - для чтения запроса клиента
CompletionHandler<Integer, ? super A> - для отправки ответа клиенту
CompletionHandler<Integer, ? super A>
- для подключения к серверу
CompletionHandler<Void, ? super A> - для отправки запроса на сервер
CompletionHandler<Integer, ? super A> - для чтения ответа серверу
CompletionHandler<Integer, ? super A>
listener.accept(state, new CompletionHandler<AsynchronousSocketChannel, SessionState>() {
@Override
public void completed(final AsynchronousSocketChannel socketChannel, SessionState attachment) {
ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
socketChannel.read(inputBuffer, attachment, new CompletionHandler<Integer, SessionState>(){
@Override
public void completed(Integer result, SessionState attachment) {
ByteBuffer outputBuffer = ByteBuffer.allocate(1024);;
socketChannel.write(outputBuffer, attachment,new CompletionHandler<Integer, SessionState>(){
@Override
public void completed(Integer result, SessionState attachment) {
}
@Override
public void failed(Throwable exc, SessionState attachment) {
}
});
}
@Override
public void failed(Throwable exc, SessionState attachment) {
}
});
}
@Override
public void failed(Throwable exc, SessionState attachment) {
}
});
Конечно код по такому шаблону неудобно сопровождать. Лучше всего разнести реализацию CompletionHandler в отдельные классы.
Итак вот итоговый результат EchoServer. Исходный код https://github.com/Hibernate2009/nio-echoserver.gitEchoServer
package com.bssys.nio2.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.bssys.nio2.SessionState;
public class EchoServer {
public void run() throws IOException {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(newFixedThreadPool);
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group);
InetSocketAddress address = new InetSocketAddress("localhost", 3333);
listener.bind(address);
AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener);
SessionState state = new SessionState();
listener.accept(state, acceptCompletionHandler);
}
public static void main(String[] args) throws IOException {
System.out.println("Async server started");
new EchoServer().run();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
AcceptCompletionHandler
package com.bssys.nio2.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.bssys.nio2.SessionState;
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SessionState> {
private AsynchronousServerSocketChannel listener;
public AcceptCompletionHandler(AsynchronousServerSocketChannel listener) {
this.listener = listener;
}
@Override
public void completed(AsynchronousSocketChannel socketChannel, SessionState sessionState) {
// accept the next connection
SessionState newSessionState = new SessionState();
listener.accept(newSessionState, this);
// handle this connection
ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
ServerReadCompletionHandler readCompletionHandler = new ServerReadCompletionHandler(socketChannel, inputBuffer);
socketChannel.read(inputBuffer, sessionState, readCompletionHandler);
}
@Override
public void failed(Throwable exc, SessionState sessionState) {
// Handle connection failure...
}
}
ServerReadCompletionHandler
package com.bssys.nio2.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.bssys.nio2.SessionState;
public class ServerReadCompletionHandler implements CompletionHandler<Integer, SessionState> {
private AsynchronousSocketChannel socketChannel;
private ByteBuffer inputBuffer;
public ServerReadCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer inputBuffer) {
this.socketChannel = socketChannel;
this.inputBuffer = inputBuffer;
}
@Override
public void completed(Integer bytesRead, SessionState sessionState) {
byte[] buffer = new byte[bytesRead];
inputBuffer.rewind();
inputBuffer.get(buffer);
String message = new String(buffer);
System.out.println("Received message from client : " + message);
ServerWriteCompletionHandler writeCompletionHandler = new ServerWriteCompletionHandler(socketChannel);
ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);
socketChannel.write(outputBuffer, sessionState, writeCompletionHandler);
}
@Override
public void failed(Throwable exc, SessionState attachment) {
// Handle read failure.....
}
}
ServerWriteCompletionHandler
package com.bssys.nio2.server;
import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.bssys.nio2.SessionState;
public class ServerWriteCompletionHandler implements CompletionHandler<Integer, SessionState> {
private AsynchronousSocketChannel socketChannel;
public ServerWriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void completed(Integer bytesWritten, SessionState sessionState) {
try {
socketChannel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, SessionState attachment) {
// Handle write failure.....
}
}
Неплохая презентация по работе с буферами http://www.kgeorgiy.info/courses/java-advanced/slides/nio.xhtml#(1)
Комментариев нет:
Отправить комментарий