내 요구 사항은 감시중인 디렉토리의 모든 기존 파일을 읽는 Spark Streaming 응용 프로그램을 구축하고 있습니다.파일 필터가 Spark StreamingContext.fileStream (...) API에서 작동하지 않습니다.
나는 이것을 위해 StreamingContext.fileStream(...)
API를 사용하고 있습니다. 이 API는 필터 함수를 전달하는 데 필요합니다. 내 경우에는 항상 모든 파일을 읽을 필요가 있으므로 true
을 반환합니다. newFilesOnly
플래그가 StreamingContext.fileStream(...)
인 경우 false
으로 설정됩니다.
은 [여기 API doc]
없다하지만, 상관없이 필터 함수가 반환 또는 newFilesOnly
플래그는 d 스트림 대응에서 만든 RDDs는 비어,로 설정됩니다.
이 여기에 코드입니다 : 나는 필터 기능과 newFilesOnly 플래그의 리턴 값의 다양한 조합을 시도
val ssc = new StreamingContext(sparkConf, Seconds(30))
val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
println("In File " + x.toString) //Prints exisitng file's path as expected
true
}
}
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString)
strm.print() //DOESN'T PRINT ANYTHING
, 아무 일 없습니다.
대신 StreamingContext.textFileStream(...)
을 사용하면 정상적으로 작동하지만이 API의 예상되는 동작 만 새 파일을 읽습니다.
여기에 뭔가가 있습니까? 어떤 도움을 주시면 감사하겠습니다. 미리 감사드립니다!
이 작업을 어떻게 실행하고 있습니까? 즉, 해당 버킷에 새 파일을 복사합니까? –