2014-12-05 4 views
3

나는 스파크 스트리밍을 사용하여 카프카 스트리밍 메시지를 읽습니다. 이제 출력으로 카산드라를 설정하려고합니다. "텍스트 값이" 내가 이렇게 성공적으로 JavaDStream<Tuple2<String,String>> data에 데이터를 매핑 한 : "텍스트 기본 키 키"와 나는 열 "test_table"카산드라의 테이블을 만든spark-streaming : 캐스 산드라로 스트리밍 데이터를 출력하는 방법

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf); 
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000)); 

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >() 
{ 
    public Tuple2<String,String> call(Tuple2<String, String> message) 
    { 
     return new Tuple2<String,String>(message._1(), message._2()); 
    } 
} 
); 

그럼 내가 생성 한 목록 :

List<TestTable> list = new ArrayList<TestTable>(); 
TestTable 멤버 "키"와 "값"내 카산드라 테이블과 같은 구조를 갖는 내 사용자 정의 클래스입니다

:

class TestTable 
{ 
    String key; 
    String val; 

    public TestTable() {} 

    public TestTable(String k, String v) 
    { 
     key=k; 
     val=v; 
    } 

    public String getKey(){ 
     return key; 
    } 

    public void setKey(String k){ 
     key=k; 
    } 

    public String getVal(){ 
     return val; 
    } 

    public void setVal(String v){ 
     val=v; 
    } 

    public String toString(){ 
     return "Key:"+key+",Val:"+val; 
    } 
} 

JavaDStream<Tuple2<String,String>> data의 데이터를 List<TestTable> list에 추가하는 방법을 제안하십시오. 나는 이후 카산드라으로 RDD 데이터를 저장

JavaRDD<TestTable> rdd = sc.parallelize(list); 
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table"); 

을 사용할 수 있도록 나는이 일을하고있다.

messages.foreachRDD(new Function<Tuple2<String,String>, String>() 
         { 
          public List<TestTable> call(Tuple2<String,String> message) 
          { 
           String k = message._1(); 
           String v = message._2(); 
           TestTable tbl = new TestTable(k,v); 
           list.put(tbl); 
          } 
         } 
        ); 

을하지만 happenning 어떤 종류의 잘못 일치를 보인다

나는 이런 식으로 코딩을 시도했다. 도와주세요.

답변

6

이 프로그램의 의도는 kafka의 스트리밍 데이터를 Cassandra에 저장하려는 것으로 가정하면 JavaDStream<Tuple2<String,String>> 데이터를 List<TestTable> 목록에 덤프 할 필요가 없습니다.

DataStax의 Spark-Cassandra 커넥터는 Spark Streaming extensions을 통해이 기능을 직접 지원합니다.

JavaDStream에 이러한 확장을 사용하기에 충분해야한다 : 대신 중간 목록에 데이터를 배수의

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra(); 

.

+0

@ maasg- 감사합니다. 이제 연결을 만들 수 있습니다. 그러나 이제 데이터는 카산드라 테이블에 삽입되지 않습니다. 로그에 연결이 표시되고 다음 초에는 연결이 끊어집니다. 전체 코드와 로그 및 종속성은 "http://stackoverflow.com/questions/27386223/spark-data-not-getting-written-into-cassandra-zerorows-inserted"에 있습니다. – aiman

관련 문제