2013-06-21 4 views
1

파일을 서버로 보내는 클라이언트가 여러 대 있습니다. 한 세트의 데이터에는 해당 데이터에 대한 정보가 들어있는 두 개의 파일이 있습니다. 각 파일에는 동일한 이름이 사용됩니다. 파일이 수신되면 서버는 파일 경로, 파일 이름, 클라이언트의 ID 및 파일의 "유형"(모두 동일한 파일 확장자를 갖지만 두 개의 유형이있는)을 포함하는 메시지를 내 대기열로 보냅니다. "그들에게 A와 B라고 부름).낙타를 사용하여 동일한 헤더의 메시지 집계

한 세트의 데이터에 대한 두 개의 파일은 동일한 파일 이름을 갖습니다. 서버가 두 파일을 모두받는 즉시 두 파일을 결합한 프로그램을 시작해야합니다. 현재 I는 다음과 같습니다 뭔가가 : 나는 끼 었어

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args="); 

헤더 ("CamelFileName은")입니다,보다 구체적으로 어떻게 그리 게이터 작동합니다.

completionSize를 2로 설정하면 첫 번째 메시지와 일치하는 두 번째 메시지가 나타날 때까지 모든 메시지를 빨아 들여 일부 데이터 구조에 저장합니까? 또한, header()는 특정 값을 기대합니까? 헤더에 클라이언트 ID와 파일 이름을 가지고 있다고 생각 했으므로 여러 클라이언트가 있지만 특정 값을 부여해야하는지 다시 알지 못합니다. 나는 또한 내가 정규식을 사용할 수 있는지 여부를 모른다.

어떤 아이디어 나 조언도 도움이 될 것입니다. 감사합니다.

편집 : 다음은 현재 가지고있는 코드입니다. 여기에있는 문제에 대한 내 설명과 선택한 대답에 대한 의견을 바탕으로 정확하지 않은 것 같습니다 (내가 덮어 쓰지 않은 닫는 괄호 외에)? 당신은 파일 이름에만 관심이 정확히 2 교류를 수신, 당신은 단지 UseLatestAggregationStrategy에 사용할 수있는 경우

public static void main(String args[]) throws Exception{ 
     CamelContext c = new DefaultCamelContext(); 
     c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); 
     //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
     //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); 
     c.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success"); 
      } 
     }); 
     c.start(); 
     while (true) { 
      System.out.println("Waiting on messages to come through for camel"); 
      Thread.sleep(2 * 1000); 
     } 
     //c.stop(); 
    } 

    private static class MyAggregationStrategy implements AggregationStrategy { 

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
      if (oldExchange == null) 
       return newExchange; 
      // and here is where combo stuff goes 
      String oldBody = oldExchange.getIn().getBody(String.class); 
      String newBody = newExchange.getIn().getBody(String.class); 
      boolean oldSet = oldBody.contains("set"); 
      boolean newSet = newBody.contains("set"); 
      boolean oldFlow = oldBody.contains("flow"); 
      boolean newFlow = newBody.contains("flow"); 
      if ((oldSet && newFlow) || (oldFlow && newSet)) { 
       //they match so return new exchange with info so extractor can be started with exec 
       String combined = oldBody + "\n" + newBody + "\n"; 
       newExchange.getIn().setBody(combined); 
       return newExchange; 
      } 
      else { 
       // no match so do something.... 
       return null; 
      } 
     } 
    } 

답변

3

당신이 교환을 결합하는 방법을 정의 할 AggregationStrategy를 제공해야합니다 ...

단지 한 번 2 '집계'한을 통해 최신 교환을 통과 ...

는 말했다, 당신은 (각 클라이언트 ID를) 모두 교류를 유지해야하는 것처럼 들린다 그래서 당신은 '간부'단계에 그 정보를 전달할 수 있습니다 ... 만약 그렇다면, 당신은 Exchanges를 GroupedExchange 홀더에 결합 할 수 있습니다. groupExchanges 옵션을 통해 유도하거나 사용자 정의 AggregationStrategy를 구체화하여 원하는대로 조합 할 수 있습니다. 단지

예제 이러한 단위 테스트를 참조 ... 당신의 '간부'단계는 사용하기로 결정 집계 어떤 구조를 처리해야 명심해야합니다

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

+0

그래서 AggregationStrategy는 어떻게 든 메시지를 결합하여 'exec'에 부여 할 수 있습니까? "두 거래소"는 무엇을 의미합니까? 방금 어제 Camel을 사용하기 시작 했으므로 모든 것이 여전히 나에게 새롭다. – thaweatherman

+0

np, Camel에 오신 것을 환영합니다. Aggregator는 꽤 복잡하고 강력한 도구입니다 ... 간단히 말해서, Exchange는 메시지 (귀하의 대기열 등) , 그래서 당신이 2 개의 메시지 (fileName에 의해 상관 된)를 기다리고 있다면, 함께 묶인 2 개의 교환으로 끝나게 될 것입니다 ... 그러면 관련 데이터를 끌어 와서 exec (fileName, clientIDs, etc) ... –

+0

파일 이름은 다른 클라이언트에서 동일 할 수 있으므로 ID와 파일 이름을 연관시킬 수 있습니까? – thaweatherman

관련 문제