2016-06-16 2 views
0

내 요구 사항은 감시중인 디렉토리의 모든 기존 파일을 읽는 Spark Streaming 응용 프로그램을 구축하고 있습니다.파일 필터가 Spark StreamingContext.fileStream (...) API에서 작동하지 않습니다.

나는 이것을 위해 StreamingContext.fileStream(...) API를 사용하고 있습니다. 이 API는 필터 함수를 전달하는 데 필요합니다. 내 경우에는 항상 모든 파일을 읽을 필요가 있으므로 true을 반환합니다. newFilesOnly 플래그가 StreamingContext.fileStream(...) 인 경우 false으로 설정됩니다.

은 [여기 API doc]

없다하지만, 상관없이 필터 함수가 반환 또는 newFilesOnly 플래그는 d 스트림 대응에서 만든 RDDs는 비어,로 설정됩니다.

이 여기에 코드입니다 : 나는 필터 기능과 newFilesOnly 플래그의 리턴 값의 다양한 조합을 시도

val ssc = new StreamingContext(sparkConf, Seconds(30)) 
val filterF = new Function[Path, Boolean] { 
    def apply(x: Path): Boolean = { 
     println("In File " + x.toString) //Prints exisitng file's path as expected 
     true 
    } 
} 
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString) 
strm.print()  //DOESN'T PRINT ANYTHING 

, 아무 일 없습니다.

대신 StreamingContext.textFileStream(...)을 사용하면 정상적으로 작동하지만이 API의 예상되는 동작 만 새 파일을 읽습니다.

여기에 뭔가가 있습니까? 어떤 도움을 주시면 감사하겠습니다. 미리 감사드립니다!

+0

이 작업을 어떻게 실행하고 있습니까? 즉, 해당 버킷에 새 파일을 복사합니까? –

답변

2

FileInputDStream의 무시 윈도우를 증가시켜 해결했습니다. 이는 spark.streaming.fileStream.minRememberDuration 속성을 변경하여 수행 할 수 있습니다. 기본값은 1 분입니다. 테스트 한 모든 파일의 수정 시간은 1 분 이상 이었으므로 무시되었습니다. 자세한 내용은 코드 설명서 here을 참조하십시오.

관련 문제