ZeroMQ에 문제가 있습니다. ZeroMQ에 익숙하지 않아서입니다.ZeroMQ 성능 문제
여러 클라이언트가 서버에 연결하여 쿼리를 보내는 매우 간단한 서비스를 작성하려고합니다. 서버가이 쿼리에 응답합니다.
REQ-REP 소켓 조합 (REQ를 사용하는 클라이언트, REP 소켓에 바인딩하는 서버)을 사용할 때 클라이언트와 서버가 동일한 시스템에있을 때 서버 측에서 초당 60,000 개의 메시지를 얻을 수 있습니다. . 시스템에 분산되어있을 때 다른 시스템에있는 클라이언트의 새로운 인스턴스는 서버에서 초당 메시지를 선형 적으로 증가시키고 클라이언트 인스턴스가 충분 해지면 40,000+에 쉽게 도달합니다. , 성능이 완전히 나사를하지만
REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)]
: 이제 REP 소켓이 차단
는, 그래서 ZeroMQ 가이드를 따라하고 rrbroker 패턴 (http://zguide.zeromq.org/cs:rrbroker)을 사용했다. 여러 대의 컴퓨터에서 실행될 때 서버에서 초당 약 4000 개의 메시지가 발생합니다. 뿐만 아니라 다른 컴퓨터에서 시작된 각각의 새 클라이언트는 다른 모든 클라이언트의 처리량을 줄입니다.
나는 바보 같은 짓을하고 있다고 확신한다. ZeroMQ의 전문가가 명백한 실수를 지적 할 수 있는지 궁금합니다. 감사!
편집 : 조언에 따라 코드를 추가하십시오. clrzmq nuget 패키지 (https://www.nuget.org/packages/clrzmq-x64/)를 사용하고 있습니다.
다음은 클라이언트 코드입니다. 타이머는 1 초마다 수신되는 응답 수를 계산합니다.
for (int i = 0; i < numTasks; i++) { Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning); }
void Client()
{
using (var ctx = new Context())
{
Socket socket = ctx.Socket(SocketType.REQ);
socket.Connect("tcp://192.168.1.10:1234");
while (true)
{
socket.Send("ping", Encoding.Unicode);
string res = socket.Recv(Encoding.Unicode);
}
}
}
서버 - 경우 1가 : 서버가이 설정으로 두 번째
using (var zmqContext = new Context())
{
Socket socket = zmqContext.Socket(SocketType.REP);
socket.Bind("tcp://*:1234");
while (true)
{
string q = socket.Recv(Encoding.Unicode);
if (q.CompareTo("ping") == 0) {
socket.Send("pong", Encoding.Unicode);
}
}
}
당 수신 얼마나 많은 요청을 추적, 서버 측에서, 나는 초당받은 약 60,000 요청을 볼 수 있습니다 (클라이언트가 동일한 시스템에있을 때). 서로 다른 시스템에있을 때 각각의 새로운 클라이언트는 예상대로 서버에서 수신 된 요청 수를 증가시킵니다.
서버 케이스 2 : 이것은 본질적으로 ZMQ 가이드의 rrbroker입니다.
void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers)
{
List<PollItem> pollItemsList = new List<PollItem>();
routerSocket = zmqContext.Socket(SocketType.ROUTER);
try
{
routerSocket.Bind(zmqConnectionString);
PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN);
pollItem.PollInHandler += RouterSocket_PollInHandler;
pollItemsList.Add(pollItem);
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("{0}", ze.Message);
return;
}
dealerSocket = zmqContext.Socket(SocketType.DEALER);
try
{
dealerSocket.Bind("inproc://workers");
PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN);
pollItem.PollInHandler += DealerSocket_PollInHandler;
pollItemsList.Add(pollItem);
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("{0}", ze.Message);
return;
}
// Start the worker pool; cant connect
// to inproc socket before binding.
workerPool.Start(numWorkers);
while (true)
{
zmqContext.Poll(pollItemsList.ToArray());
}
}
void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
RelayMessage(routerSocket, dealerSocket);
}
void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
RelayMessage(dealerSocket, routerSocket);
}
void RelayMessage(Socket source, Socket destination)
{
bool hasMore = true;
while (hasMore)
{
byte[] message = source.Recv();
hasMore = source.RcvMore;
destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE);
}
}
작업자 풀의 시작 방법은 여기서
public void Start(int numWorkerTasks=8)
{
for (int i = 0; i < numWorkerTasks; i++)
{
QueryWorker worker = new QueryWorker(this.zmqContext);
Task task = Task.Factory.StartNew(() =>
worker.Start(),
TaskCreationOptions.LongRunning);
}
Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks);
}
public class QueryWorker
{
Context zmqContext;
public QueryWorker(Context zmqContext)
{
this.zmqContext = zmqContext;
}
public void Start()
{
Socket socket = this.zmqContext.Socket(SocketType.REP);
try
{
socket.Connect("inproc://workers");
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("Could not create worker, error: {0}", ze.Message);
return;
}
while (true)
{
try
{
string message = socket.Recv(Encoding.Unicode);
if (message.CompareTo("ping") == 0)
{
socket.Send("pong", Encoding.Unicode);
}
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("Could not receive message, error: " + ze.ToString());
}
}
}
}
피터, 답변 해 주셔서 감사합니다. 코드를 추가했습니다. – Andy