2017-09-26 1 views
0

파일을 스파크 스트리밍으로 사용하고 있습니다. 스트림의 단어 수를 계산하고 싶지만 애플리케이션에서 아무 것도 인쇄하지 않습니다. 내 코드는 여기에 있습니다. 나는filstream을 사용하는 스파크 스트리밍 단어 개수가 결과를 인쇄하지 않습니다.

import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext 

object TwitterHashtagStreaming { 

def main(args: Array[String]) : Unit = { 

val conf = new SparkConf().setAppName("TwitterHashtagStreaming").setMaster("local[2]").set("spark.executor.memory","1g"); 

val streamingC = new StreamingContext(conf,Seconds(5)) 

val streamLines = streamingC.textFileStream("file:///home/cloudera/Desktop/wordstream") 
val words = streamLines.flatMap(_.split(" ")) 
val counts = words.map(word => (word, 1)).reduceByKey(_ + _) 

counts.print() 

streamingC.start() 
streamingC.awaitTermination() 
} 

} 
+0

무엇이 인쇄됩니까? 오류가 있습니까? –

+0

숫자가 비어있는 것처럼 시간이 없습니다. -------------------------------------- ----- 시간 : 1506415275000 ms --------------------------------------- ---- –

+0

먼저 데이터가 읽혔는지 확인하기 위해 단어 수를 계산하기 전에 streamLines를 인쇄 해보십시오. –

답변

1

주의 깊게 참조하십시오 클라우 데라 환경에 스칼라를 사용하고있어 document :

def textFileStream(directory: String): DStream[String] 

새 파일에 대한 하둡 호환 파일 시스템을 모니터링하고 텍스트로 읽는 입력 스트림을 작성합니다 파일 (키를 LongWritable로 사용, 값을 텍스트로, 입력 형식을 TextInputFormat으로 사용). 동일한 파일 시스템 내의 위치에서 파일을 "이동"하여 파일을 에 기록해야합니다. 파일 이름은로 시작합니다. 이 무시되었습니다. 한마디로

, 그것은 변화 감지기, 당신은 당신의 모니터 디렉토리에 데이터를 기록, 스트리밍 서비스를 시작해야합니다.

이 예를 들어, "스트림 개념"이 실제로 프로덕션 환경에 배포을 시뮬레이션합니다 의미, 네트워크 패킷 것이다 점차 파일과 같은 소득.

관련 문제