понедельник, апреля 28, 2014

ActiveMQ Master/Slave and load balancer cluster

Пришло время объединить все технологии кластеризации ActiveMQ вместе, и продемонстрировать систему устойчивую к сбоям и позволяющую распределять нагрузку. Для примера построим архитектуру кластера на базе 8 инстансов ActiveMQ. Схема выглядит следующим образом.

Рассмотрим детально:

  • Инстансы связаные общим хранилищем Master/Slave 1-2, 3-4, 5-6, 7-8. По идее каждая пара разворачивается на отдельной машине. Для возможности развертывания на отдельной машине, все инстансы разнесены по портам.
  • С помощью Network Broker каждая пара с связана с каждой парой.
  • Для каждой пары на этом же хосте, с помощью Apache Camel поднимается HTTP точка, принимающая запросы от внешнего балансировщика и прокидывающая запрос в очередь.
  • Все инстансы из каждой группы могут делиться сообщения с другими группами
  • Отдельные клиенты разворачиваются каждый на своем хосте и читают сообщения и очереди.
  • Балансировщик для простоты тоже реализован с помощью Apache Camel
Ниже можно взглянуть на настройки каждого инстанса и исходный код.



instance1:
<networkConnectors>
 <networkConnector name="instance1-instance3-instance4" uri="masterslave:(tcp://localhost:61619,tcp://localhost:61620)" />
 <networkConnector name="instance1-instance5-instance6" uri="masterslave:(tcp://localhost:61621,tcp://localhost:61622)" />
 <networkConnector name="instance1-instance7-instance8" uri="masterslave:(tcp://localhost:61623,tcp://localhost:61624)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb1" />
</persistenceAdapter>
instance2:
<networkConnectors>
 <networkConnector name="instance2-instance3-instance4" uri="masterslave:(tcp://localhost:61619,tcp://localhost:61620)" />
 <networkConnector name="instance2-instance5-instance6" uri="masterslave:(tcp://localhost:61621,tcp://localhost:61622)" />
 <networkConnector name="instance2-instance7-instance8" uri="masterslave:(tcp://localhost:61623,tcp://localhost:61624)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb1" />
</persistenceAdapter>
instance3:
<networkConnectors>
 <networkConnector name="instance3-instance1-instance2" uri="masterslave:(tcp://localhost:61617,tcp://localhost:61618)" />
 <networkConnector name="instance3-instance5-instance6" uri="masterslave:(tcp://localhost:61621,tcp://localhost:61622)" />
 <networkConnector name="instance3-instance7-instance8" uri="masterslave:(tcp://localhost:61623,tcp://localhost:61624)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb2" />
</persistenceAdapter>

instance4:
<networkConnectors>
 <networkConnector name="instance4-instance1-instance2" uri="masterslave:(tcp://localhost:61617,tcp://localhost:61618)" />
 <networkConnector name="instance4-instance5-instance6" uri="masterslave:(tcp://localhost:61621,tcp://localhost:61622)" />
 <networkConnector name="instance4-instance7-instance8" uri="masterslave:(tcp://localhost:61623,tcp://localhost:61624)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb2" />
</persistenceAdapter>

instance5:
<networkConnectors>
 <networkConnector name="instance5-instance1-instance2" uri="masterslave:(tcp://localhost:61617,tcp://localhost:61618)" />
 <networkConnector name="instance5-instance3-instance4" uri="masterslave:(tcp://localhost:61619,tcp://localhost:61620)" />
 <networkConnector name="instance5-instance7-instance8" uri="masterslave:(tcp://localhost:61623,tcp://localhost:61624)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb3" />
</persistenceAdapter>

instance6:
<networkConnectors>
 <networkConnector name="instance6-instance1-instance2" uri="masterslave:(tcp://localhost:61617,tcp://localhost:61618)" />
 <networkConnector name="instance6-instance3-instance4" uri="masterslave:(tcp://localhost:61619,tcp://localhost:61620)" />
 <networkConnector name="instance6-instance7-instance8" uri="masterslave:(tcp://localhost:61623,tcp://localhost:61624)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb3" />
</persistenceAdapter>

instance7:
<networkConnectors>
 <networkConnector name="instance7-instance1-instance2" uri="masterslave:(tcp://localhost:61617,tcp://localhost:61618)" />
 <networkConnector name="instance7-instance3-instance4" uri="masterslave:(tcp://localhost:61619,tcp://localhost:61620)" />
 <networkConnector name="instance7-instance5-instance6" uri="masterslave:(tcp://localhost:61621,tcp://localhost:61622)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb4" />
</persistenceAdapter>

instance8:
<networkConnectors>
 <networkConnector name="instance8-instance1-instance2" uri="masterslave:(tcp://localhost:61617,tcp://localhost:61618)" />
 <networkConnector name="instance8-instance3-instance4" uri="masterslave:(tcp://localhost:61619,tcp://localhost:61620)" />
 <networkConnector name="instance8-instance5-instance6" uri="masterslave:(tcp://localhost:61621,tcp://localhost:61622)" />
</networkConnectors>
<persistenceAdapter>
 <kahaDB directory="${activemq.data}/../../../kahadb4" />
