2016-11-30 3 views
1

오류 대기열을 원래 대기열로 다시 이동하려고합니다. 이 작업을 수행하기 위해 Error Queue에 Consumer를 생성 한 다음 Required Queue에 게시했습니다. 이것을 사용하면 소비 된 메시지의 절반이 게시되지만 나머지 절반은 Error_Skipped 대기열로 전송됩니다.MassTransit RabbitMQ 오류 대기열에서 소비 된 메시지의 절반을 Error_Skipped 대기열로 이동합니다.

나는 성공하지 못한 채 많은 것을 시도해 보았습니다. 그래서 그것은 제가 누락 된 것일 수도 있습니다.

여기 내 코드의 샘플은 다음과 같습니다

public class ClaimsMessage 
{ 
    public string Description { get; set; } 

    public DateTime Date { get; set; } 

    public bool Handled { get; set; } 
} 

public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>> 
{ 
    public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context) 
    { 
     try 
     { 
      await context.Publish<ClaimsMessage>(context.Message.Message); 

     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

public static IBusControl CreateClaimsErrorConsumerBus(string endPoint) 
{ 
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
    { 
     var host = cfg.Host(new Uri("rabbitmq://localhost/"), h => 
     { 
      h.Username("guest"); 
      h.Password("guest"); 
     }); 

     cfg.ReceiveEndpoint(host, endPoint, e => 
     { 
      e.Consumer(() => new ClaimsMessageErrorConsumer()); 
     }); 
    }); 
    return busControl; 
} 
+0

[셔블?] (https://www.rabbitmq.com/shovel.html)을 사용 해본 적이 있습니까? – stuartd

+0

삽으로 보았습니다. 그러나 약간 기본입니다. 규칙을 추가하여 나중에 특정 메시지 만 다시 이동하고 나머지는 나중에 옮길 수 있습니다. –

+0

[메일 링리스트] (https://groups.google.com/forum/#!forum/masstransit-discuss)에서 시도 할 수 있습니다. – stuartd

답변

1

다시 처리 큐에 오류 큐에서 메시지를 이동하는 경우, 당신은 Publish를 호출해서는 안 -이 모든 가입자에게 메시지를 다시 보냅니다. 큐 이름을 이미 알고 있으므로 메시지를 다시 큐에 보냅니다. 여러분이보고있는 것은 오류 대기열에 소비자를 생성했기 때문에 해당 메시지에 대한 교환 바인딩이 만들어 졌다는 것입니다.

그래서, 대신 이렇게 : 메시지의 충실도가 유지되도록

추가 신용 들어
sbc.ReceiveEndpoint("input_error", x => 
{ 
    // this prevents extra message bindings from being created 
    x.BindMessageExchanges = false; 

    x.Consumer<MyMover>(() => new MyMover(inputQueueAddress); 
}); 

public class MyMover : 
    IConsumer<ClaimsMessage> 
{ 
    public async Task Consume(ConsumeContext<ClaimsMessage> context) 
    { 
     try 
     { 
      var endpoint = await context.GetSendEndpoint(_inputQueueAddress); 
      await endpoint.Send<ClaimsMessage>(context.Message); 
     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

원래 메시지 헤더를 통해 복사합니다.

+0

답장을 보내 주셔서 감사합니다. 불행히도 여전히 동일하고 하나의 메시지가 Claims Queue에 있고 다른 하나는 Claims_error_skipped 대기열에 있습니다. 헤더를 통해 복사하지 않았습니다. 그것이 그 원인이 될 수 있다고 생각합니까? –

+0

Chris 님, 안녕하세요. 여기에서 다운로드 할 수있는 예제 프로젝트를 만들었습니다 : https://drive.google.com/file/d/0B0FYiKs0DMyrYTJfZTcxdVlKSDg/view?usp=sharing[link] 여기에는 복제 방법에 대한 단계별 지침이 포함되어 있습니다 . 나는 아주 잘 빠져있을 수 있지만 문서를 아주 광범위하게 읽었습니다. –

+0

오류 대기열에서 읽는 경우 오류 이 아니라 원래 메시지를 사용해야합니다. 기본적으로 오류 큐에 오류 에 대한 추가 소비자 바인딩을 만들었습니다.이 큐는 큐에서 이동 한 원래 메시지 외에 게시 된 오류를 가져옵니다. 대신 T를 사용하고 RMQ의 바인딩을 정리하면 모든 설정이 완료됩니다. –

관련 문제