2016-09-16 4 views
1

RabbitMQ를 사용하여 하나의 요청과 하나의 응답 큐를 사용하여 분산 RPC와 같은 솔루션을 구현하려고합니다. 이미 많은 솔루션을 구현했습니다. Apache Apollo와 함께 RabbitMQ로 마이그레이션 할 수 있기를 원했을 것입니다. 여기서 중요한 포인트는RabbitMQ : 메시지 선택을 위해 라우팅 사용

  • 각 서버는 요청 큐에 연결 아폴로 내 구현에서 (헤더 필드)

을 그를 위해 있어야하는

  • 각 서버 만 처리 요청 요점은 선택기 (헤더 필드의 값에 대한 절과 같은)를 사용하는 것이 었습니다. 라우팅 및 라우팅 키를 통해 RabbitMQ에서이를 달성했다고 생각했지만, 작업자가 메시지가 아닌 메시지를 수신하는 것이 틀림 없어야합니다. .

    문제를 재현하기 위해 라우팅 샘플 (http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html)을 수정 했으므로 routingKey를 정의하는 여러 매개 변수와 소비자 중 하나에 대한 메시지를 생성 한 생산자로 시작할 수있는 두 명의 소비자가 있습니다. 내가 보는 행동은 메시지의 소비가 무작위로 보인다는 것입니다 ('John'은 소비자가 처음 'John'을, 소비자가 'Mary'를 두 번째로 처리 함)

    아무도 RabbitMQ에서 셀렉터 사용시 표시 또는 코드 스 니펫?

    public static void Main(String[] args) 
    { 
        var factory = new ConnectionFactory { HostName = "localhost" }; 
        using (var connection = factory.CreateConnection()) 
         using (var channel = connection.CreateModel()) 
         { 
          const String request = "request"; 
          channel.ExchangeDeclare(request, "direct"); 
    
          channel.QueueDeclare(request, true, false, false, null); 
    
          if (args.Length < 1) 
          { 
           Console.WriteLine(" Press [enter] to exit."); 
           Console.ReadLine(); 
           Environment.ExitCode = 1; 
           return; 
          } 
    
          var myRoutingKey = args[0]; 
          channel.QueueBind(request, request, myRoutingKey); 
    
          Console.WriteLine($" [*] Waiting for messages for {myRoutingKey}."); 
    
          var consumer = new EventingBasicConsumer(channel); 
          consumer.Received += (model, ea) => 
          { 
           var body = ea.Body; 
           var message = Encoding.UTF8.GetString(body); 
           var routingKey = ea.RoutingKey; 
           Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); 
          }; 
          channel.BasicConsume(request, true, consumer); 
    
          Console.WriteLine(" Press [enter] to exit."); 
          Console.ReadLine(); 
         } 
    } 
    

    및 생산자에 대한 : 소비자에 대한 내 코드 아래

    사전에

    public static void Main(String[] args) 
    { 
        var factory = new ConnectionFactory { HostName = "localhost" }; 
        using (var connection = factory.CreateConnection()) 
         using (var channel = connection.CreateModel()) 
         { 
          const String request = "request"; 
          channel.ExchangeDeclare(request, "direct"); 
    
          channel.QueueDeclare(request, true, false, false, null); 
    
          var routingKey = args.Length > 0 ? args[0] : "John"; 
    
          const String message = "Hi"; 
          var body = Encoding.UTF8.GetBytes(message); 
          channel.BasicPublish(request, routingKey, null, body); 
          Console.WriteLine($" [x] Sent '{routingKey}':'{message}'"); 
         } 
    
        Console.WriteLine(" Press [enter] to exit."); 
        Console.ReadLine(); 
    } 
    

    감사합니다.

  • 답변

    0

    왜 이것이 작동하지 않았을지 짐작할 수 있습니다. 열쇠는 소비자의 두 줄입니다.

    "request"는 소비자의 "all"에 대한 대기열의 이름입니다. 여러 라우팅 키를 사용하여 여러 바인딩을 설정하는이 프로그램을 실행하면 "요청"이라는 대기열이 여러 라우팅 키 (예 : "John", "Mary")를 사용하는 교환기에 바인딩됩니다. 이 바인딩을 수행 할 때 바인딩은 RabbitMQ 서버에서 일시적이지 않으며 고정되어 있습니다.

    이제 문제 해결 방법으로 돌아갑니다. 여러 옵션이 있지만 여기에는 그 중 하나가 있습니다. 먼저 RabbitMQ Model을 읽는 것이 좋습니다.

    var queueName = channel.QueueDeclare().QueueName; 
    channel.QueueBind(queueName, request, myRoutingKey); 
    

    그러나 위의 새로운 큐가 원하는과 교환에 만든 바인딩 당신이 당신의 소비자 프로그램을 실행할 때마다 의미

    대신 당신의 이러한 선이 같은 튜토리얼 코드를 사용할 수 있습니다 라우팅 키. 다른 방법은 이전에했던 것과 동일한 코드를 사용하는 것이지만 고정 큐 이름 대신 큐 이름을 적절하게 선택하는 것입니다. 예를 들어, 키

    var queueName = myRoutingKey ; 
    channel.QueueDeclare(queueName, true, false, false, null); 
    channel.QueueBind(queueName, request, myRoutingKey); 
    

    라우팅 당 하나의 큐가 아니면 할 수 또는 당신이 튜토리얼 샘플과 유사 하나의 큐에 그룹 라우팅 키의 수를 할 수 있습니다.

    요점은 하나의 대기열로 수행 할 수 없다는 것입니다. (메시지를 소비 할 때 메시지를 필터링하는 것 외에는). 그러나 그것은 당신을위한 진정한 요구 사항처럼 들리지 않았습니다. 여러분이 물어 본 것은 각 소비자 서버가이 모델에서 할 수있는 관련 메시지 만 다루는 것입니다. 제작자는 그 교환 원에게만 게시합니다 (당신이 원했던 것입니다).

    +0

    안녕하세요 Amin, 답변 주셔서 감사합니다. 동적으로 생성 된 대기열을 사용하면 문제가 발생하지 않는다는 것도 발견했습니다. 어쨌든 성능에 대한 우려 때문에 그리고 다른 브로커와 함께 구현 한 다른 솔루션이 그 종류 였기 때문에 요청과 응답 대기열로만 솔루션을 찾고있었습니다. 후자는 문제가 아니며 약간의 재 설계가 필요하지만 다른 모델을 인식 한 후에는 반드시 성능을 검증해야합니다. – Leon

    +0

    또한 동적으로 생성 된 대기열을 사용하면 라우팅 소비자가 실행되지 않는 동안 생성 된 모든 메시지가 손실됩니다. 나는 또한 교환기를 내구성으로 선언하려했지만 도움이되지 않습니다. – Leon

    +0

    동적으로 생성 된 큐는 본질적으로 내가 생각한 큐의 이름을 잃어 버리기 때문에 임시 큐가 될 것입니까? 동적 큐 이름이 일관성있는 이름을 사용하는 솔루션 일 수 있다고 생각하지 않습니다. 이러한 이름을 실제로 선택자로 생각하십시오. 이런 식으로하는 것은 성과가 없습니다. 이것은 RabbitMQ가 설계된 것입니다. –

    관련 문제