2015-01-23 2 views
0

KafkaSpout을 사용하여 Kafka 대기열에서 메시지를 읽으려고합니다.StormSpout으로 데이터를 읽을 수 없습니다.

2 [Thread-10-kafka-storm-spout] ERROR util:0 - Async loop died! 
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String; 
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:38) 
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34) 
    at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:43) 
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:57) 
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80) 
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:52) 
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118) 
    at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) 
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) 
    at clojure.lang.AFn.run(AFn.java:24) 
    at java.lang.Thread.run(Thread.java:744) 
11 [Thread-10-kafka-storm-spout] ERROR executor:0 - 
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String; 
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:38) 
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34) 
    at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:43) 
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:57) 
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80) 
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:52) 
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118) 
    at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) 
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) 
    at clojure.lang.AFn.run(AFn.java:24) 
    at java.lang.Thread.run(Thread.java:744) 

여기 내 코드입니다 :

TopologyBuilder builder = new TopologyBuilder(); 
     String TOPIC_NAME = "topic"; 
     String spoutName = "kafka-storm-spout";    

     BrokerHosts brokerHosts = new ZkHosts("localhost:2181"); 
     SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, TOPIC_NAME, "", "storm"); 

     builder.setSpout(spoutName, new KafkaSpout(kafkaConfig), 1); 
     builder.setBolt("kafka-bolt", new TestBolt()).shuffleGrouping(spoutName); 

     Config config = new Config(); 
     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("kafka-test", config, builder.createTopology()); 

     System.out.println("Topology submitted"); 
     Utils.sleep(5000); 
     System.out.println("Shutting down"); 
     cluster.shutdown(); 

어떤 아이디어 나도 전혀 아무것도 또는 다음과 같은 오류가없는거야?

답변

1

다른 버전의 스칼라를 사용하려고했을 가능성이 큽니다. 카프카는 다양한 버전의 스칼라 (https://kafka.apache.org/downloads.html) 용으로 제작되었습니다. 의존성을 조사하고 한 가지 버전의 스칼라 만 사용하고 있는지 확인하십시오.

관련 문제