2013-10-24 3 views
3

ZeroMQ에 문제가 있습니다. ZeroMQ에 익숙하지 않아서입니다.ZeroMQ 성능 문제

여러 클라이언트가 서버에 연결하여 쿼리를 보내는 매우 간단한 서비스를 작성하려고합니다. 서버가이 쿼리에 응답합니다.

REQ-REP 소켓 조합 (REQ를 사용하는 클라이언트, REP 소켓에 바인딩하는 서버)을 사용할 때 클라이언트와 서버가 동일한 시스템에있을 때 서버 측에서 초당 60,000 개의 메시지를 얻을 수 있습니다. . 시스템에 분산되어있을 때 다른 시스템에있는 클라이언트의 새로운 인스턴스는 서버에서 초당 메시지를 선형 적으로 증가시키고 클라이언트 인스턴스가 충분 해지면 40,000+에 쉽게 도달합니다. , 성능이 완전히 나사를하지만

REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)] 

: 이제 REP 소켓이 차단

는, 그래서 ZeroMQ 가이드를 따라하고 rrbroker 패턴 (http://zguide.zeromq.org/cs:rrbroker)을 사용했다. 여러 대의 컴퓨터에서 실행될 때 서버에서 초당 약 4000 개의 메시지가 발생합니다. 뿐만 아니라 다른 컴퓨터에서 시작된 각각의 새 클라이언트는 다른 모든 클라이언트의 처리량을 줄입니다.

나는 바보 같은 짓을하고 있다고 확신한다. ZeroMQ의 전문가가 명백한 실수를 지적 할 수 있는지 궁금합니다. 감사!

편집 : 조언에 따라 코드를 추가하십시오. clrzmq nuget 패키지 (https://www.nuget.org/packages/clrzmq-x64/)를 사용하고 있습니다.

다음은 클라이언트 코드입니다. 타이머는 1 초마다 수신되는 응답 수를 계산합니다.

for (int i = 0; i < numTasks; i++) { Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning); } 

void Client() 
    { 
     using (var ctx = new Context()) 
     { 
      Socket socket = ctx.Socket(SocketType.REQ); 
      socket.Connect("tcp://192.168.1.10:1234"); 
      while (true) 
      { 
       socket.Send("ping", Encoding.Unicode); 
       string res = socket.Recv(Encoding.Unicode); 
      } 
     } 
    } 

서버 - 경우 1가 : 서버가이 설정으로 두 번째

using (var zmqContext = new Context()) 
{ 
    Socket socket = zmqContext.Socket(SocketType.REP); 
    socket.Bind("tcp://*:1234"); 
    while (true) 
    { 
     string q = socket.Recv(Encoding.Unicode); 
     if (q.CompareTo("ping") == 0) { 
      socket.Send("pong", Encoding.Unicode); 
     } 
    } 
}  

당 수신 얼마나 많은 요청을 추적, 서버 측에서, 나는 초당받은 약 60,000 요청을 볼 수 있습니다 (클라이언트가 동일한 시스템에있을 때). 서로 다른 시스템에있을 때 각각의 새로운 클라이언트는 예상대로 서버에서 수신 된 요청 수를 증가시킵니다.

서버 케이스 2 : 이것은 본질적으로 ZMQ 가이드의 rrbroker입니다.

void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers) 
    { 
     List<PollItem> pollItemsList = new List<PollItem>(); 

     routerSocket = zmqContext.Socket(SocketType.ROUTER); 
     try 
     { 
      routerSocket.Bind(zmqConnectionString); 
      PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN); 
      pollItem.PollInHandler += RouterSocket_PollInHandler; 
      pollItemsList.Add(pollItem); 
     } 
     catch (ZMQ.Exception ze) 
     { 
      Console.WriteLine("{0}", ze.Message); 
      return; 
     } 

     dealerSocket = zmqContext.Socket(SocketType.DEALER); 
     try 
     { 
      dealerSocket.Bind("inproc://workers"); 
      PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN); 
      pollItem.PollInHandler += DealerSocket_PollInHandler; 
      pollItemsList.Add(pollItem); 
     } 
     catch (ZMQ.Exception ze) 
     { 
      Console.WriteLine("{0}", ze.Message); 
      return; 
     } 

     // Start the worker pool; cant connect 
     // to inproc socket before binding. 
     workerPool.Start(numWorkers); 

     while (true) 
     { 
      zmqContext.Poll(pollItemsList.ToArray()); 
     } 
    } 

    void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents) 
    { 
     RelayMessage(routerSocket, dealerSocket); 
    } 

    void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents) 
    { 
     RelayMessage(dealerSocket, routerSocket); 
    } 

    void RelayMessage(Socket source, Socket destination) 
    { 
     bool hasMore = true; 
     while (hasMore) 
     { 
      byte[] message = source.Recv(); 
      hasMore = source.RcvMore; 
      destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE); 
     } 
    }  

