пятница, октября 14, 2016

The Skip Locked feature in Postgres 9.5 (message queue)

В углу что-то мяукают по ActiveMQ. А между тем в postgress запилили крутую фичу. for update skip locked позволяет пропустить в select строки заблокированные в другой транзакции. К сожалению такое не прокатит в Oracle. Но тем не менее для мессажинга можно использовать уже сейчас. Из коробки имеем:

  • транзакции
  • персистенцию сообщений
  • клиент(обычный jdbc)
  • кластер(обычные средства предоставляемые postgres)

package com.bssys.blog.messaging;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
import org.springframework.stereotype.Component;

@Component("consumer")
public class Consumer implements Runnable {

 private String select = "select id, message from skiptable where status = 'TODO' order by priority asc limit 1 for update skip locked";
 private String delete = "delete from skiptable where id = ? ";

 @Autowired
 private DataSource dataSource;

 public void run() {

  while (!Thread.currentThread().isInterrupted()) {
   Connection con = DataSourceUtils.getConnection(dataSource);
   SingleConnectionDataSource singleDs = new SingleConnectionDataSource(con, true);
   try {
    JdbcTemplate jdbcTemplate = new JdbcTemplate(singleDs);
    List<Map<String, Object>> queryForList = jdbcTemplate.queryForList(select);
    if (!queryForList.isEmpty()) {
     for (Map<String, Object> map : queryForList) {
      String id = (String) map.get("id");
      String message = (String) map.get("message");
      jdbcTemplate.update(delete, new Object[] { id });
      System.out.println(Thread.currentThread().getName() + ":" + message);
     }
     con.commit();
    } else {
     System.out.println(Thread.currentThread().getName() + ": no mass");
    }
   } catch (Exception e) {
    try {
     con.rollback();
    } catch (SQLException e1) {
     e1.printStackTrace();
    }
   } finally {
    singleDs.destroy();
   }
  }
 }
}

Комментариев нет: