Снова вернемся к важному вопросу о XA транзакциях. Рассмотрим реализацию XA транзакций с использованием популярного менеджера распределенных транзакций Atomikos. Это широко используемый диспетчер транзакций JTA с открытым исходным кодом, который предназначен для сред отличных от J2EE. Это как раз наш случай.
В данном примере будет представлена готовая конфигурация транзакций.
Приложение разделено на модули, которые разделяют настройку ресурсов, от бизнес логики. Фактически вы можете установить модули в ServiceMix и заняться разработкой, не разгребая как во многих других примерах системную и бизнес логику.
Итак модули:
Контекст модуля ds определяет источник данных OracleXADataSource на примере БД Oracle , с использованием AtomikosDataSourceBean, который поддерживает XA источники данных. Он также предоставляет пул соединений. OracleXADataSource несмотря на название не является пулом, и при работе каждый раз поднимает соединение с базой. Не знаю почему во многих примерах его используют напрямую. При большой нагрузке его использование будет "безбожно жрать коннекты". Сконфигурированный источник данных выставляется как osgi-сервис. Обратите внимание что интерфейс для сервиса определяется как javax.sql.DataSource, что позволит в дальнейшем использовать удобный JdbcTemplate.
Пример демонстрирующий использование транзакций разделен на два контекста.
system-context.xml получающий по ссылке нужные системные артефакты для работы с транзакциями и camel-context.xml содержащий бизнес логику
Бизнес логика следующая:
Чтобы сообщение выгружалось в очередь DLQ надо настроить немного ActiveMQ прописав в activemq.xml следующее
В данном примере будет представлена готовая конфигурация транзакций.
Приложение разделено на модули, которые разделяют настройку ресурсов, от бизнес логики. Фактически вы можете установить модули в ServiceMix и заняться разработкой, не разгребая как во многих других примерах системную и бизнес логику.
Итак модули:
- ts - поднимает транзакционные артефакты JtaTransactionManager, UserTransactionManager, UserTransactionImp
- ds - поднимает транзакционный источник данных на примере БД Oracle OracleXADataSource, AtomikosDataSourceBean
- amq - поднимает транзакционные артефакты для работы с JMS на примере ActiveMQ, ActiveMQXAConnectionFactory, AtomikosConnectionFactoryBean
- testXA - приложение для ServiceMix демонстрируещее работающие транзакции
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jtaTxMgr" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="txMgr" />
<property name="userTransaction" ref="userTransaction" />
</bean>
<bean id="txMgr" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown" value="false" />
</bean>
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="120" />
</bean>
<osgi:service interface="org.springframework.transaction.PlatformTransactionManager" ref="jtaTxMgr">
<osgi:service-properties>
<entry key="transaction.manager.name" value="atomikos" />
</osgi:service-properties>
</osgi:service>
</beans>
Контекст модуля ds определяет источник данных OracleXADataSource на примере БД Oracle , с использованием AtomikosDataSourceBean, который поддерживает XA источники данных. Он также предоставляет пул соединений. OracleXADataSource несмотря на название не является пулом, и при работе каждый раз поднимает соединение с базой. Не знаю почему во многих примерах его используют напрямую. При большой нагрузке его использование будет "безбожно жрать коннекты". Сконфигурированный источник данных выставляется как osgi-сервис. Обратите внимание что интерфейс для сервиса определяется как javax.sql.DataSource, что позволит в дальнейшем использовать удобный JdbcTemplate.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="atomikosXaDataSourceBean" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="oracleXA" />
<property name="testQuery" value="select * from dual" />
<property name="xaDataSource" ref="dataSourceXA" />
<property name="minPoolSize" value="1" />
<property name="maxPoolSize" value="5" />
<property name="reapTimeout" value="0" />
</bean>
<bean id="dataSourceXA" class="oracle.jdbc.xa.client.OracleXADataSource">
<property name="URL" value="jdbc:oracle:thin:@host:1521:sid" />
<property name="user" value="123" />
<property name="password" value="123" />
<property name="connectionCachingEnabled" value="false" />
</bean>
<osgi:service interface="javax.sql.DataSource" ref="atomikosXaDataSourceBean">
<osgi:service-properties>
<entry key="osgi.jndi.service.name" value="jdbc/oracleXA" />
<entry key="datasource.name" value="oracleXA" />
</osgi:service-properties>
</osgi:service>
</beans>
Аналогичным образом определяем нужные бины для работы с ActiveMQ с помощью AtomikosConnectionFactoryBean<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="atomikosXaConnectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="amq1" />
<property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
<property name="localTransactionMode" value="false" />
<property name="minPoolSize" value="1" />
<property name="maxPoolSize" value="5" />
<property name="reapTimeout" value="0" />
</bean>
<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616)" />
<property name="redeliveryPolicy">
<bean class="org.apache.activemq.RedeliveryPolicy">
<property name="maximumRedeliveries" value="1" />
</bean>
</property>
<property name="userName" value="smx" />
<property name="password" value="smx" />
</bean>
<osgi:service interface="javax.jms.ConnectionFactory" ref="atomikosXaConnectionFactoryBean">
<osgi:service-properties>
<entry key="osgi.jndi.service.name" value="jms/connectionFactory" />
<entry key="connection.factory.name" value="activemqXA" />
</osgi:service-properties>
</osgi:service>
</beans>
Пример демонстрирующий использование транзакций разделен на два контекста.
system-context.xml получающий по ссылке нужные системные артефакты для работы с транзакциями и camel-context.xml содержащий бизнес логику
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<osgi:reference id="jtaTxMgr" interface="org.springframework.transaction.PlatformTransactionManager" filter="(transaction.manager.name=atomikos)"/>
<osgi:reference id="atomikosXaDataSourceBean" interface="javax.sql.DataSource" filter="(datasource.name=oracleXA)"/>
<osgi:reference id="atomikosXaConnectionFactoryBean" interface="javax.jms.ConnectionFactory" filter="(connection.factory.name=activemqXA)"/>
<bean id="REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jtaTxMgr" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="atomikosXaConnectionFactoryBean" />
<property name="transacted" value="true" />
<property name="transactionManager" ref="jtaTxMgr" />
<property name="maxConcurrentConsumers" value="1" />
<property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="activemqXA" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="atomikosXaDataSourceBean"/>
</bean>
</beans>
Бизнес логика следующая:
- Получить jms сообщение
- Выгрузить сообщение в промежуточную очередь
- Выполнить обновление БД (успешное)
- Выполнить обновление БД приводящее к исключению
- Выгрузить сообщение в итоговую очередь
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="hello" class="com.bssys.nio2.testXA.HelloBean">
<constructor-arg ref="jdbcTemplate"/>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="activemqXA:queue:SQL_IN" />
<transacted ref="REQUIRED"/>
<to uri="activemqXA:queue:SQL_MIDLE" />
<bean ref="hello" method="goodlogic"/>
<log message="REDELIVERED: ${header.JMSRedelivered}" />
<bean ref="hello" method="badlogic"/>
<to uri="activemqXA:queue:SQL_OUT" />
</route>
</camelContext>
</beans>
Чтобы сообщение выгружалось в очередь DLQ надо настроить немного ActiveMQ прописав в activemq.xml следующее
<policyEntry queue=">">
<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy
queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
Таким образом для каждом очереди будет создаваться своя очередь DLQ. Очень удобно в дальнейшем разгребать конфликты. Ну и зависимости для Servicemix'а для того чтобы модули взлетели. Предварительно надо соответствующим образом установить драйвер для Oracle, которого нет в центральном репозитории Maven. Будут вопросы пишите в коментариях, я отвечу.<?xml version="1.0" encoding="UTF-8"?>
<features xmlns="http://karaf.apache.org/xmlns/features/v1.0.0"
name="egg">
<feature name="egg-features" version="1.0">
<feature version="3.2.14.RELEASE_1">spring-jdbc</feature>
<feature>camel-sql</feature>
<feature>camel-jdbc</feature>
<feature>jndi</feature>
<feature>war</feature>
<feature>activemq</feature>
<feature>activemq-web-console</feature>
<feature>activemq-broker</feature>
<feature>transaction</feature>
<bundle>wrap:mvn:com.oracle/ojdbc6/12.1.0.2.0</bundle>
<bundle>wrap:mvn:commons-dbcp/commons-dbcp/1.4</bundle>
<bundle>mvn:com.atomikos/transactions-osgi/4.0.2</bundle>
</feature>
</features>
Исходники на GitHib https://github.com/Hibernate2009/servicemix-atomikos-xa-jms-jdbc.git
Комментариев нет:
Отправить комментарий