2016-09-08 5 views
0

전송 서비스로 Azure Service Bus를 사용하지만 IConsumer 내부에서 전화를 걸 때와 달리 스케줄 된 메시지는 작동하지 않습니다.MassTransitStateMachine 일정이 잘못 되었습니까?

나는 몇 시간과 며칠을 보냈지 만 아직도 무슨 일이 일어나고 있는지 거의 알지 못합니다.

하늘색 서비스 버스를 사용하는 상태 시스템에서 일정을 잡으려면 내가해야 할 일을 설명 할 수 있습니까? 그리고 IConsumer 컨텍스트에서 메시지가 작동하는 이유는 무엇입니까?

public class BatchCollector : MassTransitStateMachine<BufferSaga> 
{ 
    public BatchCollector(IBatchFactory batchFactory) 
    { 
     InstanceState(saga => saga.State); 
     Event(() => BufferedCommandDetected, 
      _ => _.CorrelateById(context => context.Message.GetBatchId())); 

     Schedule(() => WindowElapsed, x => x.BatchCompletionId, x => 
     { 
      x.Delay = TimeSpan.FromSeconds(5); 
      x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
     }); 


     Initially(
      When(BufferedCommandDetected) 
       .Then(
        context => 
        { 
         context.Instance.CorrelationId = context.Data.GetBatchId(); 
         context.Instance.Id = Guid.NewGuid().ToString("N"); 
         context.Instance.Buffer.Add(context.Data); 
         context.Instance.BatchStartTime = DateTimeOffset.Now; 
         context.Instance.AbsoluteDeadLine = DateTimeOffset.Now + context.Data.AbsoluteWindowSpan; 
         context.Instance.SlidingDeadLine = DateTimeOffset.Now + context.Data.SlidingWindowSpan; 
        }) 
       .Schedule(WindowElapsed, 
        context => new WindowElapsed {CorrelationId = context.Instance.CorrelationId }, 
        delayProvider: scheduleDelayProvider => scheduleDelayProvider.Data.SlidingWindowSpan < scheduleDelayProvider.Data.AbsoluteWindowSpan ? scheduleDelayProvider.Data.SlidingWindowSpan : scheduleDelayProvider.Data.AbsoluteWindowSpan) 
       .TransitionTo(Waiting)); 

     During(Waiting, 
      When(BufferedCommandDetected) 
       .Then(context => 
       { 
        context.Instance.SlidingDeadLine += context.Data.SlidingWindowSpan; 
        context.Instance.Buffer.Add(context.Data); 
       }), 
      When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine > DateTimeOffset.Now && context.Instance.AbsoluteDeadLine > DateTimeOffset.Now) 
       .Schedule(WindowElapsed, context => new WindowElapsed { CorrelationId = context.Instance.CorrelationId }), 
      When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine <= DateTimeOffset.Now || context.Instance.AbsoluteDeadLine <= DateTimeOffset.Now) 
       //.Unschedule(WindowElapsed) 
       .Publish(context => new Batch() 
       { 
        BatchId = context.Instance.BatchCompletionId ?? Guid.NewGuid(), 
        Content = context.Instance.Buffer, 
        StartTime = context.Instance.BatchStartTime, 
        EndTime = DateTimeOffset.Now 
       }) 
       .Finalize() 
       .TransitionTo(BufferCompleted)); 

     SetCompletedWhenFinalized(); 
    } 

    public Event<BufferedCommand> BufferedCommandDetected { get; private set; } 


    public Schedule<BufferSaga, WindowElapsed> WindowElapsed { get; private set; } 

    public State Waiting { get; private set; } 

    public State BufferCompleted { get; private set; } 
} 

는 버스 초기화는 : 다음

container.RegisterType<IBusControl>(
      new HierarchicalLifetimeManager(), 
      new InjectionFactory(c => 
      { 
       var bus = Bus.Factory.CreateUsingAzureServiceBus(
        cfg => 
        { 
         var azSbHost = cfg.Host(new Uri(CloudConfigurationManager.GetSetting("ServiceBus.Url")) 
          , host => 
          { 
           host.TokenProvider = TokenProvider 
            .CreateSharedAccessSignatureTokenProvider 
            (CloudConfigurationManager.GetSetting("ServiceBus.SharedAccessKeyName"), 
             CloudConfigurationManager.GetSetting("ServiceBus.AccessKey"), 
             TokenScope.Namespace); 
          }); 

         cfg.ReceiveEndpoint(
          azSbHost, 
          "Quartz.Scheduler", 
          sbConfig => 
           { 
            cfg.UseMessageScheduler(sbConfig.InputAddress); 
            sbConfig.Consumer(() => new ScheduleMessageConsumer(c.Resolve<IScheduler>())); 
           } 
         ); 

         cfg.ReceiveEndpoint(
          azSbHost, 
          Assembly.GetExecutingAssembly().GetName().Name, 
          sbConfig => 
          { 
           AllClasses.FromAssembliesInBasePath() 
            .Where(
             @class => 
              (@class?.Namespace?.StartsWith("bcn", 
               StringComparison.OrdinalIgnoreCase) ?? false) 
              && 
              @class.GetParentClasses() 
               .Any(
                parent => 
                  parent.Name.StartsWith("MassTransitStateMachine`1"))) 
            .ForEach(@class => 
            { 
             //dynamic cast to avoid having to deal with generic typing when type is not known until runtime.             
             dynamic stateMachineExtension = 
              new DynamicStaticWrapper(typeof(StateMachineSubscriptionExtensions)); 
             stateMachineExtension 
              .StateMachineSaga(
               sbConfig, 
               c.Resolve(@class), 
               c.Resolve(typeof(ISagaRepository<>).MakeGenericType(
                @class.GetParentClasses().First(parent => 
                   parent.Name.StartsWith("MassTransitStateMachine`1")) 
                 .GetGenericArguments().First()))); 
            }); 



           AllClasses.FromAssembliesInBasePath() 
            .Where(
             @class => 
              (@class?.Namespace?.StartsWith("bcn", StringComparison.OrdinalIgnoreCase) ?? 
              false) 
              && @class.GetInterfaces().Any(
               @interface => 
                @interface?.FullName?.StartsWith("MassTransit.IConsumer`1") ?? 
                false)) 
            .ForEach(@class => 
            { 
             var factoryType = typeof(UnityConsumerFactory<>).MakeGenericType(@class); 
             //Automatically register consumers. 
             dynamic consumerFactory = Activator.CreateInstance(
              factoryType, 
              container); 
             var consumingMethod = typeof(ConsumerExtensions). 
              GetMethods() 
              .First(
               m => 
                m.Name == "Consumer" && m.IsGenericMethod && 
                m.GetGenericArguments().Length == 1 && 
                m.GetParameters().Length == 3) 
              .MakeGenericMethod(@class) 
              .Invoke(null, new object[] {sbConfig, consumerFactory, null}); 

             //Automatically detect which payload contains message data. This message data is stored in blob. 
             @class.GetInterfaces().Where(
               @interface => 
                 @interface.FullName.StartsWith("MassTransit.IConsumer`1")) 
              .Select(@interface => @interface.GetGenericArguments().First()) 
              .Where(payload => payload.GetProperties() 
               .Any(prop => prop.PropertyType.Name.StartsWith("MessageData`1"))) 
              .ForEach(
               BlobType => 
                typeof(MessageDataConfiguratorExtensions) 
                 .GetMethods() 
                 .First(
                  method => 
                   method.GetParameters().First().ParameterType == 
                   typeof(IConsumePipeConfigurator) 
                   && 
                   method.GetParameters().Last().ParameterType == 
                   typeof(IMessageDataRepository)) 
                 .MakeGenericMethod(BlobType) 
                 .Invoke(null, 
                  new object[] 
                   {sbConfig, c.Resolve<IMessageDataRepository>()})); 
            }); 
          }); 

         cfg.UseServiceBusMessageScheduler(); 
         //azSbHost. 
        }); 

       return bus; 
      })); 
     container.RegisterType<IBus, IBusControl>(); 
     container.RegisterType<IBus, IBusControl>(new ContainerControlledLifetimeManager()); 

그리고 시작 :

var container = UnityConfig.GetConfiguredContainer(); 
     var bus = container.Resolve<IBusControl>(); 
     bus.Start(); 

     var scheduler = container.Resolve<IScheduler>(); 
     scheduler.Start(); 

     bus.Publish<BufferedCommand>(new BufferedCommandAdapter<decimal>(10m, TimeSpan.FromSeconds(5), 
      TimeSpan.FromSeconds(5))); 

답변

0

당신은 석영의 작업 공장을 설정하고 있는가? QuartzIntegration 라이브러리 설치를 수행하는 방법에 대해 살펴 보자, 또한

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/QuartzIntegrationExtensions.cs

을 그 석영/일시 정지 시작되도록 버스 주위 관찰자를 사용/버스 인라인 중단했다.

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/Configuration/SchedulerBusObserver.cs

+0

크리스는 오랜 시간 후에는. RavenDb 사가 공급자가 고장 :(내가이 일을 얻을 수없는 것 같다 메모리에 잘 작동합니다.이 업데이트되지 않은 프로젝트를 보면 4 년 ... 그건 고무심이 아닙니다. – Alwyn

+0

아마도 MT3 용으로 설계된 최신 버전이 있다고 생각합니다. –

+0

이 프로젝트가 있습니다. https://github.com/alexeyzimarev/MassTransit.RavenDbIntegration 누젠트 패키지가 손상되어 다운로드 만하면됩니다. 원본과 참조를 직접적으로 처리 할 수 ​​있습니다. 지금까지 더 잘 작동하는 것으로 보입니다. 잠금은 여전히 ​​느리기 때문에 이벤트를 조인 할 때 병렬로 실행하는 경향이 있습니다. – Alwyn