스파크 스트리밍에서 Kafka 스트림으로부터 수 백만의 메시지를 받고 있습니다. 15 가지 유형의 메시지가 있습니다. 메시지는 단일 주제에서 온 것입니다. 메시지의 내용 만 차별화 할 수 있습니다. 그래서 나는 rdd.contains 메서드를 사용하여 다른 유형의 rdd를 얻고있다.은 spark-scala에서 rdd.contains 함수입니다.
샘플 메시지
{ "A" "foo에", "B": "바", "종류": "제".......}
{ "A": " foo1 ","b ":"bar1 ","type ":"second ".......}
{"a ":"foo2 ","b ":"bar2 ","type ":" 세 번째 ".......}
{"a ":"foo ","b ":"bar ","type ":"first ".......}
.... ..........
...............
.........
코드
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
는 카프카의 주제 메시지에서 다른 RDD를 얻을 수있는 방법은 무엇입니까?
하이브에 데이터를 저장해야합니다. 하이브에는 15 가지 테이블이 있습니다. 업데이트 된 질문. 사실 JSON의 단일 유형에는 50 개 이상의 열이 있습니다. 그래서 저는 15 개의 케이스 클래스를 만들어야합니다. 케이스 클래스를 만드는 대신 다른 클래스가 있습니까 ?? –
@KishoreKumarSuthar 데이터가 '사례 클래스'(Spark 용어로)로 '구조화'된 후에 특정 테이블과 일치하도록 데이터를 투영 할 수 있습니다. '(val tableProjection1 = records select ($ "column", $ "column", ...) where ($ "type"=== ...)' – maasg