В качестве wiki and issue tracking system у нас используется TRAC http://trac.edgewall.org/ Появилась задача мониторинга изменения задач в этой системе. TRAC написан на Python и поддерживает из коробки только один вид нотификации-email. Активность по тикетам в системе должна выгружаться в xml-файл для обработки другой системой. Идеальная задача для Camel ;)
Первоначальная идея решения была основана на работе с email. Однако по причине ее ущербности она была отброшена.
Быстро окинув взглядом что нам предоставляет Camel я к своему удивлению не нашел такой простой вещи как JDBC-адаптер. Адаптер, в полном смысле этого понятия, с помощью которого возможна передача сообщений из БД в интеграционное решение. Как пример что это такое можно посмотреть на JDBC-адаптер от Websphere ESB JDBC Adapter.
Основной принцип двунаправленной коммуникации заключается в следующем. В БД которую необходимо мониторить заводится служебная табличка, которую с некоторым интервалом опрашивает адаптер, забирает данные и передает в интеграционное решение. Данные в служебную табличку складываются с помощью тригера.
Основные компоненты у на сбудут следующие:Первоначальная идея решения была основана на работе с email. Однако по причине ее ущербности она была отброшена.
Быстро окинув взглядом что нам предоставляет Camel я к своему удивлению не нашел такой простой вещи как JDBC-адаптер. Адаптер, в полном смысле этого понятия, с помощью которого возможна передача сообщений из БД в интеграционное решение. Как пример что это такое можно посмотреть на JDBC-адаптер от Websphere ESB JDBC Adapter.
Основной принцип двунаправленной коммуникации заключается в следующем. В БД которую необходимо мониторить заводится служебная табличка, которую с некоторым интервалом опрашивает адаптер, забирает данные и передает в интеграционное решение. Данные в служебную табличку складываются с помощью тригера.
- Таймер, инициализирующий событие мониторинга служебной таблички:
- Процессор, который выбирает из таблички сообщения и асинронно передает их на обработку.
- Процессор, непосредственно обрабатывающий сообщения.
Далее нам протребуется механизм пуллинга, позволяющий отправлять сообщения напрямую из процессора. Для этого в Apache Camel предусмотрен ProducerTemplate.
Итак вот каркас нашего приложения: camel-context.xml Внутри camelContext инстанцируется Producer, и далее по ID инжектируется в JDBCAdapter
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<!-- Configures the Camel Context -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="JDBCAdapter" class="foo.JDBCAdapter">
<property name="producer" ref="change" />
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<package>foo</package>
<template id="change" defaultEndpoint="seda:change" />
</camelContext>
</beans>
package foo;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
public class JDBCAdapter implements Processor {
private ProducerTemplate producer;
public void setProducer(ProducerTemplate producer) {
this.producer = producer;
}
@Override
public void process(Exchange arg0) throws Exception {
// TODO Auto-generated method stub
producer.sendBody("Ok ");
}
}
Итоговый роутер:
package foo;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spring.Main;
import static org.apache.camel.builder.xml.XPathBuilder.xpath;
/**
* A Camel Router
*/
public class MyRouteBuilder extends RouteBuilder {
/**
* A main() so we can easily run these routing rules in our IDE
*/
public static void main(String... args) throws Exception {
Main.main(args);
}
/**
* Let's configure the Camel routing rules using Java code...
*/
public void configure() {
from("timer://foo?fixedRate=true&period=1000").processRef("JDBCAdapter");
from("seda:change").log("Yes baby!${body}");
}
}
Обратите внимание что endpoint у нас определяется как seda вместо direct, что обеспечивает нужную нам асинхронность!
Проверим что все работает как было задумано.
[ main] MainSupport INFO Apache Camel 2.9.2 starting [ main] AnnotationTypeConverterLoader INFO Found 3 packages with 15 @Converter classes to load [ main] DefaultTypeConverter INFO Loaded 170 core type converters (total 170 type converters) [ main] DefaultTypeConverter INFO Loaded additional 0 type converters (total 170 type converters) in 0.000 seconds [ main] SpringCamelContext INFO Apache Camel 2.9.2 (CamelContext: camel-1) is starting [ main] ManagementStrategyFactory INFO JMX enabled. Using ManagedManagementStrategy. [ main] ultManagementLifecycleStrategy INFO StatisticsLevel at All so enabling load performance statistics [ main] SpringCamelContext INFO Route: route1 started and consuming from: Endpoint[timer://foo?fixedRate=true&period=1000] [ main] SpringCamelContext INFO Route: route2 started and consuming from: Endpoint[seda://change] [ main] SpringCamelContext INFO Total 2 routes, of which 2 is started. [ main] SpringCamelContext INFO Apache Camel 2.9.2 (CamelContext: camel-1) started in 0.234 seconds [l-1) thread #2 - seda://change] route2 INFO Yes baby!Как можно догадаться вся логика обработки сообщений будет определятся роутингом:Ok [l-1) thread #2 - seda://change] route2 INFO Yes baby!Ok [l-1) thread #2 - seda://change] route2 INFO Yes baby!Ok [l-1) thread #2 - seda://change] route2 INFO Yes baby!Ok
from("seda:change").log("Yes baby!${body}");Вернемся к нашей задаче. Trac хранит все изменения по тикетам в табличке ticket_change следующей структуры Создадим служебную табличку, которая полностью копирует структуру ticket_change, у меня она называется ticket_change_audit Тригер, который срабатывает при добавлении данных:
CREATE OR REPLACE FUNCTION etk."notify"() RETURNS trigger AS $BODY$BEGIN insert into etk.ticket_change_audit (ticket, time, author, field, oldvalue, newvalue) values(NEW.ticket, NEW.time, NEW.author, NEW.field, NEW.oldvalue, NEW.newvalue); END;$BODY$ LANGUAGE plpgsql VOLATILE COST 100; ALTER FUNCTION etk."notify"() OWNER TO tracuser; COMMENT ON FUNCTION etk."notify"() IS 'dsd';
package com.bssys.trac.bo;
import java.io.Serializable;
public class TicketChange implements Serializable {
private Integer ticket;
private Long time;
private String author;
private String field;
private String oldValue;
private String newValue;
public Integer getTicket() {
return ticket;
}
public String getField() {
return field;
}
public void setField(String field) {
this.field = field;
}
public void setTicket(Integer ticket) {
this.ticket = ticket;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public String getOldValue() {
return oldValue;
}
public void setOldValue(String oldValue) {
this.oldValue = oldValue;
}
public String getNewValue() {
return newValue;
}
public void setNewValue(String newValue) {
this.newValue = newValue;
}
}
И добавляем логику работы с БД с помощью спрингового JDBCTemplate. Далее необходимо будет просто реализвать нужную логику обработки сообщения, в теле message Exchange, будет прилетать объект TicketChange
Вот что у нас получилось в итоге:
package com.bssys.trac;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import com.bssys.trac.bo.TicketChange;
public class JDBCAdapter implements Processor {
private JdbcTemplate template;
private ProducerTemplate producer;
public void setTemplate(JdbcTemplate template) {
this.template = template;
}
public void setProducer(ProducerTemplate producer) {
this.producer = producer;
}
@Override
public void process(Exchange exchange) throws Exception {
// TODO Auto-generated method stub
List res = template.query("select * from etk.ticket_change_audit", new RowMapper() {
@Override
public TicketChange mapRow(ResultSet rs, int rowNum) throws SQLException {
// TODO Auto-generated method stub
TicketChange ticketChange = new TicketChange();
ticketChange.setTicket(rs.getInt("ticket"));
ticketChange.setTime(rs.getLong("time"));
ticketChange.setAuthor(rs.getString("author"));
ticketChange.setField(rs.getString("field"));
ticketChange.setOldValue(rs.getString("oldvalue"));
ticketChange.setNewValue(rs.getString("newvalue"));
return ticketChange;
}
});
for (TicketChange change : res) {
TicketChangeSendProcessor sendProcessor = new TicketChangeSendProcessor(change);
producer.send(sendProcessor);
String delete = "delete from etk.ticket_change_audit WHERE ticket=? and time=? and author=? and field=? and oldvalue=? and newvalue=?";
template.update(delete, change.getTicket(), change.getTime(), change.getAuthor(), change.getField(), change.getOldValue(), change.getNewValue());
}
exchange.getOut().setBody(res);
}
}
Комментариев нет:
Отправить комментарий