작업자 풀의 시작 방법은 여기서

public void Start(int numWorkerTasks=8) 
    { 
     for (int i = 0; i < numWorkerTasks; i++) 
     { 
      QueryWorker worker = new QueryWorker(this.zmqContext); 
      Task task = Task.Factory.StartNew(() => 
      worker.Start(), 
      TaskCreationOptions.LongRunning); 
     } 
     Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks); 
    } 

public class QueryWorker 
{ 
    Context zmqContext; 

    public QueryWorker(Context zmqContext) 
    { 
     this.zmqContext = zmqContext; 
    } 

    public void Start() 
    { 
     Socket socket = this.zmqContext.Socket(SocketType.REP); 
     try 
     { 
      socket.Connect("inproc://workers"); 
     } 
     catch (ZMQ.Exception ze) 
     { 
      Console.WriteLine("Could not create worker, error: {0}", ze.Message); 
      return; 
     } 

     while (true) 
     { 
      try 
      { 
       string message = socket.Recv(Encoding.Unicode); 
       if (message.CompareTo("ping") == 0) 
       { 
        socket.Send("pong", Encoding.Unicode); 
       } 
      } 
      catch (ZMQ.Exception ze) 
      { 
       Console.WriteLine("Could not receive message, error: " + ze.ToString()); 
      } 
     } 
    } 
} 

답변

1

일부 소스 코드를 게시 할 수 또는 테스트 케이스의 이상에 대한 자세한 설명은? 일반적으로 디자인을 구현하는 방법은 한 번에 하나씩 변경하고 각 변경 사항을 측정하는 것입니다. 알려진 작동중인 설계에서 더 복잡한 설계로 항상 단계적으로 이동할 수 있습니다.

+0

피터, 답변 해 주셔서 감사합니다. 코드를 추가했습니다. – Andy

1

아마도 '라우터'가 병목 현상입니다.

체크 아웃이 이러한 관련 질문 :

  1. Client maintenance in ZMQ ROUTER
  2. Load testing ZeroMQ (ZMQ_STREAM) for finding the maximum simultaneous users it can handle

라우터 (및 ZMQ_STREAM, 라우터의 단지 변종이다) 내부적으로 클라이언트 매핑을 유지해야한다, 따라서 IMO는 특정 클라이언트로부터의 제한된 연결을 수용 할 수 있습니다. ROUTER는 여러 클라이언트를 다중화 할 수있는 것처럼 보입니다. 단, 각 클라이언트의 활성 연결은 하나뿐입니다.

내가 잘못 생각할 수도 있습니다.하지만 ROUTER 또는 STREAM과 함께 다중 연결을 사용하는 다중 클라이언트에 비례하는 간단한 작업 코드가 있습니다.

ZeroMQ와의 동시 연결에 대한 제한이 있습니다.하지만 아무도 그 원인을 알지 못하는 것처럼 보입니다.

1

전 C#에서 다양한 방법의 기본 관리되지 않는 DLL 함수 호출에서 수행 성능 테스트를 수행 한 : 1. C++/CLI 2 PInvoke를 래퍼를 3. ZeroMQ/clrzmq

마지막은 흥미 수 있습니다 당신.

성능 테스트가 끝나면 ZMQ 바인딩 clrzmq를 사용하는 것이 유용하지 않으며 바인딩 소스 코드 내에서 PInvoke 호출을 최적화하려고 시도한 후에 100 성능 오버 헤드가 발생한다는 것을 알게되었습니다. 따라서 바인딩없이 PInvoke 호출을 사용하여 ZMQ를 사용했습니다.이 호출은 cdecl 규칙과 "SuppressUnmanagedCodeSecurity"옵션을 사용하여 속도를 최대화해야합니다. 매우 쉬운 5 개의 함수를 가져와야했습니다. 결국 속도는 PInvoke 호출보다 약간 느리지 만 내 경우에는 "inproc"이상으로 ZMQ가 있습니다.

속도가 재미있는 경우 바인딩없이 시도해 볼 수있는 힌트를 제공 할 수 있습니다.

이 질문에 대한 직접적인 대답은 아니지만 일반적으로 성능을 향상시키는 데 도움이 될 수 있습니다.