2013-06-28 2 views
0

내 로컬 컴퓨터에서 작동하는 간단한 Spark-Streaming 예제를 얻으려고합니다.
나는 소켓/조식/고사으로 기록 스레드가 :Spark 스트리밍 소켓에서 감소 작업이 작동하지 않습니다

serverSocket = new ServerSocket(Constants.PORT); 
s1 = serverSocket.accept(); 
while(true) { 
    Thread.sleep(random.nextInt(100)); 
    String character = alphabet.get(random.nextInt(alphabet.size())) ; 
    PrintWriter out = new PrintWriter(s1.getOutputStream()); 
    out.println(character); 
    out.flush(); 
} 

내 주요 프로그램, 내가의 수를 계산하려고합니다/학사/(스텝 감소없이) 다음과 같은 고사가 보인다 :

Time: 1372413296000 ms 
------------------------------------------- 
B 
A 
B 
C 
C 
C 
A 
B 
C 
C 
... 

------------------------------------------- 
Time: 1372413296000 ms 
------------------------------------------- 
(batchCountB,1) 
(batchCountA,1) 
(batchCountB,1) 
(batchCountC,1) 
(batchCountC,1) 
(batchCountC,1) 
(batchCountA,1) 
(batchCountB,1) 
(batchCountC,1) 
(batchCountC,1) 
... 

를하지만지도 후 환원 단계를 추가하는 경우는 anymo 작동하지 않습니다 :이 경우 모두에서

public static void main(String[] args) { 
    // start socket writer thread 
    System.setProperty("spark.cleaner.ttl", "10000"); 
    JavaSparkContext sc = new JavaSparkContext(
      "local", 
      "Test", 
      Constants.SPARK_HOME, 
      new String[]{"target/spark-standalone-0.0.1-SNAPSHOT.jar"}); 
    Duration batchDuration = new Duration(TIME_WINDOW_MS); 
    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, batchDuration); 
    JavaDStream<String> stream = streamingContext.socketTextStream("localhost", Constants.PORT); 
    stream.print(); 
    JavaPairDStream<String, Long> texts = stream.map(new PairFunction<String, String, Long>() { 

      @Override 
      public Tuple2<String, Long> call(String t) throws Exception { 
       return new Tuple2<String, Long>("batchCount" + t, 1l); 
      } 

     }); 
    texts.print(); 
    streamingContext.checkpoint("checkPointDir"); 
    streamingContext.start(); 

은 (일괄 처리를위한 샘플 출력) 잘 작동합니다 레. 이 코드는 난 단지 감소에 대한 첫 번째 "스트림"변수 및 "텍스트"변수 아무것도 출력을 얻을이 경우에는() texts.print 후

JavaPairDStream<String, Long> reduced = texts.reduceByKeyAndWindow(new Function2<Long, Long, Long>() { 

    @Override 
    public Long call(Long t1, Long t2) throws Exception { 
     return t1 + t2; 
    } 
    }, new Duration(TIME_WINDOW_MS)); 
reduced.print(); 

간다. 이 첫 번째 일괄 처리 후에도 아무 일도 일어나지 않습니다. 스파크 로그 레벨을 DEBUG로 설정했지만 예외 나 다른 이상한 일은 발생하지 않았습니다.

여기서 어떻게됩니까? 왜 내가 잠겨 있니?

답변

2

기록 : Spark 사용자 그룹에서 답변을 얻었습니다.
에러는 하나의 동시 처리를 가능하게하기 위해서는, 스파크 컨텍스트를 인스턴스화하는 파라미터로서

"local[2]" 

대신

"local" 

의 사용한다는 것이다.

관련 문제