В этой статье я расскажу про фееричный фейл Apache Camel , на реальной бизнес-задаче.
Возникла задача по обработке 5000000 zip-файлов (O_o). Каждый файлик необходимо распаковать, достать из архива XML файл, распарсить его, и некоторую информацию дописать в лог в определенном формате.
Идеальная задача для Apache Camel подумал я и бросился в бой!
Один роутинг и один процессор, вот что у меня вышло.
Роутинг:
Возникла задача по обработке 5000000 zip-файлов (O_o). Каждый файлик необходимо распаковать, достать из архива XML файл, распарсить его, и некоторую информацию дописать в лог в определенном формате.
Идеальная задача для Apache Camel подумал я и бросился в бой!
Один роутинг и один процессор, вот что у меня вышло.
Роутинг:
public void configure() {
from("file:c://data/inbox?noop=true").
log("${file:name}").
convertBodyTo(InputStream.class).
processRef("ZipProcess").
to("file:c://data/outbox/?fileExist=Append&fileName=report.txt");
}
Процессор:
package com.bssys.zoom.zipbot;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class ZipProcess implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
// TODO Auto-generated method stub
InputStream is = exchange.getIn().getBody(InputStream.class);
ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(is));
String res = null;
ZipEntry zipEntry = null;
while ((zipEntry = zipInputStream.getNextEntry()) != null) {
String fileName = zipEntry.getName();
if (fileName.endsWith(".XML")) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byte buf[] = new byte[1024];
int count = -1;
while ((count = zipInputStream.read(buf)) != -1) {
byteArrayOutputStream.write(buf, 0, count);
}
String xml = new String(byteArrayOutputStream.toByteArray());
//... Тут парсим XML и выбираем нужные данные
res = String.format("%s:\r\n", fileName);
}
zipInputStream.closeEntry();
}
zipInputStream.close();
exchange.getOut().setBody(res);
}
}
Эта штука еле взлетела отожрав 5гб оперативной памяти. Скорость работы вполне приличная 1000 файлов за 300ms. Производительность вполне достаточная, если учесть что парсер можно оптимизировать или совсем отказаться от него, используя indexOf.
Через пару часов работы, заглядывая в результирующий файлик, всплыло неладное. Данные повторялись! Т.е. Camel брал один и тотже файл по нескольку раз. На моем тестовом экземпляре в 1000 файлов все работало без нареканий. Заглянув документацию заметил следующий комментарий по параметру noop:
If true, the file is not moved or deleted in any way. This option is good for readonly data, or for ETL type requirements. If noop=true, Camel will set idempotent=true as well, to avoid consuming the same files over and over again.Этот параметр я проставляю для конечной точки, что бы Camel не удалял и не перемещал файлы из исходной папки. Если проставляется noop=true то автоматически проставляется также параметр idempotent=true
Option to use the Idempotent Consumer EIP pattern to let Camel skip already processed files. Will by default use a memory based LRUCache that holds 1000 entries. If noop=true then idempotent will be enabled as well to avoid consuming the same files over and over again.Все стало ясно. Camel в режиме мониторинга устанавливает кэш в 1000 файлов, чтобы не забирать уже обработанные файлы. И просто не предназначен для такой ситуации с 5000000 файлов. Можно не устанавливать флаг noop, но при этом сильно режется производительность. Вот этого я никак не ожидал. Хотя по логике ничего работа Camel Не вызывает нареканий. В режиме мониторинга мы упираемся в этуже проблему. В результате без использования Camel, была написана программка, которая и выполнила эту непосильную задачу, заняв при этом меньшее количество памяти. Привожу часть просто для интереса.
public void go(String inbox, String outbox) throws Exception {
File inboxDir = new File(inbox);
if (inboxDir.isDirectory()) {
String[] listFiles = inboxDir.list();
System.out.println("Count files: " + listFiles.length);
BufferedOutputStream bufferedOutputStream = null;
try {
bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(new File(outbox + "/report.txt")));
long count = 0;
for (String fileName : listFiles) {
count++;
System.out.println(count + ":" + fileName);
FileInputStream fileInputStream = new FileInputStream(new File(inbox + "/" + fileName));
try{
String res = process(fileInputStream);
bufferedOutputStream.write(res.getBytes());
}catch(Exception e){
FileOutputStream fileOutputStream = new FileOutputStream(new File(outbox + "/fail.txt"), true);
fileOutputStream.write((fileName+"\r\n").getBytes());
fileInputStream.close();
}
}
bufferedOutputStream.flush();
} finally {
if (bufferedOutputStream != null) {
bufferedOutputStream.close();
}
}
}
}
Метод process, собственно на 99% схож с кодом процессора.
Удачи!
4 комментария:
Статья конечно старая, но все равно... скажите, а что произойдет, если ваша "программка" в какой-то момент будет остановлена? Скажем после успешной обработки 2045845-го файла? А если еще сами файлы в каталоге обновятся? Скажем будет добавлено еще парочка файлов?
Программа для этого не предназначена. Задача отработать один раз полностью до конца. Если файлы обновятся никто об этом не узнает. Надо будет запустить еще раз.
Для мониторинга папки на предмет добавления новых файлов, надо будет использовать стандартный файловый адаптер из комплекта Apache Camel, только не забыть добавить проверку на пустой файл, иначе при большой нагрузке, адаптер будет хватать пустые файлы и процессинг будет падать.
Привет из будущего. Не уверен, что знаю, как было, но сейчас при noop=true, выставляется флаг idempotent=true, что означает, что список обработанных файлов будет храниться в репозиторие(idempotentRepository) и повторно не обработается. Размер репозитория можно выставить, подробнее здесь - http://people.apache.org/~dkulp/camel/file2.html
Привет из будущего. Не уверен, что знаю, как было, но сейчас при noop=true, выставляется флаг idempotent=true, что означает, что список обработанных файлов будет храниться в репозиторие(idempotentRepository) и повторно не обработается. Размер репозитория можно выставить, подробнее здесь - http://people.apache.org/~dkulp/camel/file2.html
Отправить комментарий