вторник, апреля 26, 2016

ServiceMix Atomikos XA transaction JDBC JMS

Снова вернемся к важному вопросу о XA транзакциях. Рассмотрим реализацию XA транзакций с использованием популярного менеджера распределенных транзакций Atomikos. Это широко используемый диспетчер транзакций JTA с открытым исходным кодом, который предназначен для сред отличных от J2EE. Это как раз наш случай.
В данном примере будет представлена готовая конфигурация транзакций.
Приложение разделено на модули, которые разделяют настройку ресурсов, от бизнес логики. Фактически вы можете установить модули в ServiceMix и заняться разработкой, не разгребая как во многих других примерах системную и бизнес логику.
Итак модули:
  1. ts - поднимает транзакционные артефакты JtaTransactionManager, UserTransactionManager, UserTransactionImp
  2. ds - поднимает транзакционный источник данных на примере БД Oracle OracleXADataSource, AtomikosDataSourceBean
  3. amq - поднимает транзакционные артефакты для работы с JMS на примере ActiveMQ, ActiveMQXAConnectionFactory, AtomikosConnectionFactoryBean
  4. testXA - приложение для ServiceMix демонстрируещее работающие транзакции 
Контекст модуля ts определяет два бина txMgr и userTransaction. Классы реализации предоставляются Atomikos и реализуют интерфейсы TransactionManager и UserTransaction из стандарта J2EE. После этого определяется бин jtaTxMgr с классом реализации JtaTransactionManager и внедряются два бина транзакций предоставляемые Atomikos. Это информирует Spring о необходимости использования Atomikos JTA для управления транзакциями. Далее сконфигурированный менеджер транзакций выставляется как osgi-сервис, чтобы позволить другим модулям его переиспользовать.



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

   <bean id="jtaTxMgr" class="org.springframework.transaction.jta.JtaTransactionManager">
  <property name="transactionManager" ref="txMgr" />
  <property name="userTransaction" ref="userTransaction" />
 </bean>

 <bean id="txMgr" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
  <property name="forceShutdown" value="false" />
 </bean>

 <bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
  <property name="transactionTimeout" value="120" />
 </bean>
 
 <osgi:service interface="org.springframework.transaction.PlatformTransactionManager" ref="jtaTxMgr">
  <osgi:service-properties>
   <entry key="transaction.manager.name" value="atomikos" />
  </osgi:service-properties>
 </osgi:service>
</beans>

Контекст модуля ds определяет источник данных OracleXADataSource на примере БД Oracle , с использованием AtomikosDataSourceBean, который поддерживает XA источники данных. Он также предоставляет пул соединений. OracleXADataSource несмотря на название не является пулом, и при работе каждый раз поднимает соединение с базой. Не знаю почему во многих примерах его используют напрямую. При большой нагрузке его использование будет "безбожно жрать коннекты". Сконфигурированный источник данных выставляется как osgi-сервис. Обратите внимание что интерфейс для сервиса определяется как javax.sql.DataSource, что позволит в дальнейшем использовать удобный JdbcTemplate.


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <bean id="atomikosXaDataSourceBean" class="com.atomikos.jdbc.AtomikosDataSourceBean">
  <property name="uniqueResourceName" value="oracleXA" />
  <property name="testQuery" value="select * from dual" />
  <property name="xaDataSource" ref="dataSourceXA" />
  <property name="minPoolSize" value="1" />
  <property name="maxPoolSize" value="5" />
  <property name="reapTimeout" value="0" />
 </bean>

 <bean id="dataSourceXA" class="oracle.jdbc.xa.client.OracleXADataSource">
  <property name="URL" value="jdbc:oracle:thin:@host:1521:sid" />
  <property name="user" value="123" />
  <property name="password" value="123" />
  <property name="connectionCachingEnabled" value="false" />
 </bean>

 <osgi:service interface="javax.sql.DataSource" ref="atomikosXaDataSourceBean">
  <osgi:service-properties>
   <entry key="osgi.jndi.service.name" value="jdbc/oracleXA" />
   <entry key="datasource.name" value="oracleXA" />
  </osgi:service-properties>
 </osgi:service>
</beans>
Аналогичным образом определяем нужные бины для работы с ActiveMQ с помощью AtomikosConnectionFactoryBean

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="atomikosXaConnectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
  init-method="init" destroy-method="close">
  <property name="uniqueResourceName" value="amq1" />
  <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
  <property name="localTransactionMode" value="false" />
  <property name="minPoolSize" value="1" />
  <property name="maxPoolSize" value="5" />
  <property name="reapTimeout" value="0" />
 </bean>

 <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
  <property name="brokerURL" value="failover:(tcp://localhost:61616)" />
  <property name="redeliveryPolicy">
   <bean class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="1" />
   </bean>
  </property>
  <property name="userName" value="smx" />
  <property name="password" value="smx" />
 </bean>
 
 <osgi:service interface="javax.jms.ConnectionFactory" ref="atomikosXaConnectionFactoryBean">
  <osgi:service-properties>
   <entry key="osgi.jndi.service.name" value="jms/connectionFactory" />
   <entry key="connection.factory.name" value="activemqXA" />
  </osgi:service-properties>
 </osgi:service>
</beans>

Пример демонстрирующий использование транзакций разделен на два контекста.
system-context.xml получающий по ссылке нужные системные артефакты для работы с транзакциями и camel-context.xml содержащий бизнес логику

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <osgi:reference id="jtaTxMgr" interface="org.springframework.transaction.PlatformTransactionManager" filter="(transaction.manager.name=atomikos)"/>
 <osgi:reference id="atomikosXaDataSourceBean" interface="javax.sql.DataSource" filter="(datasource.name=oracleXA)"/>
 <osgi:reference id="atomikosXaConnectionFactoryBean" interface="javax.jms.ConnectionFactory" filter="(connection.factory.name=activemqXA)"/>
 
 <bean id="REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jtaTxMgr" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
 </bean>

 <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="atomikosXaConnectionFactoryBean" />
  <property name="transacted" value="true" />
  <property name="transactionManager" ref="jtaTxMgr" />
  <property name="maxConcurrentConsumers" value="1" />
  <property name="cacheLevelName" value="CACHE_NONE" />
 </bean>

 <bean id="activemqXA" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig" />
 </bean>
 
 <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="atomikosXaDataSourceBean"/>
    </bean>
</beans>

Бизнес логика следующая:
  1. Получить jms сообщение
  2. Выгрузить сообщение в промежуточную очередь
  3. Выполнить обновление БД (успешное)
  4. Выполнить обновление БД приводящее к исключению
  5. Выгрузить сообщение в итоговую очередь
Как вы можете убедиться, запустив этот пример, что сообщение в результате обработки данным роутом будет выгружено в очередь ошибок, при этом ни в одном промежуточном ресурсе данные не сохранятся. Это то что там надо.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
 
 <bean id="hello" class="com.bssys.nio2.testXA.HelloBean">
  <constructor-arg ref="jdbcTemplate"/>
 </bean>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
   <from uri="activemqXA:queue:SQL_IN" />
   <transacted  ref="REQUIRED"/>
   <to uri="activemqXA:queue:SQL_MIDLE" />
   <bean ref="hello" method="goodlogic"/>
   <log message="REDELIVERED: ${header.JMSRedelivered}" />
   <bean ref="hello" method="badlogic"/>
   <to uri="activemqXA:queue:SQL_OUT" />
  </route>
 </camelContext>

</beans>

Чтобы сообщение выгружалось в очередь DLQ надо настроить немного ActiveMQ прописав в activemq.xml следующее

<policyEntry queue=">">
          <deadLetterStrategy>
            <!--
              Use the prefix 'DLQ.' for the destination name, and make
              the DLQ a queue rather than a topic
            -->
            <individualDeadLetterStrategy
              queuePrefix="DLQ." useQueueForQueueMessages="true" />
          </deadLetterStrategy>
 </policyEntry>


Таким образом для каждом очереди будет создаваться своя очередь DLQ. Очень удобно в дальнейшем разгребать конфликты. Ну и зависимости для Servicemix'а для того чтобы модули взлетели. Предварительно надо соответствующим образом установить драйвер для Oracle, которого нет в центральном репозитории Maven. Будут вопросы пишите в коментариях, я отвечу.

<?xml version="1.0" encoding="UTF-8"?>
<features xmlns="http://karaf.apache.org/xmlns/features/v1.0.0"
 name="egg">
 <feature name="egg-features" version="1.0">
  <feature version="3.2.14.RELEASE_1">spring-jdbc</feature>
  <feature>camel-sql</feature>
  <feature>camel-jdbc</feature>
  <feature>jndi</feature>
  <feature>war</feature>
  <feature>activemq</feature>
  <feature>activemq-web-console</feature>
  <feature>activemq-broker</feature>
  <feature>transaction</feature>
  <bundle>wrap:mvn:com.oracle/ojdbc6/12.1.0.2.0</bundle>
  <bundle>wrap:mvn:commons-dbcp/commons-dbcp/1.4</bundle>
  <bundle>mvn:com.atomikos/transactions-osgi/4.0.2</bundle>
 </feature>
</features>
Исходники на GitHib https://github.com/Hibernate2009/servicemix-atomikos-xa-jms-jdbc.git

Комментариев нет: