트라이던트에서 토폴로지를 동적으로 생성하는 방법이 있습니까? 아무도 예제를 제공해 줄 수 있습니까?Apache storm Trident - 토폴로지를 동적으로 생성
0
A
답변
0
우선 토폴로지를 만드는 것이 Trident의 일부가 아니라는 것을 알고있을 수도 있습니다. Trident는 마이크로 배칭을위한 API 일뿐입니다.
새로운 토폴로지를 만드는 것은 정의에 따라 동적입니다. 이것은 TopologyBuilder
클래스가 수행하는 것입니다.
질문에 답하기 위해, 그렇습니다. Trident 또는 간단한 Storm 스파우트와 볼트에서 새로운 토폴로지를 만들 수 있습니다. 당신이 필요로하는 유일한 것은 당신의 토폴로지 생성 논리가 Storm에서 논리를 실행하면 다시 정의에 의해 충족되는 Storm 클러스터 (클래스 및 기타 자원)에 액세스 할 수 있어야한다는 것입니다.
마지막으로 필요한 것은 새로 생성 된 토폴로지를 제출하는 방법을 찾는 것입니다.이 것은 StormSubmitter
클래스가 만들어진 것으로 다시 클래스 프롬프트에 만족하는 정의로 작성되었습니다 (깜짝 :)) 트라이던트 또는 일반 스파우트/볼트 내부에서 논리를 실행할 때
호기심에서 벗어나서 왜이 작업을 수행 할 계획입니까? 요구 사항은 무엇입니까?
예 :
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
public class DynamicTopologySpout implements IBatchSpout {
private static final long serialVersionUID = -3269435263455830842L;
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context) {}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
if (newTopologyNeeded()) {
TopologyBuilder builder = new TopologyBuilder();
builder
.setSpout("spout", new BaseRichSpout() {
private static final long serialVersionUID = 1L;
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
@Override public void nextTuple() {}
}, 1)
.setMaxSpoutPending(15)
.setNumTasks(1);
StormTopology topology = builder.createTopology();
Config config = new Config();
try {
StormSubmitter.submitTopology("dynamic-topology", config, topology);
} catch (Exception e) {
e.printStackTrace();
collector.reportError(e);
}
}
}
private boolean newTopologyNeeded() {
// Check if topology needed ...
return false;
}
@Override
public void ack(long batchId) {}
@Override
public void close() {}
@Override
public Map<String, Object> getComponentConfiguration() { return null; }
@Override
public Fields getOutputFields() { return null; }
}
관련 문제
- 1. Redis에서 쓰는 Trident 또는 Storm 토폴로지
- 2. Apache Storm 오류가 로컬 클러스터에서 토폴로지를 전송할 수 없습니다.
- 3. Trident kafka 트랜잭션 오우 토
- 4. Apache Storm 작업자 프로세스가 종료됩니다.
- 5. Apache Storm emitDirect 문제
- 6. 자동 Apache STORM 배치
- 7. Apache Storm Flux의 외부 속성을 입력하십시오.
- 8. Apache Marathon에서 Hadoop/Storm 작업 실행
- 9. Apache Storm 아키텍처 - 이것이 올바른 방향입니까?
- 10. Apache Storm File 클러스터 모드의 FileNotFoundException
- 11. Trident Storm-Cassandra, 복수 기본 키가있는 테이블에 쓰기
- 12. storm-deploy 토폴로지 제출 로컬 클러스터에서 토폴로지를 실행 한 후
- 13. Apache Storm : 다른 볼트로 방출되는 가변 객체
- 14. Apache Storm bolt 가능성이 시간 초과 됨
- 15. 사육사, Apache Kafka, Apcahe Storm
- 16. Apache Storm 릴리스의 차이점은 무엇입니까?
- 17. KafkaSpouts를 Apache Storm 토폴로지에 동적으로 추가/제거 할 수 있습니까?
- 18. STL 파일에서 토폴로지를 효율적으로 생성
- 19. Twitter Storm v/s Apache Hadoop
- 20. apache-storm-2.x.x가 출시 되었습니까?
- 21. Apache Storm Hbase 버전 호환성, java.lang.NoSuchFieldError : HBASE_CLIENT_LIMIT
- 22. Apache Storm - 여러 개의 스파우트를 사용 하시겠습니까?
- 23. Apache Storm 다중 프로토콜 프로토콜 Acking
- 24. Apache Storm - 생산 모드가 오래된 코드를 실행합니다.
- 25. 다른 원격 클러스터에 토폴로지를 배포 하시겠습니까?
- 26. Apache activeMQ, 클라이언트 측에서 동적으로 대기열 생성
- 27. 폭풍 토폴로지를 실행할 때 InvalidClassException 로컬 클래스가 호환되지 않음
- 28. 시뮬레이션 된 네트워크 토폴로지를 Java로 동적으로 표시하는 방법
- 29. Storm - KafkaSpout이 open()에서 실패했습니다.
- 30. Apache Storm 스파우트가 서로 통신 할 수 있습니까?
당신은 당신이 당신이 그 파일을 읽을 수있는 토폴로지를 배포 일부 속성 파일 (JSON) 때의 토폴로지의 설정을 저장할 수 있습니다. 하지만 한번 배포하면 동적으로 변경할 수 없습니다. –