2017-03-31 4 views

I am using Storm 1.0.1 and Kafka 0.10.0.0 with storm-kafka-client 1.0.3. I get a NullPointerException when KafkaSpout replays a tuple.

Here is the configuration code I am using:

kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(new Fields(fieldNames), topics)
        .build();

KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
        TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));

KafkaSpoutTuplesBuilder tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder(new TestTupleBuilder(topics))
        .build();

KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(LATEST)
        .setMaxRetries(5)
        .setMaxUncommittedOffsets(250)
        .build();

When a tuple fails, it is not replayed. Instead, the spout throws the error below. Why does this NullPointerException occur?

53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Async loop died! 
java.lang.NullPointerException 
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102] 
53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.d.executor - 
java.lang.NullPointerException 
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102] 
53527 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.8.0.jar:?] 
    at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102] 

Here are the complete spout configs:

{key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, group.id=test-group, ssl.keystore.location=C:/test.jks, bootstrap.servers=localhost:1000, auto.commit.interval.ms=1000, security.protocol=SSL, enable.auto.commit=true, ssl.truststore.location=C:/test1.jks, ssl.keystore.password=pass123, ssl.key.password=pass123, ssl.truststore.password=pass123, session.timeout.ms=30000, auto.offset.reset=latest}

Answers


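Storm 1.0.1 ships with a beta-quality storm-kafka-client. We have since fixed several issues, and a more stable version is available in the Storm 1.1 release; it also works with Kafka 0.10 and later. In your topology, you can depend on storm-kafka-client version 1.1 together with the appropriate version of kafka-clients. You do not need to upgrade the Storm cluster itself.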


Thanks, @Sriharsha Chintalapani. https://github.com/apache/storm/pull/1924 should also fix this. Could you tell me which version of storm-kafka-client should be used with Storm 1.0.1?


Changing enable.auto.commit from true to false resolved the issue for me.
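As a minimal sketch of that change, the consumer properties from the question could be built with auto-commit turned off. The class and method names here (`ConsumerPropsSketch`, `buildConsumerProps`) are hypothetical, the property keys and values are copied from the config dump in the question, and the SSL settings are left out for brevity. The resulting map would be passed to `KafkaSpoutConfig.Builder` in place of `kafkaConsumerProps`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the Kafka consumer properties for the spout.
public class ConsumerPropsSketch {

    static Map<String, Object> buildConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Values below are taken from the question's config dump.
        props.put("bootstrap.servers", "localhost:1000");
        props.put("group.id", "test-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        // The reported fix: disable Kafka's own auto-commit
        // (the original config had this set to true).
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // Print the setting so it can be checked before submitting the topology.
        System.out.println("enable.auto.commit = "
                + buildConsumerProps().get("enable.auto.commit"));
    }
}
```

With auto-commit off, `auto.commit.interval.ms` no longer has any effect, so it is omitted here. The builder in the question already calls `setOffsetCommitPeriodMs(10_000)`, which presumably governs how often the spout commits offsets in this mode.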
