2016-12-28 1 views
1

Scala 2.10.6 및 Spark 1.6.2을 사용하여 카프카 항목의 메시지를 사용하고 싶습니다. 이 코드는 잘 컴파일Scala에서 Kafka 소비자를 컴파일 할 수 없습니다.

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

그러나 나는 auto.offset.reset을 정의하려면 여기에 문제가 발생 : 내가 kafkaParams를 추가 할 때

val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap 
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
          StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

것은, 그것은 더 이상 컴파일되지 않습니다 카프카를 위해 나는이 종속성을 사용하고 있습니다 :

val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group, 
"zookeeper.connection.timeout.ms" -> "10000", 
"auto.offset.reset" -> "smallest") 

val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap, 
           StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

오류 메시지 :

94: error: missing parameter type for expanded function ((x$3) => x$3._2) 
[ERROR]             StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

createStream의 매개 변수가 여러 가지로 시도되었지만 모두 실패합니다. 누군가 제발 도와 드릴까요?

답변

1

KafkaUtils.createStream에 유형 매개 변수를 추가하여 스트림의 기본 유형을 해결해야합니다. 예를 들어, 키와 값이 String :

val data: DStream[String] = 
    KafkaUtils 
    .createStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     topicMap, 
     StorageLevel.MEMORY_AND_DISK_SER_2 
).map(_._2) 
+0

자입니다. 확인해 봅니다. 실제로 문제는 원격 Kafka 대기열에서 메시지를 사용하고 싶다는 것입니다. 터미널에서'curl' 명령과 합류 API를 사용하여 얻을 수 있습니다. 하지만 스칼라 코드를 실행하면 얻지 못합니다. 그래서, 제 가정은 오프셋을 지정해야한다는 것입니다. – Dinosaurius

+0

@Dinosaurius 귀하의 질문에 오류가 오프셋과 관련이 없습니다. 그것은 단순히 올바른 유형을 추론 할 수없는 컴파일러입니다. –

+0

그래, 알아. 오프셋을 설정해야하는 이유를 설명하고 싶었습니다. – Dinosaurius

관련 문제