2017-10-22 1 views
1

Autofac 및 EasyNetQ 단위 : 내가 가입자Autofac EasyNetQ은 : 오늘 우리가 도전의 종류가 거의 모든 시간 작동이 용이하지만, 작업

class Program 
    { 
     static void Main(string[] args) 
     { 
      var container = AutofacContainerFactory.Build(); 

      using (var scope = container.BeginLifetimeScope()) 
      { 
       var bus = scope.Resolve<IBus>(); 

       bus.Subscribe<SomeMessage>("some.queue", container.Resolve<ISomeMessageHandler>().Handle); 

       Console.WriteLine("Listening for messages. Hit <return> to quit."); 
       Console.ReadLine(); 
      } 

     } 
    } 

에 대한 작업의 단위를 설정하려면이 상당히 간단합니다 구독자 콘솔 응용 프로그램에서하지만 제대로 설정하는 방법 UoW : 핸들러 Handle 메서드가 호출되고 실행 후 UoW가 호출되기 전에 일부 인터셉터가 호출 된 경우 좋을 것입니다. 내가 생각할 수있는 또 다른 해결책은 Handle 메서드 내에서 모든 것을 수행하는 것이다. 아마도 UnitOfWork 데코레이터 패턴을 사용하는 것이다. 나는 너의 생각을 기다리고있다.

답변

1

내가 모든 것을 할 수있는 AutofacMessageDispatcher 만든이 허용

public class AutofacMessageDispatcher : IAutoSubscriberMessageDispatcher 
{ 
    readonly ILifetimeScope _component; 
    readonly IMessageContextFactory _contextFactory; 
    public const string PerMessageLifeTimeScopeTag = "AutofacMessageScope"; 
    public const string GlobalPipeTag = "global"; 

    public AutofacMessageDispatcher(ILifetimeScope component, IMessageContextFactory contextFactory) 
    { 
     _component = component; 
     _contextFactory = contextFactory; 
    } 

    static IEnumerable<IErrorHandler> GetErrorHandlers<TConsumer>(TConsumer consumer, IComponentContext scope) 
    { 
     var errorHandlers = consumer.GetType() 
      .GetTypeInfo().GetAttributes<ErrorHandlerAttribute>() 
      .OrderBy(attribute => attribute.Order) 
      .Select(attribute => attribute.Initialize((IErrorHandler) scope.Resolve(attribute.ErrorHandlerType))) 
      .Union(scope.ResolveNamed<IEnumerable<IErrorHandler>>(GlobalPipeTag), a => a.GetType()); // perform the distinction in the union on GetType so we only get 1 handler of the same type 

     if (consumer is IErrorHandler consumerAsErrorHandler) 
      errorHandlers = errorHandlers.Concat(new[] { consumerAsErrorHandler }); 

     return errorHandlers; 
    } 

    static IEnumerable<IPipe> GetPipeLine<TConsumer>(TConsumer consumer, IComponentContext scope) 
    { 
     var pipeLine = consumer.GetType() 
      .GetTypeInfo().GetAttributes<PipeAttribute>() 
      .OrderBy(attribute => attribute.Order) 
      .Select(attribute => attribute.Initialize((IPipe) scope.Resolve(attribute.PipeType))) 
      .Union(scope.ResolveNamed<IEnumerable<IPipe>>(GlobalPipeTag), a => a.GetType()); // perform the distinction in the union on GetType so we only get 1 handler of the same type 

     return pipeLine; 
    } 

    [HandleProcessCorruptedStateExceptions] 
    public void Dispatch<TMessage, TConsumer>(TMessage message) 
     where TMessage : class 
     where TConsumer : IConsume<TMessage> 
    { 
     using (var scope = _component.BeginLifetimeScope(PerMessageLifeTimeScopeTag, _contextFactory.RegisterMessageContext(typeof(TConsumer), message))) 
     { 
      var consumer = scope.Resolve<TConsumer>(); 
      var pipeLine = GetPipeLine(consumer, scope).ToArray(); 
      pipeLine.Each(p => p.OnBeforeConsume(consumer, message)); 

      Exception exception = null; 
      try 
      { 
       consumer.Consume(message); 
      } 
      catch (Exception e) when (GetErrorHandlers(consumer, scope).Any(p => p.OnError(consumer, message, e))) 
      { 
       exception = e; 
      } 
      pipeLine.Reverse().Each(p => p.OnAfterConsume(consumer, message, exception)); 
     } 
    } 

    [HandleProcessCorruptedStateExceptions] 
    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message) 
     where TMessage : class 
     where TConsumer : IConsumeAsync<TMessage> 
    { 
     using (var scope = _component.BeginLifetimeScope(PerMessageLifeTimeScopeTag, _contextFactory.RegisterMessageContext(typeof(TConsumer), message))) 
     { 
      var consumer = scope.Resolve<TConsumer>(); 
      var pipes = GetPipeLine(consumer, scope).ToArray(); 

      Exception exception = null; 

      foreach (var hook in pipes) 
       await hook.OnBeforeConsumeAsync(consumer, message); 
      try 
      { 
       await consumer.Consume(message); 
      } 
      catch (Exception e) when (GetErrorHandlers(consumer, scope).Any(p => p.OnErrorAsync(consumer, message, e))) 
      { 
       exception = e; 
      } 
      foreach (var hook in pipes.Reverse()) 
       await hook.OnAfterConsumeAsync(consumer, message, exception); 
     } 
    } 
}   

public interface IMessageContextFactory 
{ 
    Action<ContainerBuilder> RegisterMessageContext<TMessage>(Type consumerType, TMessage message) where TMessage : class; 
} 

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] 
public class ErrorHandlerAttribute : Attribute 
{ 
    public ErrorHandlerAttribute(Type errorHandlerType, int order = 0) 
    { 
     ErrorHandlerType = errorHandlerType; 
     Order = order; 
    } 

    public Type ErrorHandlerType { get; set; } 
    public int Order { get; set; } 

    public virtual IErrorHandler Initialize(IErrorHandler handler) 
    { 
     return handler; 
    } 
} 

public interface IErrorHandler 
{ 
    bool OnError<TMessage, TConsumer>(TConsumer consumer, TMessage message, Exception exception) 
     where TMessage : class 
     where TConsumer : IConsume<TMessage>; 

    bool OnErrorAsync<TMessage, TConsumer>(TConsumer consumer, TMessage message, Exception exception) 
     where TMessage : class 
     where TConsumer : IConsumeAsync<TMessage>; 
} 

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] 
public class PipeAttribute : Attribute 
{ 
    public PipeAttribute(Type pipeType, int order = 0) 
    { 
     PipeType = pipeType; 
     Order = order; 
    } 

    public Type PipeType { get; set; } 
    public int Order { get; set; } 

    public IPipe Initialize(IPipe pipe) 
    { 
     return pipe; 
    } 
} 

public interface IPipe 
{ 
    void OnBeforeConsume<TMessage, TConsumer>(TConsumer consumer, TMessage message) 
     where TMessage : class 
     where TConsumer : IConsume<TMessage>; 

    void OnAfterConsume<TMessage, TConsumer>(TConsumer consumer, TMessage message, [CanBeNull] Exception exception) 
     where TMessage : class 
     where TConsumer : IConsume<TMessage>; 

    Task OnBeforeConsumeAsync<TMessage, TConsumer>(TConsumer consumer, TMessage message) 
     where TMessage : class 
     where TConsumer : IConsumeAsync<TMessage>; 

    Task OnAfterConsumeAsync<TMessage, TConsumer>(TConsumer consumer, TMessage message, [CanBeNull] Exception exception) 
     where TMessage : class 
     where TConsumer : IConsumeAsync<TMessage>; 
} 

public interface IMessageContext 
{ 
     object Message { get; } 
} 
public class MessageContext : IMessageContext 
{ 
    public MessageContext(object message) 
    { 
     Message = message; 
    } 

    public object Message { get; set; } 
} 

