2017-09-27 4 views
0

스파크 스트리밍에서 Kafka 스트림으로부터 수 백만의 메시지를 받고 있습니다. 15 가지 유형의 메시지가 있습니다. 메시지는 단일 주제에서 온 것입니다. 메시지의 내용 만 차별화 할 수 있습니다. 그래서 나는 rdd.contains 메서드를 사용하여 다른 유형의 rdd를 얻고있다.은 spark-scala에서 rdd.contains 함수입니다.

샘플 메시지

{ "A" "foo에", "B": "바", "종류": "제".......}
{ "A": " foo1 ","b ":"bar1 ","type ":"second ".......}
{"a ":"foo2 ","b ":"bar2 ","type ":" 세 번째 ".......}
{"a ":"foo ","b ":"bar ","type ":"first ".......}
.... ..........
...............
.........

에 이렇게

코드

DStream.foreachRDD { rdd => 
    if (!rdd.isEmpty()) { 
    val rdd_first = rdd.filter { 
     ele => ele.contains("First") 
    } 
    if (!rdd_first.isEmpty()) { 
     insertIntoTableFirst(hivecontext.read.json(rdd_first)) 
    } 
    val rdd_second = rdd.filter { 
     ele => ele.contains("Second") 
    } 
    if (!rdd_second.isEmpty()) { 
    insertIntoTableSecond(hivecontext.read.json(rdd_second)) 
    } 
     ............. 
     ...... 
    same way for 15 different rdd 

는 카프카의 주제 메시지에서 다른 RDD를 얻을 수있는 방법은 무엇입니까?

답변

1

rdd.contains은 없습니다. 여기에 사용 된 contains 함수는 RDD에있는 String에 적용됩니다.

여기처럼 : String의 다른 콘텐츠 오류의 결과로, 비교를 만날 수 있기 때문에

val rdd_first = rdd.filter { 
    element => element.contains("First") // each `element` is a String 
} 

이 방법은 강력하지 않습니다.

{"a":"foo", "b":"bar","type":"second", "c": "first", .......} 

이 문제를 처리하는 한 가지 방법은 먼저 JSON 데이터를 적절한 레코드로 변환 한 다음 해당 레코드에 그룹화 또는 필터링 논리를 적용하는 것입니다. 이를 위해서는 먼저 데이터의 스키마 정의가 필요합니다. 스키마, 우리는 JSON으로 기록을 분석 할 수 있으며, 그 위에 어떤 처리를 적용

case class Record(a:String, b:String, `type`:String) 

import org.apache.spark.sql.types._ 
val schema = StructType(
       Array(
       StructField("a", StringType, true), 
       StructField("b", StringType, true), 
       StructField("type", String, true) 
       ) 
      ) 

val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...) 

stream.foreachRDD { rdd => 
    val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record] 
    processPerType.foreach{case (tpe, process) => 
     val target = records.filter(entry => entry.`type` == tpe) 
     process(target) 
    } 
} 

문제를 논리의 종류는 기록의 각 유형에 적용 할 필요가 무엇을 지정하지 않습니다. 여기서 제시되는 것은 커스텀 로직이 함수 Dataset[Record] => Unit으로 표현 될 수있는 문제에 접근하는 일반적인 방법입니다.

논리를 집계로 표현할 수 있다면 아마도 Dataset 집계 함수가 더 적합 할 것입니다.

+0

하이브에 데이터를 저장해야합니다. 하이브에는 15 가지 테이블이 있습니다. 업데이트 된 질문. 사실 JSON의 단일 유형에는 50 개 이상의 열이 있습니다. 그래서 저는 15 개의 케이스 클래스를 만들어야합니다. 케이스 클래스를 만드는 대신 다른 클래스가 있습니까 ?? –

+0

@KishoreKumarSuthar 데이터가 '사례 클래스'(Spark 용어로)로 '구조화'된 후에 특정 테이블과 일치하도록 데이터를 투영 할 수 있습니다. '(val tableProjection1 = records select ($ "column", $ "column", ...) where ($ "type"=== ...)' – maasg

관련 문제