2017-01-20 2 views
1

분할 및 집계 기능이있는 Camel에서 경로를 정의했지만 집계 자 다음에 예외를 다시 분할로 전파 할 수 없습니다. 어떤 원인이 우리가 예외 아래Apache Camel 분할 및 집계 예외 처리

에게 발생하는 경우 분할도 실행의 아래에있는 위의 코드에서

from("direct:MyRoute") 
    .routeId("MyRouteID") 
     .split().tokenize("\n", 1) 
     .streaming().stopOnException() 
     .choice() 
     .when(simple("${property.CamelSplitIndex} > 0")) 
      .unmarshal(domainDataFormat) 
      .choice() 
      .when(simple("${property.CamelSplitComplete}")) 
      .process(
       new Processor() 
       { 
       @Override 
       public void process(Exchange exchange) throws Exception 
       { 
        exchange.getIn().getHeaders().put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true); 
       } 
       } 
      ) 
      .end() 
      .aggregate(myAggregationStrategy).constant(true) //if i comment this line split will be stop on exception 
      .threads().executorService(executorService) 
      .process(myProcessor).end() 
    .end(); 

프로세서 (myProcessor)를 작동하지 않는 코드 :

counter.incrementAndGet(); //atomic counter 
if(counter.get()==3) 
    { 
    exchange.setException(new RuntimeException()); 
    throw new RuntimeCamelException(); 
    } 

그러나 , 경로에서 집계를 제거하는 순간 Split은 예외에서 경로를 중지 할 수 있습니다.

답변

1

동일한 작업 단위 (UOW)에서 split + aggregate (fork/join) 인 복합 메시지 프로세서 EIP를 사용하십시오.

은 문서를 참조하십시오 : http://camel.apache.org/composed-message-processor.html

그리고 당신이 함께 일해야하는 스플리터로 집계 전략을 지정할 수 있습니다 스플리터 만 절을 참조하십시오.

+0

감사합니다. Claus. 으로 확인할 수 있습니다. 집계 호출 전에 .streaming 및 .unmarshal을 사용하고 있는지 확인할 수 있습니다. 우리는 split + aggregate로 그렇게 할 수 없습니다. 주위에 다른 방법이 있습니까? –

+0

계속 그렇게 할 수 있습니다. 집계를 제거하고 스플리터에 agg 전략을 추가하면 split complete et all을 검사하는 논리가 필요하지 않습니다. Camel in Action 책을 가지고 있다면 더 많은 세부 정보가있는이 패턴에 대해 읽어보십시오 –

+0

안녕하세요 Claus, 우리가 우리의 요구 사항에서 이것을 어떻게 사용하는지 설명하겠습니다. 위에서 설명한 경로를 사용하여 거대한 평탄한 고정 길이 파일을 처리하므로 일반적인 흐름은 파일을 한 줄씩 나눠서 스트림으로 만든 다음 비 정렬으로 POJO를 가져옵니다 (bindy 사용). (멀티 스레드) 하지만이 작업은 집계 및 분할 후에 언 마샬링 및 스트리밍을 수행하는 데 도움이되지 않으므로 split + 집계에서는 가능하지 않습니다. –