</persistenceAdapter>

Настройки ресурсов для клиента, я объединил в один модуль. В реальном случае настройки будут разнесены.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">

 <osgi:reference id="platformTransactionManager"
  interface="org.springframework.transaction.PlatformTransactionManager" />


 <bean id="instanceConnectionFactory1" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL"
   value="failover://(tcp://localhost:61617,tcp://localhost:61618)?initialReconnectDelay=100" />
  <property name="userName" value="karaf" />
  <property name="password" value="karaf" />
  <property name="redeliveryPolicy">
   <bean class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="0" />
   </bean>
  </property>
 </bean>

 <bean id="instanceConnectionFactory2" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL"
   value="failover://(tcp://localhost:61619,tcp://localhost:61620)?initialReconnectDelay=100" />
  <property name="userName" value="karaf" />
  <property name="password" value="karaf" />
  <property name="redeliveryPolicy">
   <bean class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="0" />
   </bean>
  </property>
 </bean>

 <bean id="instanceConnectionFactory3" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL"
   value="failover://(tcp://localhost:61621,tcp://localhost:61622)?initialReconnectDelay=100" />
  <property name="userName" value="karaf" />
  <property name="password" value="karaf" />
  <property name="redeliveryPolicy">
   <bean class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="0" />
   </bean>
  </property>
 </bean>

 <bean id="instanceConnectionFactory4" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL"
   value="failover://(tcp://localhost:61623,tcp://localhost:61624)?initialReconnectDelay=100" />
  <property name="userName" value="karaf" />
  <property name="password" value="karaf" />
  <property name="redeliveryPolicy">
   <bean class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="0" />
   </bean>
  </property>
 </bean>

 <bean id="jmsConfig1"
  class="org.apache.activemq.camel.component.ActiveMQConfiguration">
  <property name="connectionFactory" ref="instanceConnectionFactory1" />
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="transacted" value="true" />
 </bean>

 <bean id="jmsConfig2"
  class="org.apache.activemq.camel.component.ActiveMQConfiguration">
  <property name="connectionFactory" ref="instanceConnectionFactory2" />
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="transacted" value="true" />
 </bean>

 <bean id="jmsConfig3"
  class="org.apache.activemq.camel.component.ActiveMQConfiguration">
  <property name="connectionFactory" ref="instanceConnectionFactory3" />
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="transacted" value="true" />
 </bean>

 <bean id="jmsConfig4"
  class="org.apache.activemq.camel.component.ActiveMQConfiguration">
  <property name="connectionFactory" ref="instanceConnectionFactory4" />
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="transacted" value="true" />
 </bean>

 <bean id="instance1" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <constructor-arg ref="jmsConfig1" />
 </bean>
 <bean id="instance2" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <constructor-arg ref="jmsConfig2" />
 </bean>
 <bean id="instance3" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <constructor-arg ref="jmsConfig3" />
 </bean>
 <bean id="instance4" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <constructor-arg ref="jmsConfig4" />
 </bean>

 <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
 </bean>
 <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW" />
 </bean>

</beans>

Непосредственно код клиента на Apache Camel
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
 xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
   <from uri="jetty:http://localhost:8191/myapp/myservice" />
   <to uri="instance1:queue:notify" />
  </route>
  <route>
   <from uri="jetty:http://localhost:8192/myapp/myservice" />
   <to uri="instance2:queue:notify" />
  </route>
  <route>
   <from uri="jetty:http://localhost:8193/myapp/myservice" />
   <to uri="instance3:queue:notify" />
  </route>
  <route>
   <from uri="jetty:http://localhost:8194/myapp/myservice" />
   <to uri="instance4:queue:notify" />
  </route>
  
  <route>
   <from uri="instance1:queue:notify" />
   <log message="instance1:${body}" />
   <to uri="file:C:/data/kraken/instance1" />
  </route>
  <route>
   <from uri="instance2:queue:notify" />
   <log message="instance2:${body}" />
   <to uri="file:C:/data/kraken/instance2" />
  </route>
  <route>
   <from uri="instance3:queue:notify" />
   <log message="instance3:${body}" />
   <to uri="file:C:/data/kraken/instance3" />
  </route>
  <route>
   <from uri="instance4:queue:notify" />
   <log message="instance4:${body}" />
   <to uri="file:C:/data/kraken/instance4" />
  </route>
 </camelContext>
</beans>

И код балансировщика, который отправляет 1000 сообщений, также на Apache Camel.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
 xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <bean id="myDataSet" class="org.apache.camel.component.dataset.SimpleDataSet">
  <property name="size" value="1000" />
 </bean>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
   <from uri="dataset:myDataSet?produceDelay=-1" />
   <loadBalance>
    <roundRobin/>
    <to uri="http://localhost:8191/myapp/myservice" />
    <to uri="http://localhost:8192/myapp/myservice" />
    <to uri="http://localhost:8193/myapp/myservice" />
    <to uri="http://localhost:8194/myapp/myservice" />
   </loadBalance>
  </route>
 </camelContext>
 
</beans>

Комментариев нет: