2014-11-03 3 views
3

슬라이딩 윈도우를 통해 DStream 배치에 캡처 된 모든 RDD의 결합 부분 함수를 전달하려고합니다.Spark : PartialFunction을 DStream에 전달하려면 어떻게해야합니까?

window
val ssc = new StreamingContext(new SparkConf(), Seconds(1)) 
val stream = ssc.socketStream(...) 
val window = stream.window(Seconds(10)) 

는 K 많은 RDDs있을 것이다 : 나는 1 개 초 배치에 이산화 스트림에 10 초 동안 창 작업을 구성 말할 수 있습니다. 이 RDD의 모든 K 조합에 collect(f: PartialFunction[T, U])을 사용하고 싶습니다. foreachRDD을 사용하여 조합 연산자 ++을 호출 할 수 있지만 이 아닌 RDD을 반환하고 부작용을 방지하고 싶습니다.

은 내가 찾고 내가 지금처럼 사용할 수있는 DStream

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T] 

같은 감속기입니다 :

window.reduce(_ ++ _).transform(_.collect(myPartialFunc)) 

을하지만이 불꽃 스트리밍 API에 사용할 수 없습니다.

스트림에서 캡처 한 RDD를 단일 RDD로 결합하는 데 유용한 아이디어가 있습니까? 부분 기능을 전달할 수 있습니까? 아니면 내 자신의 RDD 감속기를 구현? 아마도이 기능은 이후의 Spark 릴리스에 포함될 것입니까?

+0

당신은 일정 기간 동안 RDD를 얻을 수 있습니다. – Anant

+0

@Anant 기간은 어디에서 시작하고 끝나나요? DStream 메서드'compute'는'validTime' 매개 변수만을 받아들입니다. 이게 내 창문의 시작이나 끝인가요? 또한, 배치와 같은 간격으로 '계산'을 반복적으로 호출해야하는 것을 어떻게 처리할까요? 나는 덜 무언가를 찾고있다. – nmurthy

+0

@nmurthy DStream에서'collect'를 할 수 없습니다. 당신이하려는 일에 대해 더 설명해 주시겠습니까? 다른 방법이있을 것입니다. – maasg

답변

2

부분 함수는 DStream 연산에서 직접 지원되지 않지만 동일한 기능을 수행하는 것은 어렵지 않습니다. 예를 들어

,의이 숫자인지 문자열 문자열의 int를 생산한다 사소한 부분 기능 보자 :

val pf:PartialFunction[String,Int] = {case x if (Try(x.toInt).isSuccess) => x.toInt} 

을 우리는 문자열의 d 스트림이 있습니다

val stringDStream:DStream[String] = ??? // use your stream source here 

그러면 부분 함수를 다음과 같이 DStream에 적용 할 수 있습니다.

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf) 
관련 문제