2016-09-16 2 views
0

지난 2 일 동안 저는 토폴로지 내에서 KafkaSpout을 구현하려고했습니다. 여기 은 중요한 정보입니다.Storm - KafkaSpout이 open()에서 실패했습니다.

세 가지 서비스가 모두 동일한 인스턴스에서 실행됩니다. Kafka의 브로커는 기본적으로 9092 포트를 사용하고 advertised.listenersPLAINTEXT://localhost:9092으로 설정합니다. Zookeeper는 기본값 클라이언트 포트 2181을 사용합니다. 반면에 Storm Nimbus 호스트 이름은 localhost로 설정되었습니다.

zkCli 사육사 스크립트를 사용하여 나는/브로커 경로를 사용하는 경우, 파티션 및 기타 관련 정보가 제대로 저장되어있는 것을 본 적이 반면 카프카 프로듀서가 성공적으로 로그 메시지를 생성하는 사용자 정의.

그러나 활성화 할 때 오류가 계속 발생하고 이후 토폴로지를 모니터링합니다. 반면

Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found 
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223) 
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159) 
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36) 
at clojure.lang.RestFn.applyTo(RestFn.java:137) 
at org.apache.storm.command.monitor.main(Unknown Source) 

를 로그를 검사하여 :

BrokerHosts hosts = new ZkHosts("127.0.0.1:2181"); 

SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString()); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
spoutConfig.zkServers = Arrays.asList("127.0.0.1"); 
spoutConfig.zkPort = 2181; 

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("bytes", kafkaSpout); 
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes"); 

StormTopology topology = builder.createTopology(); 

Config config = new Config(); 

StormSubmitter.submitTopology("topology", config, topology); 

그러나, 나는이 bin/storm monitor <topology_name> -m bytes을 실행할 때 점점 계속 오류 메시지는 다음과 같다 : 여기 내가 구현 한 폭풍 토폴로지의 소스 코드 worker (log.log 파일)의 경우, open() 메소드에서 KafkaSpout이 실패했다고 결론을 냈습니다.

java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy 
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2] 
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2] 
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2] 
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101] 
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy 
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101] 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101] 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101] 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101] 
... 5 more 

누군가가 KafkaSpout가 open() 메서드에 실패하는 이유가 무엇인지 설명 할 수 있을까요?

정말 도움을 주셔서 감사합니다.

+0

사용중인 Kafka 및 Storm의 버전은 무엇입니까? 또한 storm-kafka의 버전을보십시오. HDP 클러스터를 사용하고 있습니까? –

답변

관련 문제