четверг, января 17, 2013

Apache Camel on ServiceMix XA Transaction with database and JMS

Перейдем к одной из самых серьезных задач, без которой не может обойтись ни одна серьезная интеграционная платформа, к XA транзакциям. Для примера возьмем ситуацию близкую к реальности описанную как маршрут Apache Camel.
<camelContext xmlns="http://camel.apache.org/schema/spring">
 <!-- Transactional route -->
 <route>
  <from uri="jmstx:queue:giro" />
  <to uri="jmstx:queue:statusLog" />
  <bean ref="accountService" method="credit" />
  <bean ref="accountService" method="debit" />
 </route>
 <!-- Feeder route -->
 <route>
  <from uri="jmstx:queue:statusLog" />
  <to uri="file:Success" />
 </route>
</camelContext>
В этом маршруте описывается следующее:
  1. Принимает сообщение из JMS-очереди giro
  2. Перенаправляет сообщение в JMS-очередь statusLog, в которой должны быть только успешные сообщения
  3. Меняем состояние базы данных  
Второй маршрут описывает следующее:
  1. Принимает сообщение из JMS-очереди statusLog
  2. Сохраняет сообщение в папке Success
В ходе работы в итоговой папке Success должны находиться только успешные сообщения. При возникновении исключения, состояние в базе не меняется, и файла с транзакцией не оказывается в папке успешных сообщений. В этом примере участвуют 3 транзакционные системы JMS, database и filesystem. Как это заставить работать на ServiceMix и Apache Camel?...


Для начала необходимо ознакомиться с основной документацией по транзакциям на ServiceMix и его коммерческой версииhttp://fusesource.com/docs/esbent/7.1/camel_tx/front.html. Ниже изложенный материал полностью основан на стандартной документации.
Итак для начала как обычно создаем проект в Eclipse на основе архетипа camel-archetype-spring-dm. Мы не будем изобретать логику и воспользуемся примером из документации. Создаем класс AccountService, который будет работать с базой данных. Вот прямая ссылка на исходник AccountService. В классе разработаны методы debit и credit которые меняют состояние базы.

Для начала определим DataSource. В предыдущей статье мы как раз опубликовали XADataSource к базе данных Oracle. В конфиге получаем доступ к нашему DataSource
<!-- Reference JDBC DataSource -->
<osgi:reference id="oracleXADataSource" interface="javax.sql.DataSource"
 filter="(oracle.xa.datasource.name=oracleXA)" />
В качестве фильтра используется выражение, которые мы определили при его создании. Теперь можно определить наш бин и передать ему полученный DataSource
<!-- bean for account business logic -->
<bean id="accountService" class="com.bssys.ts2.AccountService">
 <property name="dataSource" ref="oracleXADataSource" />
</bean>
Так настраивается соединение с базой.
Настройки JMS ресурсов чуть более сложны. Первым делом необходимо получить доступ к TransactionManager предоставляемых ServiceMix
<!-- OSGi TM Service -->
<!-- access through Spring's PlatformTransactionManager -->
<osgi:reference id="osgiPlatformTransactionManager"
 interface="org.springframework.transaction.PlatformTransactionManager" />
<!-- access through PlatformTransactionManager -->
<osgi:reference id="osgiJtaTransactionManager"
 interface="javax.transaction.TransactionManager" />
Имея их настраиваем ConnectionFactory для JMS
<!-- JMS TX endpoint configuration -->
<bean id="jmstx" class="org.apache.activemq.camel.component.ActiveMQComponent">
 <property name="configuration" ref="jmsTxConfig" />
</bean>

<bean id="jmsTxConfig" class="org.apache.camel.component.jms.JmsConfiguration">
 <property name="connectionFactory" ref="jmsXaPoolConnectionFactory" />
 <property name="transactionManager" ref="osgiPlatformTransactionManager" />
 <property name="transacted" value="false" />
 <property name="cacheLevelName" value="CACHE_CONNECTION" />
</bean>

<!-- connection factory wrapper to support auto-enlisting of XA resource -->
<bean id="jmsXaPoolConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory">
 <property name="maxConnections" value="1" />
 <property name="connectionFactory" ref="jmsXaConnectionFactory" />
 <property name="transactionManager" ref="osgiJtaTransactionManager" />
</bean>

<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
 <property name="brokerURL" value="vm:local" />
 <property name="redeliveryPolicy">
  <bean class="org.apache.activemq.RedeliveryPolicy">
   <property name="maximumRedeliveries" value="0" />
  </bean>
 </property>
</bean>
Осталось сформировать базу по примеру из документации. Собрать и закинуть наш модуль в папку deploy.

ij> CREATE TABLE accounts (name VARCHAR(50), amount INT);

ij> INSERT INTO accounts (name,amount) VALUES ('Major Clanger',2000);

ij> INSERT INTO accounts (name,amount) VALUES ('Tiny Clanger',100);

Теперь кидая сообщения вида мы будем видеть их в папке Success, и смотреть как поменялись данные в таблице Account
<transaction>
  <transfer>
    <sender>Major Clanger</sender>
    <receiver>Tiny Clanger</receiver>
    <amount>90</amount>
  </transfer>
</transaction>
Для того чтобы спровоцировать откат транзакции достаточно кинуть сообщение с полем amount превышающим 100.

<transaction>
  <transfer>
    <sender>Major Clanger</sender>
    <receiver>Tiny Clanger</receiver>
    <amount>150</amount>
  </transfer>
</transaction>

В этом случае метод debit выкинет исключение которое вызовет rollbak для транзакции.
public void debit(
            @XPath("/transaction/transfer/sender/text()") String name,
            @XPath("/transaction/transfer/amount/text()") String amount
            )
    {
        log.info("debit() called with args name = " + name + " and amount = " + amount);
        int iamount = Integer.parseInt(amount);
        if (iamount > 100) {
            throw new IllegalArgumentException("Debit limit is 100");
        }
        int origAmount = jdbc.queryForInt(
                "select amount from accounts where name = ?",
                new Object[]{name}
        );
        int newAmount = origAmount - Integer.parseInt(amount);
        if (newAmount < 0) {
            throw new IllegalArgumentException("Not enough in account");
        }
        
        jdbc.update(
                "update accounts set amount = ? where name = ?",
                new Object[] {newAmount, name}
        );
    }

Комментариев нет: