2016-06-06 3 views
1

저는 spark와 kafka에서 새롭고 kafka의 json 형식으로 데이터를 스트림으로 보내고 싶습니다. 내 문제는 내 KafkaUtils.createDirectStream의 매개 변수()를 정의하는 방법도 데이터의지도를 정의하고KafkaUtils.createDirectStream()의 내부에서 매개 변수를 정의하는 방법

val ssc = new StreamingContext(sparkConfig, Seconds(10)) 
case class dataMap (number: Int, address: String, product: String, store: String, seller : String) 
val messages = KafkaUtils.createDirectStream[ Int, String, String, String, String](ssc, kafkaParams, topics).map(m => m.as[dataMap]) 

나는 다음과 같은 오류 수신하고 위의 코드를 사용하고 있습니다 :

error: type arguments [Int,String,String,String,String] conform to the bounds of none of the overloaded alternatives of value createDirectStream 

을 추신 : 올바른 형식으로 kafkaParams 및 주제를 정의했습니다.

답변

1

난 당신이 뭔가 할 생각 : 그것은 당신의 오류를 제거 할 것을

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, dataMap](...) 

참고,하지만 당신은 할 더 많은 일을해야합니다. 그러면 JSON String을 얻을 수 있습니다. 그런 다음 case classString을 변환해야합니다. 솔직히 그건 별도의 질문이며 아파치 스파크와 관련이 없으며 심지어 카프카와도 관련이 없습니다. 이 문제에 대한 몇 가지 해결책을 여기에서 찾을 수 있습니다 : How to read json data using scala from kafka topic in apache spark

+0

그런 다음 Kafkaparams 구성을 변경해야합니까? 내 마지막 설정은 : val kafkaParams = Map [String, String] (...) – Farnaz

+1

나는 그렇게 생각하지 않을 것이다. JSON을 읽었다 고 했으므로 String을 읽는다고 가정합니다. 또한 Kafka 키와 메시지 값이 모두 String이라고 가정했습니다. 그래서 처음 네 개의 타입 값'String, String, StringDecoder, StringDecoder'를 다루고 있습니다. 마지막 타입 값은'case 클래스 '입니다. 이제 나는 그것에 대해 생각하고, 할 일이 더 많습니다. 당신은'map (m => m.as [dataMap])'을 할 수 없다. 'm'은'String'이 될 것입니다. 'JSON' 파서를 사용하여'm'을'dataMap'의 인스턴스로 변환해야합니다. –

관련 문제