четверг, апреля 03, 2014

Transactional JMS Client

Разберемся детально с локальными транзакциями на примере взаимодействия JMS очередей (ActiveMQ). В документации по этому поводу говорится, что для того чтобы с route был транзакционным достаточно в начале объявить его с помощью инструкции <transacted/> . Но как будет выглядеть транзакционное поведение умалчивается. Обычно в качестве примера демонстрируется подобный route.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jmstx:queue:okay"/>
      <transacted/>
      <to uri="jmstx:queue:final"/>
    </route>
  </camelContext>
</beans>
Который не демонстрирует ровным счетом ничего.



Модифицируем код следующим образом

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jmstx:queue:okay"/>
      <transacted/>
      <to uri="jmstx:queue:final"/>
      <bean ref="businessLogic" method="business">
    </route>
  </camelContext>
</beans>

Поставим посередине вызов bean содержащей бизнес-логику. Сама бизнес-логика простейшая
package com.bssys.transaction_client;

package com.bssys.transaction_client;

import org.apache.log4j.Logger;

public class BusinessLogic {
 private Logger log = Logger.getLogger(BusinessLogic.class);
 
 public void business(String message){
  log.info(message);
  if ("exception".equals(message)){
   log.info("Rollback " + message);
   throw new IllegalArgumentException("This is forced");
  }
 }
}

Задумка такова, что при отправке сообщения содержащего строку "exception" поднимается исключение, которое должно вызвать откат транзакции. Для того чтобы вся эта магия заработала необходимо настроить транзакции. Все транзакционные артефакты я описал в отдельном spring-контексте.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">

 <osgi:reference id="connectionFactory" interface="javax.jms.ConnectionFactory" />
 <osgi:reference id="platformTransactionManager" interface="org.springframework.transaction.PlatformTransactionManager" />

 <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="transacted" value="true" />
 </bean>

 <bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
  <constructor-arg ref="jmsConfig" />
 </bean>
 
 <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
 </bean>
 
 <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW" />
 </bean>

</beans>

Попробуем запустить и проверить. Модуль я запускаю на ServiceMix 5.0.0 последней на данный момент версии. Отправив сообщение "exception" в очередь okay. В логах видим следующее сообщение, повторяющееся некоторое количество раз.

12:51:52,683 | INFO  | msConsumer[okay] | BusinessLogic                    | 205 - transaction-client - 0.0.1 | exception
12:51:52,684 | INFO  | msConsumer[okay] | BusinessLogic                    | 205 - transaction-client - 0.0.1 | Rollback exception
12:51:52,685 | ERROR | msConsumer[okay] | DefaultErrorHandler              | 110 - org.apache.camel.camel-core - 2.12.3 | Failed delivery for (MessageId: ID:NovA-63543-1396509662689-1:3:1:1:2 on ExchangeId: ID-NovA-63522-1396509447471-9-1). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: This is forced

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route15           ] [route15           ] [jmstx://queue:okay                                                            ] [         3]
[route15           ] [transacted15      ] [transacted[]                                                                  ] [         3]
[route15           ] [to21              ] [jmstx:queue:final                                                             ] [         1]
[route15           ] [bean8             ] [bean[ref:businessLogic method: business]                                      ] [         2]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
 Id                  ID-NovA-63522-1396509447471-9-1
 ExchangePattern     InOnly
 Headers             {breadcrumbId=ID:NovA-63543-1396509662689-1:3:1:1:2, CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=, JMSDeliveryMode=1, JMSDestination=queue://okay, JMSExpiration=0, JMSMessageID=ID:NovA-63543-1396509662689-1:3:1:1:2, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1396515112680, JMSType=, JMSXGroupID=null, JMSXUserID=null}
 BodyType            String
 Body                exception
]

Сообщение действительно не попало в очередь final. Но это ли мы действительно хотели? В результате сообщение не находится ни в одной из очередей. Т.е. сообщение пропало! Ищите в логах. Это просто не допустимо. Итого корректное поведение, которого я хочу добиться следующее.
При возникновении исключения сообщение должно откатываться во всех системах, и для того чтобы оно не потерялось перекладываться в специальную очередь для ошибок. Иными словами, нам необходимо реализовать специльный паттерн для таких случаев "Dead letter chanel". Судя по документации есть несколько путей чтобы реализовать подобное.
Например в некоторых местах документации предлагают такой вариант:

// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        onException(IllegalArgumentException.class).
            maximumRedeliveries(1)
            .handled(true)
            .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
            .markRollbackOnly();  // NB: Must come *after* the dead letter endpoint.

        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages");
    }
}
Попробуем адаптировать этот пример ближе к нам. Таким образом это будет выглядеть следующим образом:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
 xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <bean id="businessLogic" class="com.bssys.transaction_client.BusinessLogic" />

 <camelContext xmlns="http://camel.apache.org/schema/spring">

  <route>
   <!-- 1: from the jms queue -->
   <from uri="jmstx:queue:okay" />
   <onException>
    <exception>java.lang.IllegalArgumentException</exception>
    <redeliveryPolicy maximumRedeliveries="1"/>
    <handled>
     <constant>true</constant>
    </handled>
    <log message="${exception.message}" />
    <to uri="file:target/messages?fileName=deadLetters.xml&amp;fileExist=Append" />
    <rollback markRollbackOnly="true" />
   </onException>
   <transacted/>
   <to uri="jmstx:queue:final" />
   <bean ref="businessLogic" method="business" />
  </route>
  
 </camelContext>

