2013-11-21 2 views
3
  • 단일 노드에는 1 개의 스파우트와 1 개의 볼트가 있습니다. Spout은 RabbitMQ에서 데이터를 읽고 Cassandra에 데이터를 쓰는 유일한 볼트로 내 보냅니다.
  • 우리의 데이터 소스는 초당 10000 개의 메시지를 생성하며 폭풍은이를 처리하는 데 약 10 초가 걸리며 이는 우리에게는 너무 느립니다.
  • 토폴로지의 병렬 처리를 늘리려고했지만 아무런 차이가 없습니다.

1 개의 스파우트와 1 개의 볼트가있는 단일 노드 시스템에서 처리 할 수있는 이상적인 메시지는 무엇입니까? 폭풍 토폴로지의 처리 속도를 향상시킬 수있는 방법은 무엇입니까?폭풍 처리 데이터가 매우 느림

업데이트 : 이것은 샘플 코드이며, RabbitMQ와 cassandra에 대한 코드가 있지만 동일한 성능 문제가 있습니다.

// Topology Class 
public class SimpleTopology { 

public static void main(String[] args) throws InterruptedException { 
    System.out.println("hiiiiiiiiiii"); 
    TopologyBuilder topologyBuilder = new TopologyBuilder(); 
    topologyBuilder.setSpout("SimpleSpout", new SimpleSpout()); 
    topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout"); 

    Config config = new Config(); 
    config.setDebug(true); 
    config.setNumWorkers(2); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology()); 

    Thread.sleep(2000); 
} 

은}

// Simple Bolt 
public class SimpleBolt implements IRichBolt{ 

private OutputCollector outputCollector; 

public void prepare(Map map, TopologyContext tc, OutputCollector oc) { 
    this.outputCollector = oc; 
} 

public void execute(Tuple tuple) { 
    this.outputCollector.ack(tuple); 
} 

public void cleanup() { 
    // TODO 
} 

public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    // TODO 
} 

public Map<String, Object> getComponentConfiguration() { 
    return null; 
} 

}

// Simple Spout 

public class SimpleSpout implements IRichSpout{ 

private SpoutOutputCollector spoutOutputCollector; 
private boolean completed = false; 
private static int i = 0; 

public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {  
    this.spoutOutputCollector = soc; 
} 

public void close() { 
    // Todo 
} 

public void activate() { 
    // Todo 
} 

public void deactivate() { 
    // Todo 
} 

public void nextTuple() { 
    if(!completed) 
    { 
     if(i < 100000) 
     { 
      String item = "Tag" + Integer.toString(i++); 
      System.out.println(item); 
      this.spoutOutputCollector.emit(new Values(item), item); 
     } 
     else 
     { 
      completed = true; 
     } 
    } 
    else 
    { 
     try { 
      Thread.sleep(2000); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
} 

public void ack(Object o) { 
    System.out.println("\n\n OK : " + o); 
} 

public void fail(Object o) { 
    System.out.println("\n\n Fail : " + o); 
} 

public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    ofd.declare(new Fields("word")); 
} 

public Map<String, Object> getComponentConfiguration() { 
    return null; 
} 

}

업데이트 : 는이 같은 튜플을 그룹화 셔플에 한 번 이상 처리 될 가능성이 있습니까? 사용 된 구성 (스파우트 = 4 볼트 = 4)은 이제 볼트의 증가로 성능이 저하됩니다.

+0

, u는 몇 가지 코드를 게시 할 수 있습니까? RabbitMQ에서 정확히 얼마나 읽고 있습니까? – user2720864

+0

'우리의 데이터 소스는 초당 10000 개의 메시지를 생성합니다.''nextTuple' 메소드 내부에'if (i <100000)'문을 가지고 이것을 말하고 있습니까? – user2720864

+0

http : //blog.relateiq를보십시오.com/monitoring-storm/ – Chiron

답변

3

여기서 병목 현상이 무엇인지 알아야합니다 - RabbitMQ 또는 Cassandra. Storm UI를 열고 각 구성 요소의 대기 시간을 살펴보십시오.

증가하는 병렬 처리가 도움이되지 않으면 (일반적으로해야 함) RabbitMQ 또는 Cassandra에는 분명히 문제가 있으므로 집중해야합니다.

+0

예, RabbitMq가 느려졌습니다 – user3017482

0

우리는 RabbitMQ와 Storm을 성공적으로 사용하고 있습니다. 결과는 다른 DB에 저장되지만, 어쨌든. 우리는 처음에 Spout에서 basic_get을 사용했고 성능은 좋았지 만 basic_consume으로 전환했고 성능은 실제로 매우 좋습니다. 그래서 토끼에게서 메시지를 어떻게 소비하는지보십시오. 몇 가지 중요한 요인 : 대신 basic_get의

  • basic_consume
  • prefetch_count (충분히 높은 만들기)
  • 성능을 높이려면, 당신은 메시지를 잃어버린 걱정하지 않는 경우 - ACK하지 메시지를하고 delivery_mode를 1로 설정하십시오.
+0

질문이 있습니까?, 단일 노드에서 관찰 한 정상적인 튜플 통과율은 얼마입니까? – user3017482

+0

이것은 많은 것들이 영향을 미칠 것이라고 말한 것처럼 다릅니다. 이 시나리오에서는 CPU 집약적 인 볼트를 사용하여 많은 계산을 수행합니다. – Vor

2

코드에서 nextTuple()에 대한 호출 당 하나의 튜플 만 방출합니다. 호출 당 더 많은 튜플을 방출 해보십시오. 같은

무엇인가 : 당신이 시도 구성 무엇

public void nextTuple() { 

    int max = 1000; 
    int count = 0; 
    GetResponse response = channel.basicGet(queueName, autoAck); 
    while ((response != null) && (count < max)) { 

     // process message 

     spoutOutputCollector.emit(new Values(item), item); 

     count++; 
     response = channel.basicGet(queueName, autoAck); 
    } 

    try { Thread.sleep(2000); } catch (InterruptedException ex) { 
} 
+0

몇 가지 예를 들려주세요? 나는 시도했다. 그러나 그것을 이해할 수 없었다. – user3017482

+0

@ user3017482 내 대답을 일부 의사 코드로 업데이트했습니다. –