1
Scala 2.10.6
및 Spark 1.6.2
을 사용하여 카프카 항목의 메시지를 사용하고 싶습니다. 이 코드는 잘 컴파일Scala에서 Kafka 소비자를 컴파일 할 수 없습니다.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
그러나 나는 auto.offset.reset
을 정의하려면 여기에 문제가 발생 : 내가 kafkaParams
를 추가 할 때
val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
것은, 그것은 더 이상 컴파일되지 않습니다 카프카를 위해 나는이 종속성을 사용하고 있습니다 :
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000",
"auto.offset.reset" -> "smallest")
val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
오류 메시지 :
94: error: missing parameter type for expanded function ((x$3) => x$3._2)
[ERROR] StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
createStream
의 매개 변수가 여러 가지로 시도되었지만 모두 실패합니다. 누군가 제발 도와 드릴까요?
자입니다. 확인해 봅니다. 실제로 문제는 원격 Kafka 대기열에서 메시지를 사용하고 싶다는 것입니다. 터미널에서'curl' 명령과 합류 API를 사용하여 얻을 수 있습니다. 하지만 스칼라 코드를 실행하면 얻지 못합니다. 그래서, 제 가정은 오프셋을 지정해야한다는 것입니다. – Dinosaurius
@Dinosaurius 귀하의 질문에 오류가 오프셋과 관련이 없습니다. 그것은 단순히 올바른 유형을 추론 할 수없는 컴파일러입니다. –
그래, 알아. 오프셋을 설정해야하는 이유를 설명하고 싶었습니다. – Dinosaurius