답글로 답장을 보내 드리겠습니다. 내가 말했듯이
는 카프카의 JSON은해야한다 : 하나의 메시지 한 줄 당 - 대신을 보내 ->{"meterId":"M1","meterReading":"100"}
당신은 카프카 사용하는 경우 KafkaUtils 당신이 스트림을 만들 수에있다 :
JavaPairDStream<String, String> input = KafkaUtils.createStream(jssc, zkQuorum, group, topics);
을
쌍 의미는 <kafkaTopicName, JsonMessage>
입니다. 따라서 kafkaTopicName을 사용할 필요가 없다면 기본적으로 jsonmessage 만 볼 수 있습니다.
input
의 경우 JavaPairDStream documentation에 설명 된 많은 방법을 사용할 수 있습니다. 맵을 사용하여 간단한 JavaDStream에 대한 메시지 만 가져올 수 있습니다.
물론 당신은 사용 사례, 등등 다른 경우와 성능에 따라 달라집니다 gson
, jackson
또는 org.json
같은 일부 JSON 파서를 사용할 수 있습니다.
그래서 당신이 뭔가를 할 필요가 :
JavaDStream<String> messagesOnly = input.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> message) {
return message._2();
}
}
);
는 이제 메시지 만 withou 카프카 주제 이름이, 당신이 질문에 설명 된 것처럼 이제 당신은 당신의 논리를 사용할 수 있습니다.
여기 경고 메시지 만 있습니다. 다른 곳으로 보낼 수 있습니다.
- 편집 한 후 아래
는 scala
// batch every 2 seconds
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
def filterLogic(message: String): Boolean=
{
// here your logic for filtering
}
// map _._2 takes your json messages
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// filtered data after filter transformation
val filtered = messages.filter(m => filterLogic(m))
* 우선 *의 예이다 - 나는 카프카의 JSON이 있어야합니다 아시다시피 하나 당 하나의 메시지 대신에 다음과 같이 보내십시오 ->'{ "meterId": "M1", "meterReading": "100"}' * 두 번째 * : SparkStreamingContext를 생성하고 박쥐의 카프카에서 메시지를 읽도록 설정합니다 체스. * 세 번째 : 모든 경고가있는 장소 (예 : hdfs, kafka ...)에 스파크 및 쓰기 출력을 사용하여 로직을 구현합니다 (일부 제한을 초과, 일부 데이터를 사용하지 않음). – VladoDemcak
감사합니다 @ VladoDemcak. 그것은 나를 많이 도울 것입니다. 그러나 meterId M1 meterReading과 같은 개별 객체를 어떻게 읽을 수 있는지 알고 싶습니다.이 경우 kafka에서 오는 각 라인을 파싱 할 수 있습니까? – Nilesh
더 자세한 내용은 예제와 함께 답변을 추가했습니다. – VladoDemcak