2016-06-20 4 views
0

트라이던트에서 토폴로지를 동적으로 생성하는 방법이 있습니까? 아무도 예제를 제공해 줄 수 있습니까?Apache storm Trident - 토폴로지를 동적으로 생성

+0

당신은 당신이 당신이 그 파일을 읽을 수있는 토폴로지를 배포 일부 속성 파일 (JSON) 때의 토폴로지의 설정을 저장할 수 있습니다. 하지만 한번 배포하면 동적으로 변경할 수 없습니다. –

답변

0

우선 토폴로지를 만드는 것이 Trident의 일부가 아니라는 것을 알고있을 수도 있습니다. Trident는 마이크로 배칭을위한 API 일뿐입니다.

새로운 토폴로지를 만드는 것은 정의에 따라 동적입니다. 이것은 TopologyBuilder 클래스가 수행하는 것입니다.

질문에 답하기 위해, 그렇습니다. Trident 또는 간단한 Storm 스파우트와 볼트에서 새로운 토폴로지를 만들 수 있습니다. 당신이 필요로하는 유일한 것은 당신의 토폴로지 생성 논리가 Storm에서 논리를 실행하면 다시 정의에 의해 충족되는 Storm 클러스터 (클래스 및 기타 자원)에 액세스 할 수 있어야한다는 것입니다.

마지막으로 필요한 것은 새로 생성 된 토폴로지를 제출하는 방법을 찾는 것입니다.이 것은 StormSubmitter 클래스가 만들어진 것으로 다시 클래스 프롬프트에 만족하는 정의로 작성되었습니다 (깜짝 :)) 트라이던트 또는 일반 스파우트/볼트 내부에서 논리를 실행할 때

호기심에서 벗어나서 왜이 작업을 수행 할 계획입니까? 요구 사항은 무엇입니까?

예 :

import java.util.Map; 

import org.apache.storm.Config; 
import org.apache.storm.StormSubmitter; 
import org.apache.storm.generated.StormTopology; 
import org.apache.storm.spout.SpoutOutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.TopologyBuilder; 
import org.apache.storm.topology.base.BaseRichSpout; 
import org.apache.storm.trident.operation.TridentCollector; 
import org.apache.storm.trident.spout.IBatchSpout; 
import org.apache.storm.tuple.Fields; 

public class DynamicTopologySpout implements IBatchSpout { 

    private static final long serialVersionUID = -3269435263455830842L; 

    @Override 
    @SuppressWarnings("rawtypes") 
    public void open(Map conf, TopologyContext context) {} 

    @Override 
    public void emitBatch(long batchId, TridentCollector collector) { 
     if (newTopologyNeeded()) { 
      TopologyBuilder builder = new TopologyBuilder(); 
      builder 
      .setSpout("spout", new BaseRichSpout() { 
       private static final long serialVersionUID = 1L; 
       @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} 
       @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} 
       @Override public void nextTuple() {} 
      }, 1) 
      .setMaxSpoutPending(15) 
      .setNumTasks(1); 
      StormTopology topology = builder.createTopology(); 
      Config config = new Config(); 
      try { 
       StormSubmitter.submitTopology("dynamic-topology", config, topology); 
      } catch (Exception e) { 
       e.printStackTrace(); 
       collector.reportError(e); 
      } 
     } 
    } 

    private boolean newTopologyNeeded() { 
     // Check if topology needed ... 
     return false; 
    } 

    @Override 
    public void ack(long batchId) {} 

    @Override 
    public void close() {} 

    @Override 
    public Map<String, Object> getComponentConfiguration() { return null; } 

    @Override 
    public Fields getOutputFields() { return null; } 

} 
관련 문제