2012-07-27 6 views
8

저는 MassTransit을 처음 사용했기 때문에 이해가 안됩니다.MassTransit 및 이벤트 대 명령 게시

모든 노드가 동일한 작업을 수행 할 수있는 서버 팜이 있다고 가정 해 보겠습니다. 응용 프로그램 프레임 워크는 CQRS의 스타일입니다. 그게 내가 게시 할 메시지의 두 가지 기본 종류가 의미

  • 명령 : 처리해야합니다 그들
  • 이벤트 (무료 직업 슬롯에 첫번째) 중 하나가 서버 정확히 하나에 의해 처리되어야한다 모든 서버

저는 매우 간단한 MassTransit 프로토 타입 (매 X 초마다 hello를 보내는 콘솔 응용 프로그램)을 빌드했습니다.

API에는 '게시'방법이 있습니다. 어떤 종류의 메시지인지 (모든 서버 대 하나의 서버) 어떻게 지정할 수 있습니까?

"처리기"구성을 보면 대기열 uri를 지정할 수 있습니다. 모든 호스트에 대해 동일한 큐를 지정하면 모든 호스트가 메시지를 가져 오지만 하나의 서버에만 실행을 제한 할 수는 없습니다.

호스트 전용 큐에서 수신 대기하는 경우 하나의 서버 만 메시지를 처리하지만 다른 종류의 메시지를 브로드 캐스팅하는 방법을 알지 못합니다.

제가 누락 된 부분을 이해하도록 도와주세요.

추신 : 내 메시지 시스템은 rabbitmq입니다.

public static class ActualProgram 
{ 
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource(); 

    private static readonly Random g_Random = new Random(); 

    public static void ActualMain(int delay, int instanceName) 
    { 
     Thread.Sleep(delay); 
     SetupBus(instanceName); 

     Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 

     Console.WriteLine("Press enter at any time to exit"); 
     Console.ReadLine(); 
     g_Shutdown.Cancel(); 

     Bus.Shutdown(); 
    } 

    private static void PublishRandomMessage() 
    { 
     Bus.Instance.Publish(new Message 
     { 
      Id = g_Random.Next(), 
      Body = "Some message", 
      Sender = Assembly.GetEntryAssembly().GetName().Name 
     }); 

     if (!g_Shutdown.IsCancellationRequested) 
     { 
      Thread.Sleep(g_Random.Next(500, 10000)); 
      Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token); 
     } 
    } 

    private static void SetupBus(int instanceName) 
    { 
     Bus.Initialize(sbc => 
     { 
      sbc.UseRabbitMqRouting(); 
      sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName); 
      sbc.Subscribe(subs => 
      { 
       subs.Handler<Message>(MessageHandled); 
      }); 
     }); 
    } 

    private static void MessageHandled(Message msg) 
    { 
     ConsoleColor color = ConsoleColor.Red; 
     switch (msg.Sender) 
     { 
      case "test_app1": 
       color = ConsoleColor.Green; 
       break; 

      case "test_app2": 
       color = ConsoleColor.Blue; 
       break; 

      case "test_app3": 
       color = ConsoleColor.Yellow; 
       break; 
     } 
     Console.ForegroundColor = color; 
     Console.WriteLine(msg.ToString()); 
     Console.ResetColor(); 
    } 

    private static void MessageConsumed(Message msg) 
    { 
     Console.WriteLine(msg.ToString()); 
    } 
} 

public class Message 
{ 
    public long Id { get; set; } 

    public string Sender { get; set; } 

    public string Body { get; set; } 

    public override string ToString() 
    { 
     return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body); 
    } 
} 

난 그냥 ActualMain 방법 실행도 3 콘솔 응용 프로그램이 있습니다 :

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     ActualProgram.ActualMain(0, 1); 
    } 
} 

답변

9

당신이 원하는 무엇을 나는이 클래스와 일반 클래스 라이브러리를 만들 수있는 테스트하기 위해

당신이 더 많은 정보를 찾을 수 있도록 검색 소비자)로 알려져 있습니다 RabbitMQ를 사용하면 인생을 쉽게 만들어줍니다. 시작하는 각 소비자마다 동일한 대기열 이름을 지정하기 만하면됩니다. 그들. 매번 고유 한 큐를 생성하는 대신.

private static void SetupBus(int instanceName) 
{ 
    Bus.Initialize(sbc => 
    { 
     sbc.UseRabbitMqRouting(); 
     sbc.ReceiveFrom("rabbitmq://localhost/Commands); 
     sbc.Subscribe(subs => 
     { 
      subs.Handler<Message>(MessageHandled); 
     }); 
    }); 
} 

AFAIK, 이벤트 처리기와는 달리 명령 처리기에는 별도의 프로세스가 있어야합니다. 모든 명령 처리기는 동일한 대기열에서 Receive, 모든 이벤트 처리기는 고유 한 대기열에서 Receive됩니다.

퍼즐의 다른 부분은 버스에 메시지를받는 방법입니다. 명령에 대해 게시 기능을 계속 사용할 수는 있지만 메시지를 모든 사용자에게 전달할 수 있으므로 소비자를 잘못 구성한 경우 하나의 큐에 메시지가 끝나도록하려면 게시보다는 보내기를 사용할 수 있습니다.

Bus.Instance 
    .GetEndpoint(new Uri("rabbitmq://localhost/Commands")) 
    .Send(new Message 
    { 
     Id = g_Random.Next(), 
     Body = "Some message", 
     Sender = Assembly.GetEntryAssembly().GetName().Name 
    }); 
+0

감사합니다. 이 점을 이해하는 데 많은 도움이됩니다. 유일하게 두려워하는 것은 "이벤트 처리기와는 달리 명령 처리기를위한 별도의 프로세스가 필요합니다"라고 생각합니다. 이것은 글로벌 아키텍처에 영향을 미칠 것입니다.하지만 다른 선택이 없다면 나는 그걸로 살 것입니다. –

+0

경쟁 소비자의 경우 각 소비자에 대해 별도의 버스 인스턴스가 필요합니다.명령이 보통 별도의 버스 인스턴스에있는 이유는 대개 소비자가 여러 스레드에서 처리되는 경우 동 기적으로 처리되기 때문입니다. MT는 메시지 유형별로 동시성을 지정할 수는 없습니다. 단일 프로세스에서 여러 버스를 호스트 할 수 있다고 생각하지만 시도하지는 않았습니다. 나는 각자의 "서비스"를 호스트하기 위해 같은 녀석들의 탑 셰프를 사용한다. 그것은 당신이 프로세스 (appdomains) 또는 별도의 프로세스/별도의 컴퓨터에서 배포 시간에 매우 쉽게 선택할 수 있습니다 –

+0

thx. 나는 한번 볼게. –