2012-07-17 2 views
0

Java 응용 프로그램에서 ZMQ를 사용하고 있습니다. 나는 그것이 불균일하게 행동하는 것을 발견했다. 즉, 내가 약 100 개의 메시지를 보낸다면, 한 소비자는 1 초가 걸린다 고 말하면서 소비자를 늘리면, 걸린 시간은 2,1.5,3이된다. 점진적인 증가 또는 감소가 없습니다. 이 문제를 어떻게 해결할 수 있습니까?ZMQ에서 고르지 않은 흐름

// Producer

import org.zeromq.ZMQ; import org.zeromq.ZMQ.Socket;}

// Broker 

import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; import org.zeromq.ZMQStreamer;

public class Broker {

/** 
* @param args 
*/ 
public static void main(String[] args) 
{ 
    Context context = ZMQ.context(1); 

    Socket frontEnd = context.socket(ZMQ.PULL); 
    frontEnd.bind("tcp://*:5555"); 

    Socket backEnd = context.socket(ZMQ.PUSH); 
    backEnd.bind("tcp://*:5560"); 

    ZMQStreamer zmqStreamer = new ZMQStreamer(context, frontEnd, backEnd); 
    zmqStreamer.run(); 
} 

아래에 내 코드를 찾습니다

public class Producer {

public void init() 
{ 
    ZMQ.Context context = ZMQ.context(1); 
    socket = context.socket(ZMQ.PUSH); 
    socket.connect("tcp://localhost:5555"); 
} 

public void initMessage(String message) 
{ 
    this.message = message; 
} 

public void sendMessage() 
{ 
    String sendMessage = System.nanoTime() +"#"+ message; 
    socket.send(sendMessage.getBytes(), 0); 
} 
/** 
* @param args 
*/ 
public static void main(String[] args) 
{ 
    Producer producer = new Producer(); 
    producer.init(); 
    byte[] message = new byte[Integer.parseInt(args[0])]; 
    //message = "Hello".getBytes(); 
    producer.initMessage(new String(message)); 
    for(int i=0;i<100;i++) 
    { 
     producer.sendMessage(); 
    } 
} 

private Socket socket = null; 
private String message; 
} 

//Consumer 

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Socket; 

public class Consumer 
{ 

public void init() 
{ 
    ZMQ.Context context = ZMQ.context(1); 
    socket = context.socket(ZMQ.PULL); 
    socket.connect("tcp://localhost:5560"); 
} 

public void reciveMessage() 
{ 
    byte[] recived = socket.recv(0); 
    //System.out.println(recived.length); 
    long recivedTime = System.nanoTime(); 
    String message = new String(recived); 
    String[] splitMessage = message.split("#"); 
    long sendTime = Long.parseLong(splitMessage[0]); 
    System.out.println("Send Time " + sendTime + " RecivedTime " 
      + recivedTime + " Time taken " + (recivedTime - sendTime) 
      + " Message " + message); 
} 
/** 
* @param args 
*/ 
public static void main(String[] args) 
{ 
    Consumer consumer = new Consumer(); 
    consumer.init(); 
    for (int i=0;i<100;i++) 
    { 
     consumer.reciveMessage(); 
    } 
} 
private Socket socket = null; 
} 
+0

일부 코드보기 – Schildmeijer

+0

Schildmeijer : 코드를 첨부했습니다. 친절하게 그것을 통과하십시오. – Muzy

+0

이것이 건설적인 것은 아니지만 귀하의 질문 제목이 Pearl Jam을 생각 나게합니다! –

답변

1

안정적으로 시간 멀티 스레드 코드의 조각, 당신은 당신이 현재 할 수있는 (컬렉터/싱크에 시간을 시작 동기화 및 종료의 어떤 방법이있을 필요가 할거야하려면 프로그래밍되지 않았 음).

데이터 집합 분할의 정확한 ZMQ 가지 방법 중 하나로 다음과 같은 과정을 내용의 ZMQ 가이드에서 체크 아웃 this example :

을 ...

우리의 슈퍼 컴퓨팅 응용 프로그램은 매우 전형적인 평행 처리 모델 :

We have a ventilator that produces tasks that can be done in parallel. 
We have a set of workers that process tasks. 
We have a sink that collects results back from the worker processes. 

...

관련 문제