2012-04-23 4 views
1

이제는 낙타에서 몇 가지 작은 프로젝트를 수행했지만 큰 데이터 (메모리에 맞지 않는 데이터)를 처리하는 방법에 대해 고민 중입니다.) 낙타 루트에서 소비 할 때.Apache Camel : 데이터베이스의 GBs 데이터를 JMS 끝점으로 라우팅

필자는 낙타를 사용하여 경로를 지정하려는 몇 가지 GB 상당의 데이터가 포함 된 데이터베이스를 보유하고 있습니다. 분명히 모든 데이터를 메모리로 읽어들이는 것은 선택 사항이 아닙니다.

독립 실행 형 응용 프로그램으로이 작업을 수행했다면 데이터를 호출하여 JMS enpoint에 청크를 보내는 코드가 생겼습니다. 좋은 패턴을 제공하기 때문에 낙타를 사용하고 싶습니다. 파일을 사용하고 있다면 streaming() 호출을 사용할 수 있습니다.

또한 camel-sql/camel-jdbc/camel-jpa를 사용해야하나요, 아니면 빈을 사용하여 데이터베이스에서 읽어야합니까?

모두가 아직 나와 함께하기를 바랍니다. 저는 Java DSL에 익숙하지만 사람들이 제공 할 수있는 도움이나 제안을 주시면 감사하겠습니다.

업데이트 : 2 월 2012

그래서 나는이 함께 놀러 시간이 좀 했어 나는 실제로 내가 사용할 수 있도록 프로듀서의 개념을 남용하고있어 어떻게 생각 그것은 길에있다.

public class MyCustomRouteBuilder extends RouteBuilder { 

    public void configure(){ 
     from("timer:foo?period=60s").to("mycustomcomponent:TEST"); 

     from("direct:msg").process(new Processor() { 
       public void process(Exchange ex) throws Exception{ 
        System.out.println("Receiving value" : + ex.getIn().getBody()); 
       } 
     } 
    } 

} 

제 제작자는 다음과 유사합니다. 명확성을 위해 필자는 CustomEndpoint 또는 CustomComponent를 포함하지 않았으며 단순한 래퍼처럼 보입니다.

public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e; 
    CamelContext c; 

    public MyCustomProducer(Endpoint epoint){ 
      super(endpoint) 
      this.e = epoint; 
      this.c = e.getCamelContext(); 
    } 

    public void process(Exchange ex) throws Exceptions{ 

     Endpoint directEndpoint = c.getEndpoint("direct:msg"); 
     ProducerTemplate t = new DefaultProducerTemplate(c); 

     // Simulate streaming operation/chunking of BIG data. 
     for (int i=0; i <20 ; i++){ 
      t.start(); 
      String s ="Value " + i ;     
      t.sendBody(directEndpoint, value) 
      t.stop();   
     } 
    } 
} 

첫째로 위의 것은 매우 깨끗해 보이지 않습니다. 이것을 수행하는 가장 깨끗한 방법은 내 낙타 경로가 소비하는 예정된 석영 작업을 통해 jms 대기열 (직접 대신 : msg 대신)을 채우는 것이고, 따라서 낙타 내에서 수신 된 메시지 크기보다 더 많은 유연성을 가질 수 있습니다 파이프 라인. 그러나 나는 Route의 일부로서 시간 기반 활성화를 설정하는 의미를 아주 좋아했습니다.

누구나 최선의 방법에 대해 생각하고 있습니까? 나의 이해에서

답변

0

은, 당신이 할 필요가있다 :

from("jpa:SomeEntity" + 
    "?consumer.query=select e from SomeEntity e where e.processed = false" + 
    "&maximumResults=150" + 
    "&consumeDelete=false") 
.to("jms:queue:entities"); 

maximumResults는 쿼리 당 얼마나 많은 개체의 한계를 정의합니다.

엔티티 인스턴스의 처리가 끝나면 엔티티 인스턴스가 다시 처리되지 않도록 e.processed = true;persist()을 설정해야합니다. 이 작업을 수행하는

한 가지 방법은 @Consumed 주석입니다 :

class SomeEntity { 
    @Consumed 
    public void markAsProcessed() { 
     setProcessed(true); 
    } 
} 

또 다른 것은, 당신은 당신이 대기열로 보내기 전에 개체를 직렬화하는 방법입니다주의해야합니다. from과 to 사이에 이 풍부해야을 사용해야 할 수도 있습니다.

+0

액션 북에서 낙타를보고 난 후에, 나는 이것을 줄 수 있다고 생각한다. 아니면 사용자 정의 구성 요소를 사용하여 좀 더 제어 할 수 있다고 생각한다. 변환은 사용자 지정 변환기/변환을 사용할 때 문제가됩니다. – K2J

+0

내가 발견 한 것은 엔터티의 기본 키를 직렬화하고 처리 논리를 그대로 두어 실제 엔터티를 검색하는 것입니다. 따라서 복잡한 사용자 정의 변환을 수행 할 필요가 없습니다.결국 엔티티는 키에 의해 구별됩니다. –

+0

Apache Nifi를 사용해 볼 수도 있습니다. Apache Nifi는이 사용 사례에 더 적합한 것으로 보입니다. – Pushkin