Разберемся детально с локальными транзакциями на примере взаимодействия JMS очередей (ActiveMQ). В документации по этому поводу говорится, что для того чтобы с route был транзакционным достаточно в начале объявить его с помощью инструкции <transacted/> . Но как будет выглядеть транзакционное поведение умалчивается. Обычно в качестве примера демонстрируется подобный route.
Модифицируем код следующим образом
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="jmstx:queue:okay"/>
<transacted/>
<to uri="jmstx:queue:final"/>
</route>
</camelContext>
</beans>
Который не демонстрирует ровным счетом ничего.Модифицируем код следующим образом
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="jmstx:queue:okay"/>
<transacted/>
<to uri="jmstx:queue:final"/>
<bean ref="businessLogic" method="business">
</route>
</camelContext>
</beans>
Поставим посередине вызов bean содержащей бизнес-логику. Сама бизнес-логика простейшая
package com.bssys.transaction_client;
package com.bssys.transaction_client;
import org.apache.log4j.Logger;
public class BusinessLogic {
private Logger log = Logger.getLogger(BusinessLogic.class);
public void business(String message){
log.info(message);
if ("exception".equals(message)){
log.info("Rollback " + message);
throw new IllegalArgumentException("This is forced");
}
}
}
Задумка такова, что при отправке сообщения содержащего строку "exception" поднимается исключение, которое должно вызвать откат транзакции. Для того чтобы вся эта магия заработала необходимо настроить транзакции. Все транзакционные артефакты я описал в отдельном spring-контексте.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">
<osgi:reference id="connectionFactory" interface="javax.jms.ConnectionFactory" />
<osgi:reference id="platformTransactionManager" interface="org.springframework.transaction.PlatformTransactionManager" />
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="connectionFactory" />
<property name="transactionManager" ref="platformTransactionManager" />
<property name="transacted" value="true" />
</bean>
<bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
<constructor-arg ref="jmsConfig" />
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="platformTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="platformTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW" />
</bean>
</beans>
Попробуем запустить и проверить. Модуль я запускаю на ServiceMix 5.0.0 последней на данный момент версии. Отправив сообщение "exception" в очередь okay. В логах видим следующее сообщение, повторяющееся некоторое количество раз.
12:51:52,683 | INFO | msConsumer[okay] | BusinessLogic | 205 - transaction-client - 0.0.1 | exception
12:51:52,684 | INFO | msConsumer[okay] | BusinessLogic | 205 - transaction-client - 0.0.1 | Rollback exception
12:51:52,685 | ERROR | msConsumer[okay] | DefaultErrorHandler | 110 - org.apache.camel.camel-core - 2.12.3 | Failed delivery for (MessageId: ID:NovA-63543-1396509662689-1:3:1:1:2 on ExchangeId: ID-NovA-63522-1396509447471-9-1). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: This is forced
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route15 ] [route15 ] [jmstx://queue:okay ] [ 3]
[route15 ] [transacted15 ] [transacted[] ] [ 3]
[route15 ] [to21 ] [jmstx:queue:final ] [ 1]
[route15 ] [bean8 ] [bean[ref:businessLogic method: business] ] [ 2]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-NovA-63522-1396509447471-9-1
ExchangePattern InOnly
Headers {breadcrumbId=ID:NovA-63543-1396509662689-1:3:1:1:2, CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=, JMSDeliveryMode=1, JMSDestination=queue://okay, JMSExpiration=0, JMSMessageID=ID:NovA-63543-1396509662689-1:3:1:1:2, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1396515112680, JMSType=, JMSXGroupID=null, JMSXUserID=null}
BodyType String
Body exception
]
Сообщение действительно не попало в очередь final. Но это ли мы действительно хотели? В результате сообщение не находится ни в одной из очередей. Т.е. сообщение пропало! Ищите в логах. Это просто не допустимо. Итого корректное поведение, которого я хочу добиться следующее.
При возникновении исключения сообщение должно откатываться во всех системах, и для того чтобы оно не потерялось перекладываться в специальную очередь для ошибок. Иными словами, нам необходимо реализовать специльный паттерн для таких случаев "Dead letter chanel". Судя по документации есть несколько путей чтобы реализовать подобное.
Например в некоторых местах документации предлагают такой вариант:
// Java
import org.apache.camel.spring.SpringRouteBuilder;
public class MyRouteBuilder extends SpringRouteBuilder {
...
public void configure() {
onException(IllegalArgumentException.class).
maximumRedeliveries(1)
.handled(true)
.to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
.markRollbackOnly(); // NB: Must come *after* the dead letter endpoint.
from("file:src/data?noop=true")
.transacted()
.beanRef("accountService","credit")
.beanRef("accountService","debit")
.beanRef("accountService","dumpTable")
.to("file:target/messages");
}
}
Попробуем адаптировать этот пример ближе к нам. Таким образом это будет выглядеть следующим образом:<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="businessLogic" class="com.bssys.transaction_client.BusinessLogic" />
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<!-- 1: from the jms queue -->
<from uri="jmstx:queue:okay" />
<onException>
<exception>java.lang.IllegalArgumentException</exception>
<redeliveryPolicy maximumRedeliveries="1"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.message}" />
<to uri="file:target/messages?fileName=deadLetters.xml&fileExist=Append" />
<rollback markRollbackOnly="true" />
</onException>
<transacted/>
<to uri="jmstx:queue:final" />
<bean ref="businessLogic" method="business" />
</route>
</camelContext>
</beans>
Да, действительно это похоже на то что нам надо, но только с одной оговоркой. в качестве dead letter chanel, сообщение скидывается в текстовый файл. Но нам то надо перекладывать его в отдельную очередь. Адаптируем пример, где в качестве dead letter queue, будет использоваться отдельная очередь.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="businessLogic" class="com.bssys.transaction_client.BusinessLogic" />
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<!-- 1: from the jms queue -->
<from uri="jmstx:queue:okay" />
<onException>
<exception>java.lang.IllegalArgumentException</exception>
<camel:redeliveryPolicy maximumRedeliveries="1"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.message}" />
<to uri="jmstx:queue:bad" />
<rollback markRollbackOnly="true" />
</onException>
<!-- <transacted ref="PROPAGATION_REQUIRED" /> -->
<transacted/>
<to uri="jmstx:queue:final" />
<bean ref="businessLogic" method="business" />
</route>
</camelContext>
</beans>
Запустив мы увидим в логах следующее
13:46:09,301 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | exception
13:46:09,302 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | Rollback exception
13:46:09,302 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | exception
13:46:09,303 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | Rollback exception
13:46:09,303 | INFO | msConsumer[okay] | route5 | 110 - org.apache.camel.camel-core - 2.12.3 | This is forced
13:46:09,305 | WARN | msConsumer[okay] | DefaultErrorHandler | 110 - org.apache.camel.camel-core - 2.12.3 | Rollback (MessageId: ID:NovA-64390-1396518019146-1:1:1:1:3 on ExchangeId: ID-NovA-64373-1396518004181-2-1) due: This is forced
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route5 ] [route5 ] [jmstx://queue:okay ] [ 5]
[route5 ] [transacted5 ] [transacted[] ] [ 5]
[route5 ] [to8 ] [jmstx:queue:final ] [ 1]
[route5 ] [bean3 ] [bean[ref:businessLogic method: business] ] [ 4]
[route5 ] [log3 ] [log ] [ 0]
[route5 ] [to7 ] [jmstx:queue:bad ] [ 1]
[route5 ] [rollback3 ] [rollback ] [ 0]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-NovA-64373-1396518004181-2-1
ExchangePattern InOnly
Headers {breadcrumbId=ID:NovA-64390-1396518019146-1:1:1:1:3, JMSCorrelationID=, JMSDeliveryMode=1, JMSDestination=queue://okay, JMSExpiration=0, JMSMessageID=ID:NovA-64390-1396518019146-1:1:1:1:3, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1396518369297, JMSType=, JMSXGroupID=null, JMSXUserID=null}
BodyType String
Body exception
]
13:46:09,305 | WARN | msConsumer[okay] | TransactionErrorHandler | 110 - org.apache.camel.camel-core - 2.12.3 | Transaction rollback (0x2b0ef227) redelivered(false) for (MessageId: ID:NovA-64390-1396518019146-1:1:1:1:3 on ExchangeId: ID-NovA-64373-1396518004181-2-1) due exchange was marked for rollbackOnly
13:46:09,305 | WARN | msConsumer[okay] | EndpointMessageListener | 110 - org.apache.camel.camel-core - 2.12.3 | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.RollbackExchangeException: Intended rollback. Exchange[JmsMessage[JmsMessageID: ID:NovA-64390-1396518019146-1:1:1:1:3]]]
По факту сообщение попало в очередь jmstx:queue:bad, но было отменено по rollback! Это совсем никуда не годится! Для того чтобы сообщение не откатывалось из очереди ошибок, отправку необходимо выполнять в отдельной транзакции. При этом надо настроить транзакции так, чтобы не было двойной переотправки. Дело в том что помимо повторений на стороне Camel, переотправкой сообщений будет также заниматься transactionManager.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">
<osgi:reference id="platformTransactionManager"
interface="org.springframework.transaction.PlatformTransactionManager" />
<bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="userName" value="karaf" />
<property name="password" value="karaf" />
<property name="redeliveryPolicy">
<bean class="org.apache.activemq.RedeliveryPolicy">
<property name="maximumRedeliveries" value="0" />
</bean>
</property>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="8"/>
<property name="connectionFactory" ref="activemqConnectionFactory"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory" />
<property name="transactionManager" ref="platformTransactionManager" />
<property name="transacted" value="true" />
</bean>
<bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
<constructor-arg ref="jmsConfig" />
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="platformTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>
<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="platformTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW" />
</bean>
</beans>
Итак вот итоговый результат!
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="businessLogic" class="com.bssys.transaction_client.BusinessLogic" />
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<!-- 1: from the jms queue -->
<from uri="jmstx:queue:okay" />
<onException>
<exception>java.lang.IllegalArgumentException</exception>
<camel:redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="1000"/>
<handled>
<constant>true</constant>
</handled>
<log message="${exception.message}" />
<to uri="direct:dead" />
<rollback markRollbackOnly="true" />
</onException>
<transacted/>
<to uri="jmstx:queue:final" />
<bean ref="businessLogic" method="business" />
</route>
<route>
<camel:from uri="direct:dead" />
<transacted ref="PROPAGATION_REQUIRES_NEW" />
<to uri="jmstx:queue:bad" />
</route>
</camelContext>
</beans>
В примере делается 3 попытки с интервалом 1сек, и в конечном итоге сообщение попадает в очередь ошибок.
14:07:36,765 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | exception
14:07:36,766 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:37,766 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | exception
14:07:37,766 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:38,767 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | exception
14:07:38,767 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:39,767 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | exception
14:07:39,767 | INFO | msConsumer[okay] | BusinessLogic | 207 - transaction-client - 0.0.1 | Rollback exception
14:07:39,768 | INFO | msConsumer[okay] | route9 | 110 - org.apache.camel.camel-core - 2.12.3 | This is forced
14:07:39,826 | WARN | msConsumer[okay] | DefaultErrorHandler | 110 - org.apache.camel.camel-core - 2.12.3 | Rollback (MessageId: ID:NovA-64390-1396518019146-1:2:1:1:2 on ExchangeId: ID-NovA-64373-1396518004181-4-1) due: This is forced
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route9 ] [route9 ] [jmstx://queue:okay ] [ 3062]
[route9 ] [transacted9 ] [transacted[] ] [ 3062]
[route9 ] [to14 ] [jmstx:queue:final ] [ 1]
[route9 ] [bean5 ] [bean[ref:businessLogic method: business] ] [ 3061]
[route9 ] [log5 ] [log ] [ 0]
[route9 ] [to13 ] [direct:dead ] [ 57]
[route10 ] [transacted10 ] [transacted[ref:PROPAGATION_REQUIRES_NEW] ] [ 57]
[route10 ] [to15 ] [jmstx:queue:bad ] [ 3]
[route9 ] [rollback5 ] [rollback ] [ 0]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-NovA-64373-1396518004181-4-1
ExchangePattern InOnly
Headers {breadcrumbId=ID:NovA-64390-1396518019146-1:2:1:1:2, JMSCorrelationID=, JMSDeliveryMode=1, JMSDestination=queue://okay, JMSExpiration=0, JMSMessageID=ID:NovA-64390-1396518019146-1:2:1:1:2, JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1396519656761, JMSType=, JMSXGroupID=null, JMSXUserID=null}
BodyType String
Body exception
]
14:07:39,826 | WARN | msConsumer[okay] | TransactionErrorHandler | 110 - org.apache.camel.camel-core - 2.12.3 | Transaction rollback (0x2b75718c) redelivered(false) for (MessageId: ID:NovA-64390-1396518019146-1:2:1:1:2 on ExchangeId: ID-NovA-64373-1396518004181-4-1) due exchange was marked for rollbackOnly
14:07:39,827 | WARN | msConsumer[okay] | EndpointMessageListener | 110 - org.apache.camel.camel-core - 2.12.3 | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.RollbackExchangeException: Intended rollback. Exchange[JmsMessage[JmsMessageID: ID:NovA-64390-1396518019146-1:2:1:1:2]]]
Весь код тут: git clone https://Hibernate2009@bitbucket.org/Hibernate2009/transaction-jms-client.git
Комментариев нет:
Отправить комментарий