public class MessageContextFactory : IMessageContextFactory 
{ 
    readonly ILogger _logger; 

    public MessageContextFactory() 
    { 
     _logger = logger; 
    } 

    public Action<ContainerBuilder> RegisterMessageContext<TMessage>(Type consumerType, TMessage message) where TMessage : class 
    { 
     return builder => 
     { 
      builder.RegisterInstance(new MessageContext(message)).As<IMessageContext>().AsSelf(); 
      var forContext = _logger.ForContext(message.GetType()); 
      builder.RegisterInstance(forContext).As<ILogger>().AsSelf(); 
     }; 
    } 
} 


public interface IMessageContextFactory 
{ 
    Action<ContainerBuilder> RegisterMessageContext<TMessage>(Type consumerType, TMessage message) where TMessage : class; 
} 

을 당신에게 :

  • 등록 사용자 정의하여 소비자
  • 을에 속성을 사용 '파이프'와 'errorhandlers'
  • '글로벌'이름으로 등록하면 파이프 및 오류 처리기를 컨테이너에 등록 할 수 있습니다.
  • 'IMessageContex 당신이 serilog를 사용하는 경우, 그것은 당신의 메시지를 포함 로깅 컨텍스트를 생성합니다
  • 어디 의존성에서 t가 '(그렇지 않으면 단지 ILogger입니다 참조 제거) 메시지에 액세스하려면

가 미안 방금 추가 코드를 빨리 작성하면 일부 종속성을 놓친 것일 수 있습니다. 당신이 무엇이든 놓친다면 내 EasyNetQ 확장을 유지하는 곳은 다음과 같습니다. https://github.com/zidad/net-tools/tree/master/src/Net.EasyNetQ

희망이 있습니다.

[Test] 
    public void UnitOfWork() 
    { 
     var builder = new ContainerBuilder(); 

     builder.RegisterType<MessageCommandHandler>().As<IHandler<Message>>().InstancePerLifetimeScope(); 
     builder.RegisterType<MessageCommandHandler2>().As<IHandler<Message>>().InstancePerLifetimeScope(); 

     builder.RegisterGeneric(typeof(UnitOfWorkDecorator<,>)).AsSelf().SingleInstance(); 

     var container = builder.Build(); 

     var handler = container.Resolve<IHandler<Message>>(); 

     var uow = container.Resolve<UnitOfWorkDecorator<Message, IHandler<Message>>>(); 
     uow.Handle(new Message()); 

    } 

+0

너의 코드는 좋아 보이지만 내 단순한 필요성 때문에 지나치게 복잡하다고 생각하거나 너무 일반적이라고 말해야한다. – Macko

+0

그것은 일반적으로, 나는 단지이 답변을 위해 작성하지 않았다. :) 그러나 나는 그것을 UoW 패턴과 간단한 무용담 구현에 사용했지만, 필요하지 않은 것은 제거 할 수있다. –

0
public class UnitOfWorkDecorator<TRequest, THandler> : IHandler<TRequest> 
     where TRequest : class 
     where THandler : IHandler<TRequest> 
    { 
     protected readonly Func<ILifetimeScope> ParentScope; 

     public UnitOfWorkDecorator(Func<ILifetimeScope> parentScope) 
     { 
      ParentScope = parentScope; 
     } 

     public void Handle(TRequest request) 
     { 
      Console.WriteLine("UoW handler start"); 

      using (var scope = ParentScope().BeginLifetimeScope()) 
      { 
       var scopedHandler = scope.Resolve<THandler>(); 
       scopedHandler.Handle(request); 
      } 

      Console.WriteLine("UoW handler end"); 
     } 
    } 

시험 때문에 주요 아이디어는 장식 핸들러를 해결하기위한 중첩 된 범위를 만드는 것입니다. 작업 단위 (UOW)를 관리하는 논리는 범위 내부에서 해석 될 수 있으므로 데코 레이팅 된 핸들러 및 uow는 동일한 EF DB 컨텍스트 인스턴스를 가질 수 있습니다.

관련 문제