</beans>

Да, действительно это похоже на то что нам надо, но только с одной оговоркой. в качестве dead letter chanel, сообщение скидывается в текстовый файл. Но нам то надо перекладывать его в отдельную очередь. Адаптируем пример, где в качестве dead letter queue, будет использоваться отдельная очередь.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
 xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <bean id="businessLogic" class="com.bssys.transaction_client.BusinessLogic" />

 <camelContext xmlns="http://camel.apache.org/schema/spring">

  <route>
   <!-- 1: from the jms queue -->
   <from uri="jmstx:queue:okay" />
   <onException>
    <exception>java.lang.IllegalArgumentException</exception>
    <camel:redeliveryPolicy maximumRedeliveries="1"/>
    <handled>
     <constant>true</constant>
    </handled>
    <log message="${exception.message}" />
    <to uri="jmstx:queue:bad" />
    <rollback markRollbackOnly="true" />
   </onException>
   <!-- <transacted ref="PROPAGATION_REQUIRED" /> -->
   <transacted/>
   <to uri="jmstx:queue:final" />
   <bean ref="businessLogic" method="business" />
  </route>
 </camelContext>

</beans>

Запустив мы увидим в логах следующее

13:46:09,301 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | exception
13:46:09,302 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | Rollback exception
13:46:09,302 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | exception
13:46:09,303 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | Rollback exception
13:46:09,303 | INFO  | msConsumer[okay] | route5                           | 110 - org.apache.camel.camel-core - 2.12.3 | This is forced
13:46:09,305 | WARN  | msConsumer[okay] | DefaultErrorHandler              | 110 - org.apache.camel.camel-core - 2.12.3 | Rollback (MessageId: ID:NovA-64390-1396518019146-1:1:1:1:3 on ExchangeId: ID-NovA-64373-1396518004181-2-1) due: This is forced

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route5            ] [route5            ] [jmstx://queue:okay                                                            ] [         5]
[route5            ] [transacted5       ] [transacted[]                                                                  ] [         5]
[route5            ] [to8               ] [jmstx:queue:final                                                             ] [         1]
[route5            ] [bean3             ] [bean[ref:businessLogic method: business]                                      ] [         4]
[route5            ] [log3              ] [log                                                                           ] [         0]
[route5            ] [to7               ] [jmstx:queue:bad                                                               ] [         1]
[route5            ] [rollback3         ] [rollback                                                                      ] [         0]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
 Id                  ID-NovA-64373-1396518004181-2-1
 ExchangePattern     InOnly
 Headers             {breadcrumbId=ID:NovA-64390-1396518019146-1:1:1:1:3, JMSCorrelationID=, JMSDeliveryMode=1, JMSDestination=queue://okay, JMSExpiration=0, JMSMessageID=ID:NovA-64390-1396518019146-1:1:1:1:3, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1396518369297, JMSType=, JMSXGroupID=null, JMSXUserID=null}
 BodyType            String
 Body                exception
]

13:46:09,305 | WARN  | msConsumer[okay] | TransactionErrorHandler          | 110 - org.apache.camel.camel-core - 2.12.3 | Transaction rollback (0x2b0ef227) redelivered(false) for (MessageId: ID:NovA-64390-1396518019146-1:1:1:1:3 on ExchangeId: ID-NovA-64373-1396518004181-2-1) due exchange was marked for rollbackOnly
13:46:09,305 | WARN  | msConsumer[okay] | EndpointMessageListener          | 110 - org.apache.camel.camel-core - 2.12.3 | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.RollbackExchangeException: Intended rollback. Exchange[JmsMessage[JmsMessageID: ID:NovA-64390-1396518019146-1:1:1:1:3]]]

По факту сообщение попало в очередь jmstx:queue:bad, но было отменено по rollback! Это совсем никуда не годится! Для того чтобы сообщение не откатывалось из очереди ошибок, отправку необходимо выполнять в отдельной транзакции. При этом надо настроить транзакции так, чтобы не было двойной переотправки. Дело в том что помимо повторений на стороне Camel, переотправкой сообщений будет также заниматься transactionManager.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">

 <osgi:reference id="platformTransactionManager"
  interface="org.springframework.transaction.PlatformTransactionManager" />

  <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="userName" value="karaf" />
  <property name="password" value="karaf" />
  <property name="redeliveryPolicy">
   <bean class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="0" />
   </bean>
  </property>
 </bean>
 
 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="8"/>
        <property name="connectionFactory" ref="activemqConnectionFactory"/>
    </bean>

 <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="pooledConnectionFactory" />
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="transacted" value="true" />
 </bean>

 <bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
  <constructor-arg ref="jmsConfig" />
 </bean>

 <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
 </bean>
 <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="platformTransactionManager" />
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW" />
 </bean>

</beans>

 Итак вот итоговый результат!

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
 xmlns:osgi="http://www.springframework.org/schema/osgi"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <bean id="businessLogic" class="com.bssys.transaction_client.BusinessLogic" />

 <camelContext xmlns="http://camel.apache.org/schema/spring">

  <route>
   <!-- 1: from the jms queue -->
   <from uri="jmstx:queue:okay" />
   <onException>
    <exception>java.lang.IllegalArgumentException</exception>
    <camel:redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="1000"/>
    <handled>
     <constant>true</constant>
    </handled>
    <log message="${exception.message}" />
    <to uri="direct:dead" />
    <rollback markRollbackOnly="true" />
   </onException>
   <transacted/>
   <to uri="jmstx:queue:final" />
   <bean ref="businessLogic" method="business" />
  </route>

  <route>
   <camel:from uri="direct:dead" />
   <transacted ref="PROPAGATION_REQUIRES_NEW" />
   <to uri="jmstx:queue:bad" />
  </route>
 </camelContext>

</beans>

В примере делается 3 попытки с интервалом 1сек, и в конечном итоге сообщение попадает в очередь ошибок.

14:07:36,765 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | exception
14:07:36,766 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:37,766 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | exception
14:07:37,766 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:38,767 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | exception
14:07:38,767 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:39,767 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | exception
14:07:39,767 | INFO  | msConsumer[okay] | BusinessLogic                    | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:39,768 | INFO  | msConsumer[okay] | route9                           | 110 - org.apache.camel.camel-core - 2.12.3 | This is forced
14:07:39,826 | WARN  | msConsumer[okay] | DefaultErrorHandler              | 110 - org.apache.camel.camel-core - 2.12.3 | Rollback (MessageId: ID:NovA-64390-1396518019146-1:2:1:1:2 on ExchangeId: ID-NovA-64373-1396518004181-4-1) due: This is forced

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route9            ] [route9            ] [jmstx://queue:okay                                                            ] [      3062]
[route9            ] [transacted9       ] [transacted[]                                                                  ] [      3062]
[route9            ] [to14              ] [jmstx:queue:final                                                             ] [         1]
[route9            ] [bean5             ] [bean[ref:businessLogic method: business]                                      ] [      3061]
[route9            ] [log5              ] [log                                                                           ] [         0]
[route9            ] [to13              ] [direct:dead                                                                   ] [        57]
[route10           ] [transacted10      ] [transacted[ref:PROPAGATION_REQUIRES_NEW]                                      ] [        57]
[route10           ] [to15              ] [jmstx:queue:bad                                                               ] [         3]
[route9            ] [rollback5         ] [rollback                                                                      ] [         0]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
 Id                  ID-NovA-64373-1396518004181-4-1
 ExchangePattern     InOnly
 Headers             {breadcrumbId=ID:NovA-64390-1396518019146-1:2:1:1:2, JMSCorrelationID=, JMSDeliveryMode=1, JMSDestination=queue://okay, JMSExpiration=0, JMSMessageID=ID:NovA-64390-1396518019146-1:2:1:1:2, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1396519656761, JMSType=, JMSXGroupID=null, JMSXUserID=null}
 BodyType            String
 Body                exception
]

14:07:39,826 | WARN  | msConsumer[okay] | TransactionErrorHandler          | 110 - org.apache.camel.camel-core - 2.12.3 | Transaction rollback (0x2b75718c) redelivered(false) for (MessageId: ID:NovA-64390-1396518019146-1:2:1:1:2 on ExchangeId: ID-NovA-64373-1396518004181-4-1) due exchange was marked for rollbackOnly
14:07:39,827 | WARN  | msConsumer[okay] | EndpointMessageListener          | 110 - org.apache.camel.camel-core - 2.12.3 | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.RollbackExchangeException: Intended rollback. Exchange[JmsMessage[JmsMessageID: ID:NovA-64390-1396518019146-1:2:1:1:2]]]

Весь код тут: git clone https://Hibernate2009@bitbucket.org/Hibernate2009/transaction-jms-client.git

Комментариев нет: