2016-10-05 4 views
1

아파치 스파크를 사용하여 Json 형식의 센서에서 가져온 Apache Kafka의 실시간 데이터를 가져옵니다. 데이터 형식의아파치 스파크를 통한 경고 발생

예 :

{ 
    "meterId" : "M1", 
    "meterReading" : "100" 
} 

나는 실시간으로 경고를 발생하는 규칙을 적용합니다. 즉, 지난 2 시간 동안 "미터 1"의 데이터를 얻지 못했거나 계량기가 일부 한계를 초과하면 경고가 생성되어야합니다.

이렇게 스칼라에서 어떻게하면됩니까?

+0

* 우선 *의 예이다 - 나는 카프카의 JSON이 있어야합니다 아시다시피 하나 당 하나의 메시지 대신에 다음과 같이 보내십시오 ->'{ "meterId": "M1", "meterReading": "100"}' * 두 번째 * : SparkStreamingContext를 생성하고 박쥐의 카프카에서 메시지를 읽도록 설정합니다 체스. * 세 번째 : 모든 경고가있는 장소 (예 : hdfs, kafka ...)에 스파크 및 쓰기 출력을 사용하여 로직을 구현합니다 (일부 제한을 초과, 일부 데이터를 사용하지 않음). – VladoDemcak

+0

감사합니다 @ VladoDemcak. 그것은 나를 많이 도울 것입니다. 그러나 meterId M1 meterReading과 같은 개별 객체를 어떻게 읽을 수 있는지 알고 싶습니다.이 경우 kafka에서 오는 각 라인을 파싱 할 수 있습니까? – Nilesh

+1

더 자세한 내용은 예제와 함께 답변을 추가했습니다. – VladoDemcak

답변

1

답글로 답장을 보내 드리겠습니다. 내가 말했듯이

는 카프카의 JSON은해야한다 : 하나의 메시지 한 줄 당 - 대신을 보내 ->{"meterId":"M1","meterReading":"100"}

당신은 카프카 사용하는 경우 KafkaUtils 당신이 스트림을 만들 수에있다 :

JavaPairDStream<String, String> input = KafkaUtils.createStream(jssc, zkQuorum, group, topics);

쌍 의미는 <kafkaTopicName, JsonMessage>입니다. 따라서 kafkaTopicName을 사용할 필요가 없다면 기본적으로 jsonmessage 만 볼 수 있습니다.

input의 경우 JavaPairDStream documentation에 설명 된 많은 방법을 사용할 수 있습니다. 맵을 사용하여 간단한 JavaDStream에 대한 메시지 만 가져올 수 있습니다.

물론 당신은 사용 사례, 등등 다른 경우와 성능에 따라 달라집니다 gson, jackson 또는 org.json 같은 일부 JSON 파서를 사용할 수 있습니다.

그래서 당신이 뭔가를 할 필요가 :

JavaDStream<String> messagesOnly = input.map(
    new Function<Tuple2<String, String>, String>() { 
     public String call(Tuple2<String, String> message) { 
      return message._2(); 
     } 
    } 
); 

는 이제 메시지 만 withou 카프카 주제 이름이, 당신이 질문에 설명 된 것처럼 이제 당신은 당신의 논리를 사용할 수 있습니다.

여기 경고 메시지 만 있습니다. 다른 곳으로 보낼 수 있습니다.


- 편집 한 후 아래

scala

// batch every 2 seconds 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 
ssc.checkpoint("checkpoint") 

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 

def filterLogic(message: String): Boolean= 
{ 
    // here your logic for filtering 
} 

// map _._2 takes your json messages 
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

// filtered data after filter transformation 
val filtered = messages.filter(m => filterLogic(m)) 
+0

@Nilesh 답변에서 나는 'scala'에 대한 예제를 추가했습니다 – VladoDemcak

관련 문제