전송 서비스로 Azure Service Bus를 사용하지만 IConsumer 내부에서 전화를 걸 때와 달리 스케줄 된 메시지는 작동하지 않습니다.MassTransitStateMachine 일정이 잘못 되었습니까?
나는 몇 시간과 며칠을 보냈지 만 아직도 무슨 일이 일어나고 있는지 거의 알지 못합니다.
하늘색 서비스 버스를 사용하는 상태 시스템에서 일정을 잡으려면 내가해야 할 일을 설명 할 수 있습니까? 그리고 IConsumer 컨텍스트에서 메시지가 작동하는 이유는 무엇입니까?
public class BatchCollector : MassTransitStateMachine<BufferSaga>
{
public BatchCollector(IBatchFactory batchFactory)
{
InstanceState(saga => saga.State);
Event(() => BufferedCommandDetected,
_ => _.CorrelateById(context => context.Message.GetBatchId()));
Schedule(() => WindowElapsed, x => x.BatchCompletionId, x =>
{
x.Delay = TimeSpan.FromSeconds(5);
x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});
Initially(
When(BufferedCommandDetected)
.Then(
context =>
{
context.Instance.CorrelationId = context.Data.GetBatchId();
context.Instance.Id = Guid.NewGuid().ToString("N");
context.Instance.Buffer.Add(context.Data);
context.Instance.BatchStartTime = DateTimeOffset.Now;
context.Instance.AbsoluteDeadLine = DateTimeOffset.Now + context.Data.AbsoluteWindowSpan;
context.Instance.SlidingDeadLine = DateTimeOffset.Now + context.Data.SlidingWindowSpan;
})
.Schedule(WindowElapsed,
context => new WindowElapsed {CorrelationId = context.Instance.CorrelationId },
delayProvider: scheduleDelayProvider => scheduleDelayProvider.Data.SlidingWindowSpan < scheduleDelayProvider.Data.AbsoluteWindowSpan ? scheduleDelayProvider.Data.SlidingWindowSpan : scheduleDelayProvider.Data.AbsoluteWindowSpan)
.TransitionTo(Waiting));
During(Waiting,
When(BufferedCommandDetected)
.Then(context =>
{
context.Instance.SlidingDeadLine += context.Data.SlidingWindowSpan;
context.Instance.Buffer.Add(context.Data);
}),
When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine > DateTimeOffset.Now && context.Instance.AbsoluteDeadLine > DateTimeOffset.Now)
.Schedule(WindowElapsed, context => new WindowElapsed { CorrelationId = context.Instance.CorrelationId }),
When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine <= DateTimeOffset.Now || context.Instance.AbsoluteDeadLine <= DateTimeOffset.Now)
//.Unschedule(WindowElapsed)
.Publish(context => new Batch()
{
BatchId = context.Instance.BatchCompletionId ?? Guid.NewGuid(),
Content = context.Instance.Buffer,
StartTime = context.Instance.BatchStartTime,
EndTime = DateTimeOffset.Now
})
.Finalize()
.TransitionTo(BufferCompleted));
SetCompletedWhenFinalized();
}
public Event<BufferedCommand> BufferedCommandDetected { get; private set; }
public Schedule<BufferSaga, WindowElapsed> WindowElapsed { get; private set; }
public State Waiting { get; private set; }
public State BufferCompleted { get; private set; }
}
는 버스 초기화는 : 다음
container.RegisterType<IBusControl>(
new HierarchicalLifetimeManager(),
new InjectionFactory(c =>
{
var bus = Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
var azSbHost = cfg.Host(new Uri(CloudConfigurationManager.GetSetting("ServiceBus.Url"))
, host =>
{
host.TokenProvider = TokenProvider
.CreateSharedAccessSignatureTokenProvider
(CloudConfigurationManager.GetSetting("ServiceBus.SharedAccessKeyName"),
CloudConfigurationManager.GetSetting("ServiceBus.AccessKey"),
TokenScope.Namespace);
});
cfg.ReceiveEndpoint(
azSbHost,
"Quartz.Scheduler",
sbConfig =>
{
cfg.UseMessageScheduler(sbConfig.InputAddress);
sbConfig.Consumer(() => new ScheduleMessageConsumer(c.Resolve<IScheduler>()));
}
);
cfg.ReceiveEndpoint(
azSbHost,
Assembly.GetExecutingAssembly().GetName().Name,
sbConfig =>
{
AllClasses.FromAssembliesInBasePath()
.Where(
@class =>
(@class?.Namespace?.StartsWith("bcn",
StringComparison.OrdinalIgnoreCase) ?? false)
&&
@class.GetParentClasses()
.Any(
parent =>
parent.Name.StartsWith("MassTransitStateMachine`1")))
.ForEach(@class =>
{
//dynamic cast to avoid having to deal with generic typing when type is not known until runtime.
dynamic stateMachineExtension =
new DynamicStaticWrapper(typeof(StateMachineSubscriptionExtensions));
stateMachineExtension
.StateMachineSaga(
sbConfig,
c.Resolve(@class),
c.Resolve(typeof(ISagaRepository<>).MakeGenericType(
@class.GetParentClasses().First(parent =>
parent.Name.StartsWith("MassTransitStateMachine`1"))
.GetGenericArguments().First())));
});
AllClasses.FromAssembliesInBasePath()
.Where(
@class =>
(@class?.Namespace?.StartsWith("bcn", StringComparison.OrdinalIgnoreCase) ??
false)
&& @class.GetInterfaces().Any(
@interface =>
@interface?.FullName?.StartsWith("MassTransit.IConsumer`1") ??
false))
.ForEach(@class =>
{
var factoryType = typeof(UnityConsumerFactory<>).MakeGenericType(@class);
//Automatically register consumers.
dynamic consumerFactory = Activator.CreateInstance(
factoryType,
container);
var consumingMethod = typeof(ConsumerExtensions).
GetMethods()
.First(
m =>
m.Name == "Consumer" && m.IsGenericMethod &&
m.GetGenericArguments().Length == 1 &&
m.GetParameters().Length == 3)
.MakeGenericMethod(@class)
.Invoke(null, new object[] {sbConfig, consumerFactory, null});
//Automatically detect which payload contains message data. This message data is stored in blob.
@class.GetInterfaces().Where(
@interface =>
@interface.FullName.StartsWith("MassTransit.IConsumer`1"))
.Select(@interface => @interface.GetGenericArguments().First())
.Where(payload => payload.GetProperties()
.Any(prop => prop.PropertyType.Name.StartsWith("MessageData`1")))
.ForEach(
BlobType =>
typeof(MessageDataConfiguratorExtensions)
.GetMethods()
.First(
method =>
method.GetParameters().First().ParameterType ==
typeof(IConsumePipeConfigurator)
&&
method.GetParameters().Last().ParameterType ==
typeof(IMessageDataRepository))
.MakeGenericMethod(BlobType)
.Invoke(null,
new object[]
{sbConfig, c.Resolve<IMessageDataRepository>()}));
});
});
cfg.UseServiceBusMessageScheduler();
//azSbHost.
});
return bus;
}));
container.RegisterType<IBus, IBusControl>();
container.RegisterType<IBus, IBusControl>(new ContainerControlledLifetimeManager());
그리고 시작 :
는var container = UnityConfig.GetConfiguredContainer();
var bus = container.Resolve<IBusControl>();
bus.Start();
var scheduler = container.Resolve<IScheduler>();
scheduler.Start();
bus.Publish<BufferedCommand>(new BufferedCommandAdapter<decimal>(10m, TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(5)));
크리스는 오랜 시간 후에는. RavenDb 사가 공급자가 고장 :(내가이 일을 얻을 수없는 것 같다 메모리에 잘 작동합니다.이 업데이트되지 않은 프로젝트를 보면 4 년 ... 그건 고무심이 아닙니다. – Alwyn
아마도 MT3 용으로 설계된 최신 버전이 있다고 생각합니다. –
이 프로젝트가 있습니다. https://github.com/alexeyzimarev/MassTransit.RavenDbIntegration 누젠트 패키지가 손상되어 다운로드 만하면됩니다. 원본과 참조를 직접적으로 처리 할 수 있습니다. 지금까지 더 잘 작동하는 것으로 보입니다. 잠금은 여전히 느리기 때문에 이벤트를 조인 할 때 병렬로 실행하는 경향이 있습니다. – Alwyn