2016-06-20 3 views
1

2 개의 입력이 있습니다. 첫 번째 입력은 스트림 (입력 1)이고 두 번째 입력은 배치 (예 : input2)입니다. 첫 번째 입력의 키가 두 번째 입력의 단일 행 또는 두 개 이상의 행과 일치하는지 파악하고 싶습니다. 상기 추가 변환/로직 로우 일치 또는 복수의 열이 일치 단일 여부 지금까지DStream이 비어 있는지 알아내는 방법

val input1Pair = streamData.map(x => (x._1, x)) 
val input2Pair = input2.map(x => (x._1, x)) 
val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)} 
val result = joinData.mapValues{ 
    case(v, Some(a)) => 1L 
    case(v, None) => 0 
}.reduceByKey(_ + _).filter(_._2 > 1) 
시도

if(single row matches){ 
    // do something 
}else{ 
    // do something 
} 

코드 (이어야 하나의 첫 번째 입력 키에 대한) 일치하는 행의 수에 의존

위 코드를 작성했습니다. result.print를 수행하면 모든 키가 input2의 한 행과 만 일치하면 아무 것도 인쇄되지 않습니다. DStream에 여러 RDD가있을 수 있으므로 DStream이 비어 있는지 여부를 파악할 방법이 확실하지 않습니다. 이것이 가능하다면 if check를 할 수 있습니다.

답변

3

DStream은 시간 경과에 따른 컬렉션을 나타내므로 DStream이 비어 있는지 확인할 수있는 기능이 없습니다. 개념적 관점에서 비어있는 DStream은 결코 데이터가없고 매우 유용하지 않은 스트림입니다. 무엇을 할 수 있는지

는 주어진 microbatch 데이터가 있는지 여부를 확인하는 것입니다 :

dstream.foreachRDD{ rdd => if (rdd.isEmpty) {...} } 

즉 특정 시점에서 하나의 RDD 거기에 있습니다.

실제 질문은 참조 RDD와 DStream의 데이터 간의 일치 수를 확인하는 방법입니다. 우리는 또한 foreachRDD 작업 내에서 RDD 중심의 변환을 배치 할 수 있습니다

val intersectionDStream = streamData.transform{rdd => rdd.intersection(input2)} 
intersectionDStream.foreachRDD{rdd => 
    if (rdd.count > 1) { 
     ..do stuff with the matches 
    } else { 
     ..do otherwise 
    } 
} 

:

streamData.foreachRDD{rdd => 
    val matches = rdd.intersection(input2) 
    if (matches.count > 1) { 
     ..do stuff with the matches 
    } else { 
     ..do otherwise 
    } 
} 
+0

주셔서 감사합니다 응답을 많이 아마도 가장 쉬운 방법은 두 컬렉션을 교차 및 교차 크기를 확인하는 것입니다. 필자의 경우 input1 RDD 타입은 input2 RDD 타입과 다릅니다. 마지막으로 코 그룹을 사용하여 구현되었습니다. – Dazzler

관련 문제