2016-12-22 6 views
2

저는 RabbitMQ와 함께 MassTransit을 사용하여 Service Fabric에서 실행되는 서비스에 Stateful 서비스로 메시지를 게시하는 웹 사이트의 데모를 제안하려고했습니다.MassTransit 및 서비스 패브릭 스테이트 풀 서비스?

IBusControl bus = BusConfigurator.ConfigureBus(); 
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}"); 
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri); 

await endPoint.Send<ICompanyRequest>(new {CompanyId = id }); 

내 서비스 패브릭 서비스 내 소비자가 다음과 같이 정의했다 :

 IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
     { 
      IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h => 
      { 
       h.Username(RabbitMqConstants.UserName); 
       h.Password(RabbitMqConstants.Password); 
      }); 

      cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e => 
      { 
       e.Consumer<PersonInformationConsumer>(); 
      }); 

     }); 

     busControl.Start(); 

이 날의 메시지를 소비 할 수 있도록 않는 모든 것이 잘 가고 있었다

, 내 클라이언트는 메시지를 게시 할 내 수업과 나는 그것을 잘 처리 할 수있다. 이 문제는 IReliableDictonary 또는 IReliableQueue 또는 서비스 패브릭 서비스의 RunAsync 함수에서 실행되는 컨텍스트를 참조해야하는 모든 것을 사용하려는 경우에 발생합니다.

제 질문은 어떻게 서비스 문맥 자체에 대한 지식이있는 Stateful Service Fabric Service 내에서 MassTransit이 작동하도록 구성 할 수 있습니까?

미리 감사드립니다. 마이크

업데이트 내 메시지 사용자 클래스에 등록 루틴을 가리키는 경우 가 좋아, 내가 (예),이에 대한 몇 가지 진전을했습니다

다음
ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult(); 
       ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name); 

내 소비자 클래스 내

0 :

internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest> 
{ 
    private static StatefulServiceContext _currentContext; 

    #region Constructors 

    public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext) 
    { 
     _currentContext = serviceContext; 
    } 

    public PersonInformationConsumer() : base(_currentContext) 
    { 
    } 

지금은 성공적으로 서비스 메시지를 호출 할 수 있습니다 : 메시지 나는 다음을 수행 할 수 있습니다

내가 지금 가지고있는 문제는 "객체 참조가 객체의 인스턴스로 설정되지 않았습니다"라는 오류로 인해 발생하는 IReliableDictionary에 무언가를 저장하려고하는 것입니다. 새해까지 지금)

 public async Task Consume(ConsumeContext<ICompanyRequest> context) 
    { 

     ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId); 

     using (ITransaction tx = StateManager.CreateTransaction()) 
     { 

      try 
      { 

       var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary"); 

이것은 오류의 원인입니다 .... 도움이됩니다! :)

+0

나는 서비스 패브릭 SDK가 설치되어 있지 않지만, 비동기 방식의 내부 _static_ 클래스에 대한 액세스를 보는 것은 확실히 신뢰할 수있는 사전 인스턴스에 액세스하는 또 다른 방법이 나 일시 중지 제공합니다. –

답변

0

MassTransit과 상태 저장 서비스를 함께 사용하려면 좀 더 많은 작업이 필요합니다. 여기에 관심이있는 몇 가지 문제가 있습니다.

상태 기반 파티션의 마스터 (n 개의 파티션 내의 n 마스터)만이 상태 저장 서비스에 대한 쓰기/업데이트가 가능하며 모든 복제본은 상태를 다시 쓰려고 할 때 예외를 throw합니다. 따라서이 문제를 해결할 필요가 있습니다. 마스터가 클러스터의 균형을 조정할 때 클러스터 주변을 이동할 수 있다고 생각할 때까지는 쉬운 일이 아닙니다. 일반적인 서비스 패브릭 응용 프로그램의 기본값은 복제본을 처리하고 마스터에서만 작업을 실행합니다. 이 모든 것은 RunAsync 메서드에 의해 수행됩니다 (RunAsync 메서드에서 noddy를 사용하여 3 개의 상태 저장 서비스를 실행 한 다음 마스터를 종료하십시오).

또한 파티션을 통한 상태 저장 서비스로 인해 서비스 버스에서 별도의 끝점에 데이터를 배포하는 방법을 만들거나 대기하는 별도의 대기열이 필요할 수도 있습니다. 주어진 파티션 범위? UserCreated 메시지가 있다고 가정하면 country에 분할 할 수 있습니다. UK은 1 번 파티션으로, US은 2 등으로갑니다.

기본 동작을 얻으려면 하나의 파티션으로 제한하고 RunAsync 내에 버스 만들기를 시도하고 취소 토큰에서 취소가 요청되면 버스를 종료하십시오.

protected override async Task RunAsync(CancellationToken cancellationToken) 
{ 
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
    { 
     IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h => 
     { 
      h.Username(RabbitMqConstants.UserName); 
      h.Password(RabbitMqConstants.Password); 
     }); 

     cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e => 
     { 
      // Pass in the stateful service context 
      e.Consumer(c => new PersonInformationConsumer(Context)); 
     }); 

    }); 

    busControl.Start(); 

    while (true) 
    { 
     if(cancellationToken.CancellationRequested) 
     { 
      //Service Fabric wants us to stop 
      busControl.Stop(); 

      cancellationToken.ThrowIfCancellationRequested(); 
     } 

     await Task.Delay(TimeSpan.FromSeconds(1)); 
    } 
} 
+0

Service Listener를 사용하여 POC를 약간 수행 했으므로이 예제를 확인하십시오. - https://github.com/kevbite/MassTransit.ServiceFabric –