2014-11-03 1 views
1

저의 삶에 대해 RabbitMQ와 함께 RPC를 임시 응답 큐와 함께 사용할 수 없었습니다. 다음은 this test에서 파생 된 간단한 예제입니다. 출력 창에 많은 예외가 있으며 dlq가 가득 차 있지만 메시지는 결코 인식되지 않습니다.ServiceStack 분산 서비스에서 RabbitMQ RPC를 사용할 수 없습니다.

namespace ConsoleApplication4 
{ 
    class Program 
    { 
     public static IMessageService CreateMqServer(int retryCount = 1) 
     { 
      return new RabbitMqServer { RetryCount = retryCount }; 
     } 

     static void Main(string[] args) 
     { 

      using (var mqServer = CreateMqServer()) 
      { 
       mqServer.RegisterHandler<HelloIntro>(m => 
        new HelloIntroResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) }); 
       mqServer.Start(); 
      } 

      Console.WriteLine("ConsoleAppplication4"); 
      Console.ReadKey(); 
     } 
    } 
} 



namespace ConsoleApplication5 
{ 
    class Program 
    { 
     public static IMessageService CreateMqServer(int retryCount = 1) 
     { 
      return new RabbitMqServer { RetryCount = retryCount }; 
     } 

     static void Main(string[] args) 
     { 
      using (var mqServer = CreateMqServer()) 
      { 
       using (var mqClient = mqServer.CreateMessageQueueClient()) 
       { 
        var replyToMq = mqClient.GetTempQueueName(); 
        mqClient.Publish(new Message<HelloIntro>(new HelloIntro { Name = "World" }) 
        { 
         ReplyTo = replyToMq 
        }); 

        IMessage<HelloIntroResponse> responseMsg = mqClient.Get<HelloIntroResponse>(replyToMq); 
        mqClient.Ack(responseMsg); 

       } 
      } 

      Console.WriteLine("ConsoleAppplication5"); 
      Console.ReadKey(); 
     } 
    } 
} 

우선 예외

RabbitMQ.Client.Exceptions.OperationInterruptedException occurred 
    _HResult=-2146233088 
    _message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause= 
    HResult=-2146233088 
    IsTransient=false 
    Message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause= 
    Source=RabbitMQ.Client 
    StackTrace: 
     at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply() 
     at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) 
     at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments) 
     at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments) 
     at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey) 
     at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueue(IModel channel, String queueName) 
     at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueueByName(IModel channel, String queueName) 
     at ServiceStack.RabbitMq.RabbitMqProducer.PublishMessage(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body) 
    InnerException: 

그럼 시간과 중단의 횟수를 반복 이와

System.Threading.ThreadInterruptedException occurred 
    _HResult=-2146233063 
    _message=Thread was interrupted from a waiting state. 
    HResult=-2146233063 
    IsTransient=true 
    Message=Thread was interrupted from a waiting state. 
    Source=mscorlib 
    StackTrace: 
     at System.Threading.Monitor.ObjWait(Boolean exitContext, Int32 millisecondsTimeout, Object obj) 
     at System.Threading.Monitor.Wait(Object obj, Int32 millisecondsTimeout, Boolean exitContext) 
    InnerException: 

하였다. 이 특정 post은 ServerStack 및 RabbitMQ RPC를 사용하여 일종의 성공을 거둘 수 있다고 제안하는 것 같지만 코드를 변경하기 전에 내 코드가 작동하지 않는 이유를 알고 싶습니다. 클라이언트 호출 GetTempQueueName는(), 그것은 다른 연결 (즉 서버)에서 액세스 할 수없는 독점적 인 큐를 만들 때

는 스티븐

+1

독점 큐를 선언하면 다른 AMQP 채널에서 해당 큐에 액세스 할 수 없습니다. –

+0

이 문제는 해결되었으며 이제 모든 것이 완벽하게 작동합니다. –

답변

1

더 이상 수행되지 않는 독점적 인 대기열을 다시 선언하는 데 문제가 발생했습니다. in this commit.

두 개의 독립적 인 콘솔 응용 프로그램을 통해 통신하는 간단한 클라이언트/서버 예제를 보여주는 new RabbitMqTest project도 있습니다.

이 변경 내용은 v4.0.34 +에서 사용할 수 있으며 now on MyGet입니다.

ServiceStack.RabbitMq 패키지 RabbitMq.Client NuGet 의존성도 v3.4.0으로 업그레이드되었습니다.

2

을 주셔서 감사합니다.

public class MqClient : IDisposable 
    { 
     ConnectionFactory factory = new ConnectionFactory() 
     { 
      HostName = "192.168.97.201", 
      UserName = "guest", 
      Password = "guest", 
      //VirtualHost = "test", 
      Port = AmqpTcpEndpoint.UseDefaultPort, 
     }; 

     private IConnection connection; 
     private string exchangeName; 

     public MqClient(string defaultExchange) 
     { 
      this.exchangeName = defaultExchange; 
      this.connection = factory.CreateConnection(); 
     } 

     public TResponse RpcCall<TResponse>(IReturn<TResponse> reqDto, string exchange = null) 
     { 
      using (var channel = connection.CreateModel()) 
      { 
       string inq_queue_name = string.Format("mq:{0}.inq", reqDto.GetType().Name); 

       string responseQueueName = channel.QueueDeclare("",false,false,true,null).QueueName; 
       //string responseQueueName = channel.QueueDeclare().QueueName; 

       var props = channel.CreateBasicProperties(); 
       props.ReplyTo = responseQueueName; 

       var message = ServiceStack.Text.JsonSerializer.SerializeToString(reqDto); 

       channel.BasicPublish(exchange ?? this.exchangeName, inq_queue_name, props, UTF8Encoding.UTF8.GetBytes(message)); 

       var consumer = new QueueingBasicConsumer(channel); 
       channel.BasicConsume(responseQueueName, true, consumer); 


       var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
       //channel.BasicAck(ea.DeliveryTag, false); 

       string response = UTF8Encoding.UTF8.GetString(ea.Body); 
       string responseType = ea.BasicProperties.Type; 
       Console.WriteLine(" [x] New Message of Type '{1}' Received:{2}{0}", response, responseType, Environment.NewLine); 

       return ServiceStack.Text.JsonSerializer.DeserializeFromString<TResponse>(response); 

      } 
     } 

     ~MqClient() 
     { 
      this.Dispose(); 
     } 

     public void Dispose() 
     { 
      if (connection != null) 
      { 
       this.connection.Dispose(); 
       this.connection = null; 
      } 
     } 

    } 

그것은 그처럼 사용할 수 있습니다 :

using (var mqClient = new MqClient("mx.servicestack")) 
{ 
    var pingResponse = mqClient.RpcCall<PingResponse>(new Ping { }); 
} 

그러므로 나는 servicestack의 MQ 클라이언트를 사용하지 않으며 rabbitmq의 .NET 라이브러리에 따라 다릅니다 만 않는 내 자신의 간단한 MQ 클라이언트를 생성

중요 : 서비스 버전 4.0.32 이상을 사용해야합니다.

관련 문제