четверг, февраля 25, 2016

Client notification using ZeroMQ

Итак нам понадобилось реализовать механизм рассылки сообщений. Дополнительных требований таких как гарантированная доставка не предъявляется. Фактически нам подойдет примитивное сокетное взаимодействие.
На практике обычно стоит обратная задача, когда надо реализовать сервер выдерживающий большую нагрузку. Есть множество решений как ее решать, используя традиционные блокирующие сокеты, или что будет лучшим выбором, используя асинхронное взаимодействие NIO в экосистеме java. Например можно взять готовый клиент-серверный фремворк netty.
У нас же стоит обратная задача, сервер должен уведомлять большое количество клиентов.
В enterpise системах, для этой цели обычно используются сиcтемы MQ. Но в нашем pet-project можно использовать кое-что поинтереснее.


Используя для нашей задачи традиционные сокеты, нам бы потребовалось заводить в отдельный поток каждый сокет клиента, и при наступлении события перебирать их в цикле, решая в том числе проблемы с дисконектами. А сколько при этом потребуется ресурсов, если клиентов будет хотя бы больше 1000.
И вот для этого нам идеально подойдет библиотека ZeroMQ.
Подробно рассказывать о ней я не буду, информации в интернетах достаточно. Отмечу просто что кроме традиционного взаимодействия REQUEST/REPLY библиотека предоставляет возможность взаимодействия по типу PUBLISH/SUBSCRIBE.

Это то что нам нужно.
Итак архитектура механизма доставки следующая:

  • PubServer.java - сервер доставки получает сообщение из ActiveMQ в примитивном формате из которого получает имя топик и само сообщение. И собственно отправляет его в топик нужному клиенту.
  • SubClient.java- принимает имя топика из которого будет получать сообщения устанавливает соединение с сервером и подписывается на топик.

Реализация ZeroMQ предоставлена официально поддерживамой библиотекой jeromq для java.

package com.bssys.zeromq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class PubServer {

 private Socket publisher = null;

 public PubServer() throws Exception {
  initJMSConnection();
  initZMQ();
 }

 private void initZMQ() {
  Context context = ZMQ.context(1);
  publisher = context.socket(ZMQ.PUB);
  publisher.bind("tcp://*:5563");

  Runtime.getRuntime().addShutdownHook(new Thread() {
   public void run() {
    System.out.println("ShutDown");
    if (publisher != null) {
     publisher.close();
    }
    if (context != null) {
     context.term();
    }
   }
  });
 }

 private void initJMSConnection() {
  Connection connection = null;
  Session session = null;
  MessageConsumer consumer = null;
  try {
   ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
   connection = connectionFactory.createConnection();
   connection.start();
   session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Destination adminQueue = session.createQueue("TEST_QUEUE");
   consumer = session.createConsumer(adminQueue);
   for (int i = 0; i <= 10; i++) {
    consumer.setMessageListener(new ConsumerMessageListener());
   }

  } catch (Exception e) {
   if (consumer != null) {
    try {
     consumer.close();
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   if (session != null) {
    try {
     session.close();
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   if (connection != null) {
    try {
     connection.close();
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw new IllegalStateException("JMS connection fail");
  }
 }

 public synchronized void send(String topic, String msg) {
  publisher.sendMore(topic);
  publisher.send(msg);
 }

 class ConsumerMessageListener implements MessageListener {

  @Override
  public void onMessage(Message jmsMessage) {
   System.out.println(Thread.currentThread().getName()+" recive message");
   if (jmsMessage instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) jmsMessage;
    try {
     String text = textMessage.getText();
     String[] split = text.split(":");
     if (split != null && split.length > 1) {
      String topic = split[0];
      String msg = split[1];
      send(topic, msg);
     }
    } catch (JMSException e) {
     e.printStackTrace();
    }
   }
  }
 }

 public static void main(String arg[]) {
  try {
   new PubServer();
  } catch (Exception e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}
package com.bssys.zeromq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class SubClient implements Runnable {

 private Context context;
 private Socket subscriber;
 
 private String topic;

 public SubClient(String topic) {
  this.topic = topic;
  context = ZMQ.context(1);
  subscriber = context.socket(ZMQ.SUB);
  
  ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
  newCachedThreadPool.submit(this);
  
  Runtime.getRuntime().addShutdownHook(new Thread() {
   public void run() {
    System.out.println("ShutDown");
    subscriber.close();
    context.term();
   }
  });

 }

 @Override
 public void run() {
  subscriber.connect("tcp://localhost:5563");
  subscriber.subscribe(topic.getBytes());
  while (!Thread.currentThread().isInterrupted()) {
   // Read envelope with address
   String address = subscriber.recvStr();
   // Read message contents
   String contents = subscriber.recvStr();
   System.out.println(address + " : " + contents);
  }
 }
 
 public static void main(String args[]){
  if (args!=null && args.length>0){
   new SubClient(args[0]);
  }else {
   System.out.println("java SubClient A");
  }
 }

}

Комментариев нет: