파일을 서버로 보내는 클라이언트가 여러 대 있습니다. 한 세트의 데이터에는 해당 데이터에 대한 정보가 들어있는 두 개의 파일이 있습니다. 각 파일에는 동일한 이름이 사용됩니다. 파일이 수신되면 서버는 파일 경로, 파일 이름, 클라이언트의 ID 및 파일의 "유형"(모두 동일한 파일 확장자를 갖지만 두 개의 유형이있는)을 포함하는 메시지를 내 대기열로 보냅니다. "그들에게 A와 B라고 부름).낙타를 사용하여 동일한 헤더의 메시지 집계
한 세트의 데이터에 대한 두 개의 파일은 동일한 파일 이름을 갖습니다. 서버가 두 파일을 모두받는 즉시 두 파일을 결합한 프로그램을 시작해야합니다. 현재 I는 다음과 같습니다 뭔가가 : 나는 끼 었어
from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");
헤더 ("CamelFileName은")입니다,보다 구체적으로 어떻게 그리 게이터 작동합니다.
completionSize를 2로 설정하면 첫 번째 메시지와 일치하는 두 번째 메시지가 나타날 때까지 모든 메시지를 빨아 들여 일부 데이터 구조에 저장합니까? 또한, header()는 특정 값을 기대합니까? 헤더에 클라이언트 ID와 파일 이름을 가지고 있다고 생각 했으므로 여러 클라이언트가 있지만 특정 값을 부여해야하는지 다시 알지 못합니다. 나는 또한 내가 정규식을 사용할 수 있는지 여부를 모른다.
어떤 아이디어 나 조언도 도움이 될 것입니다. 감사합니다.
편집 : 다음은 현재 가지고있는 코드입니다. 여기에있는 문제에 대한 내 설명과 선택한 대답에 대한 의견을 바탕으로 정확하지 않은 것 같습니다 (내가 덮어 쓰지 않은 닫는 괄호 외에)? 당신은 파일 이름에만 관심이 정확히 2 교류를 수신, 당신은 단지 UseLatestAggregationStrategy에 사용할 수있는 경우
public static void main(String args[]) throws Exception{
CamelContext c = new DefaultCamelContext();
c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
//ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
//c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
c.addRoutes(new RouteBuilder() {
public void configure() {
from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
}
});
c.start();
while (true) {
System.out.println("Waiting on messages to come through for camel");
Thread.sleep(2 * 1000);
}
//c.stop();
}
private static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null)
return newExchange;
// and here is where combo stuff goes
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
boolean oldSet = oldBody.contains("set");
boolean newSet = newBody.contains("set");
boolean oldFlow = oldBody.contains("flow");
boolean newFlow = newBody.contains("flow");
if ((oldSet && newFlow) || (oldFlow && newSet)) {
//they match so return new exchange with info so extractor can be started with exec
String combined = oldBody + "\n" + newBody + "\n";
newExchange.getIn().setBody(combined);
return newExchange;
}
else {
// no match so do something....
return null;
}
}
}
그래서 AggregationStrategy는 어떻게 든 메시지를 결합하여 'exec'에 부여 할 수 있습니까? "두 거래소"는 무엇을 의미합니까? 방금 어제 Camel을 사용하기 시작 했으므로 모든 것이 여전히 나에게 새롭다. – thaweatherman
np, Camel에 오신 것을 환영합니다. Aggregator는 꽤 복잡하고 강력한 도구입니다 ... 간단히 말해서, Exchange는 메시지 (귀하의 대기열 등) , 그래서 당신이 2 개의 메시지 (fileName에 의해 상관 된)를 기다리고 있다면, 함께 묶인 2 개의 교환으로 끝나게 될 것입니다 ... 그러면 관련 데이터를 끌어 와서 exec (fileName, clientIDs, etc) ... –
파일 이름은 다른 클라이언트에서 동일 할 수 있으므로 ID와 파일 이름을 연관시킬 수 있습니까? – thaweatherman