Pytanie Najlepsza strategia do przetwarzania dużych plików CSV w Apache Camel


Chciałbym opracować trasę, która sonduje katalog zawierający pliki CSV, a dla każdego pliku usuwa wszystkie wiersze za pomocą Bindy i umieszcza je w activemq.

Problem polega na tym, że pliki mogą być dość duże (milion wierszy), więc wolałbym ustawiać kolejki po jednym wierszu, ale otrzymuję wszystkie wiersze w java.util.ArrayList na końcu Bindy, co powoduje problemy z pamięcią.

Do tej pory mam mały test i unmarshaling działa, więc konfiguracja Bindy przy użyciu adnotacji jest w porządku.

Oto trasa:

from("file://data/inbox?noop=true&maxMessagesPerPoll=1&delay=5000")
  .unmarshal()
  .bindy(BindyType.Csv, "com.ess.myapp.core")           
  .to("jms:rawTraffic");

Środowisko to: Eclipse Indigo, Maven 3.0.3, Camel 2.8.0

Dziękuję Ci


12
2017-11-14 14:06


pochodzenie




Odpowiedzi:


Jeśli używasz Splitter EIP, możesz użyć trybu strumieniowego, co oznacza, że ​​Camel będzie przetwarzał plik w kolejności od rzędu.

from("file://data/inbox?noop=true&maxMessagesPerPoll=1&delay=5000")
  .split(body().tokenize("\n")).streaming()
    .unmarshal().bindy(BindyType.Csv, "com.ess.myapp.core")           
    .to("jms:rawTraffic");

27
2017-11-14 14:33



Dziękuję Clausowi za odpowiedź. Teraz mam inny problem. Podążając za moim małym ćwiczeniem, próbuję wyodrębnić z kolejki i napisać do pliku z .convertBodyTo(String.class).to("file:data/outbox?fileExist=Append") ale tylko pierwszy wiersz zostanie zapisany. Mimo wszystko, jeśli użyję opcji pliku Zastąp, otrzymam tylko ostatni wiersz. Czy istnieje sposób na zapisanie wszystkich wierszy z pliku CSV do pliku ?. Dziękuję Ci - Taka
Musisz podać nazwę pliku .to ("file: data / outbox? FileName = data.csv i fileExist = Append") - Claus Ibsen
Dodaj .thread() po .streaming() czy może być bardziej wydajny? - Pith
Rozdzielacz EIP ma wbudowaną obsługę wielowątkowości, możesz odwołać się do executorService. Byłoby to bardziej idealne do użycia niż wątki. Ale to drugie jest również możliwe. Zobacz dokumentację Camel dla przykładów. - Claus Ibsen
Widzę tokenizację według nowej linii. A co z wielowierszowymi rzędami? Czy obsługiwane? - Daneel S. Yaitskov


Dla rekordu i dla innych użytkowników, którzy mogliby szukać tego tak samo jak ja, tymczasem wydaje się, że istnieje łatwiejsza metoda, która również dobrze sprawdza się w przypadku useMaps:

CsvDataFormat csv = new CsvDataFormat()
    .setLazyLoad(true)
    .setUseMaps(true);

from("file://data/inbox?noop=true&maxMessagesPerPoll=1&delay=5000")
    .unmarshal(csv)
    .split(body()).streaming()
    .to("log:mappedRow?multiline=true");

2
2018-06-01 12:01





Używanie EIP-ów Splittera i Aggregatora byłoby najlepszą strategią do przetwarzania dużych plików CSV w Apache Camel. Przeczytaj więcej o tej formie Złożony procesor wiadomości

Oto przykład użycia Java DSL:

package com.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.QuoteMode;

public class FileSplitter {

    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        CsvDataFormat csvParser = new CsvDataFormat(CSVFormat.DEFAULT);
        csvParser.setSkipHeaderRecord(true);
        csvParser.setQuoteMode(QuoteMode.ALL);
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                String fileName = "Hello.csv";
                int lineCount = 20;
                System.out.println("fileName = " + fileName);
                System.out.println("lineCount = " + lineCount);
                from("file:data/inbox?noop=true&fileName=" + fileName).unmarshal(csvParser).split(body()).streaming()
                        .aggregate(constant(true), new ArrayListAggregationStrategy()).completionSize(lineCount)
                        .completionTimeout(1500).marshal(csvParser)
                        .to("file:data/outbox?fileName=${file:name.noext}_${header.CamelSplitIndex}.csv");
            }
        });
        context.start();
        Thread.sleep(10000);
        context.stop();
        System.out.println("End");
    }
}

0
2018-03-27 12:24