среда, мая 30, 2012

JDBC Polling Adapter c помощью Apache Camel

В качестве wiki and issue tracking system у нас используется TRAC http://trac.edgewall.org/ Появилась задача мониторинга изменения задач в этой системе. TRAC написан на Python и поддерживает из коробки только один вид нотификации-email.  Активность по тикетам в системе должна выгружаться в xml-файл для обработки другой системой. Идеальная задача для Camel ;)



Первоначальная идея решения была основана на работе с email. Однако по причине ее ущербности она была отброшена.
Быстро окинув взглядом что нам предоставляет Camel я к своему удивлению не нашел такой простой вещи как JDBC-адаптер. Адаптер, в полном смысле этого понятия, с помощью которого возможна передача сообщений из БД в интеграционное решение. Как пример что это такое можно посмотреть на JDBC-адаптер от Websphere ESB JDBC Adapter.

Основной принцип двунаправленной коммуникации заключается в следующем. В БД которую необходимо мониторить заводится служебная табличка, которую с некоторым интервалом опрашивает адаптер, забирает данные и передает в интеграционное решение. Данные в служебную табличку складываются с помощью тригера.

Основные компоненты у на сбудут следующие:

  1. Таймер, инициализирующий событие мониторинга служебной таблички: 
  2. Процессор, который выбирает из таблички сообщения и асинронно передает их на обработку. 
  3. Процессор, непосредственно обрабатывающий сообщения. 

 Далее нам протребуется механизм пуллинга, позволяющий отправлять сообщения напрямую из процессора. Для этого в Apache Camel предусмотрен ProducerTemplate.

 Итак вот каркас нашего приложения: camel-context.xml Внутри camelContext инстанцируется Producer, и далее по ID инжектируется в JDBCAdapter

<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
 license agreements. See the NOTICE file distributed with this work for additional 
 information regarding copyright ownership. The ASF licenses this file to 
 You under the Apache License, Version 2.0 (the "License"); you may not use 
 this file except in compliance with the License. You may obtain a copy of 
 the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
 by applicable law or agreed to in writing, software distributed under the 
 License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
 OF ANY KIND, either express or implied. See the License for the specific 
 language governing permissions and limitations under the License. -->

<!-- Configures the Camel Context -->

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <bean id="JDBCAdapter" class="foo.JDBCAdapter">
  <property name="producer" ref="change" />
 </bean>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
  <package>foo</package>
  <template id="change" defaultEndpoint="seda:change" />
 </camelContext>

</beans>


package foo;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;

public class JDBCAdapter implements Processor {

 private ProducerTemplate producer;

 public void setProducer(ProducerTemplate producer) {
  this.producer = producer;
 }

 @Override
 public void process(Exchange arg0) throws Exception {
  // TODO Auto-generated method stub
  producer.sendBody("Ok");
 }
}

Итоговый роутер:
package foo;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spring.Main;

import static org.apache.camel.builder.xml.XPathBuilder.xpath;

/**
 * A Camel Router
 */
public class MyRouteBuilder extends RouteBuilder {

    /**
     * A main() so we can easily run these routing rules in our IDE
     */
    public static void main(String... args) throws Exception {
        Main.main(args);
    }

    /**
     * Let's configure the Camel routing rules using Java code...
     */
    public void configure() {
     from("timer://foo?fixedRate=true&period=1000").processRef("JDBCAdapter");
     from("seda:change").log("Yes baby!${body}");
    }
}

Обратите внимание что endpoint у нас определяется как seda вместо direct, что обеспечивает нужную нам асинхронность! Проверим что все работает как было задумано.
[                          main] MainSupport                    INFO  Apache Camel 2.9.2 starting
[                          main] AnnotationTypeConverterLoader  INFO  Found 3 packages with 15 @Converter classes to load
[                          main] DefaultTypeConverter           INFO  Loaded 170 core type converters (total 170 type converters)
[                          main] DefaultTypeConverter           INFO  Loaded additional 0 type converters (total 170 type converters) in 0.000 seconds
[                          main] SpringCamelContext             INFO  Apache Camel 2.9.2 (CamelContext: camel-1) is starting
[                          main] ManagementStrategyFactory      INFO  JMX enabled. Using ManagedManagementStrategy.
[                          main] ultManagementLifecycleStrategy INFO  StatisticsLevel at All so enabling load performance statistics
[                          main] SpringCamelContext             INFO  Route: route1 started and consuming from: Endpoint[timer://foo?fixedRate=true&period=1000]
[                          main] SpringCamelContext             INFO  Route: route2 started and consuming from: Endpoint[seda://change]
[                          main] SpringCamelContext             INFO  Total 2 routes, of which 2 is started.
[                          main] SpringCamelContext             INFO  Apache Camel 2.9.2 (CamelContext: camel-1) started in 0.234 seconds
[l-1) thread #2 - seda://change] route2                         INFO  Yes baby!Ok
[l-1) thread #2 - seda://change] route2                         INFO  Yes baby!Ok
[l-1) thread #2 - seda://change] route2                         INFO  Yes baby!Ok
[l-1) thread #2 - seda://change] route2                         INFO  Yes baby!Ok
Как можно догадаться вся логика обработки сообщений будет определятся роутингом:
from("seda:change").log("Yes baby!${body}");
Вернемся к нашей задаче. Trac хранит все изменения по тикетам в табличке ticket_change следующей структуры Создадим служебную табличку, которая полностью копирует структуру ticket_change, у меня она называется ticket_change_audit Тригер, который срабатывает при добавлении данных:
CREATE OR REPLACE FUNCTION etk."notify"()
  RETURNS trigger AS
$BODY$BEGIN
insert into etk.ticket_change_audit (ticket, time, author, field, oldvalue, newvalue) values(NEW.ticket, NEW.time, NEW.author, NEW.field, NEW.oldvalue, NEW.newvalue);
END;$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;
ALTER FUNCTION etk."notify"() OWNER TO tracuser;
COMMENT ON FUNCTION etk."notify"() IS 'dsd';

Теперь нам осталось доработать логику процессора JDBCAdapter'а. Создаем следующий бизнес-объект для нашей таблички(JPA и другие ORM не использую специально чтобы не усложнять наш пример)
package com.bssys.trac.bo;

import java.io.Serializable;

public class TicketChange implements Serializable {

 private Integer ticket;

 private Long time;

 private String author;

 private String field;

 private String oldValue;

 private String newValue;

 public Integer getTicket() {
  return ticket;
 }

 public String getField() {
  return field;
 }

 public void setField(String field) {
  this.field = field;
 }

 public void setTicket(Integer ticket) {
  this.ticket = ticket;
 }

 public Long getTime() {
  return time;
 }

 public void setTime(Long time) {
  this.time = time;
 }

 public String getAuthor() {
  return author;
 }

 public void setAuthor(String author) {
  this.author = author;
 }

 public String getOldValue() {
  return oldValue;
 }

 public void setOldValue(String oldValue) {
  this.oldValue = oldValue;
 }

 public String getNewValue() {
  return newValue;
 }

 public void setNewValue(String newValue) {
  this.newValue = newValue;
 }
}
И добавляем логику работы с БД с помощью спрингового JDBCTemplate. Далее необходимо будет просто реализвать нужную логику обработки сообщения, в теле message Exchange, будет прилетать объект TicketChange Вот что у нас получилось в итоге:
package com.bssys.trac;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

import com.bssys.trac.bo.TicketChange;

public class JDBCAdapter implements Processor {

 private JdbcTemplate template;
 private ProducerTemplate producer;

 public void setTemplate(JdbcTemplate template) {
  this.template = template;
 }

 public void setProducer(ProducerTemplate producer) {
  this.producer = producer;
 }

 @Override
 public void process(Exchange exchange) throws Exception {
  // TODO Auto-generated method stub

  List res = template.query("select * from etk.ticket_change_audit", new RowMapper() {

   @Override
   public TicketChange mapRow(ResultSet rs, int rowNum) throws SQLException {
    // TODO Auto-generated method stub
    TicketChange ticketChange = new TicketChange();
    ticketChange.setTicket(rs.getInt("ticket"));
    ticketChange.setTime(rs.getLong("time"));
    ticketChange.setAuthor(rs.getString("author"));
    ticketChange.setField(rs.getString("field"));
    ticketChange.setOldValue(rs.getString("oldvalue"));
    ticketChange.setNewValue(rs.getString("newvalue"));

    return ticketChange;
   }
  });

  for (TicketChange change : res) {

   TicketChangeSendProcessor sendProcessor = new TicketChangeSendProcessor(change);
   producer.send(sendProcessor);

   String delete = "delete from etk.ticket_change_audit WHERE ticket=? and time=? and author=? and field=? and oldvalue=? and newvalue=?";
   template.update(delete, change.getTicket(), change.getTime(), change.getAuthor(), change.getField(), change.getOldValue(), change.getNewValue());

  }

  exchange.getOut().setBody(res);
 }
}

Комментариев нет: