나는 스파크 스트리밍을 사용하여 카프카 스트리밍 메시지를 읽습니다. 이제 출력으로 카산드라를 설정하려고합니다. "텍스트 값이" 내가 이렇게 성공적으로 JavaDStream<Tuple2<String,String>> data
에 데이터를 매핑 한 : "텍스트 기본 키 키"와 나는 열 "test_table"카산드라의 테이블을 만든spark-streaming : 캐스 산드라로 스트리밍 데이터를 출력하는 방법
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >()
{
public Tuple2<String,String> call(Tuple2<String, String> message)
{
return new Tuple2<String,String>(message._1(), message._2());
}
}
);
그럼 내가 생성 한 목록 :
List<TestTable> list = new ArrayList<TestTable>();
TestTable 멤버 "키"와 "값"내 카산드라 테이블과 같은 구조를 갖는 내 사용자 정의 클래스입니다
:
class TestTable
{
String key;
String val;
public TestTable() {}
public TestTable(String k, String v)
{
key=k;
val=v;
}
public String getKey(){
return key;
}
public void setKey(String k){
key=k;
}
public String getVal(){
return val;
}
public void setVal(String v){
val=v;
}
public String toString(){
return "Key:"+key+",Val:"+val;
}
}
JavaDStream<Tuple2<String,String>> data
의 데이터를 List<TestTable> list
에 추가하는 방법을 제안하십시오. 나는 이후 카산드라으로 RDD 데이터를 저장
JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");
을 사용할 수 있도록 나는이 일을하고있다.
messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
public List<TestTable> call(Tuple2<String,String> message)
{
String k = message._1();
String v = message._2();
TestTable tbl = new TestTable(k,v);
list.put(tbl);
}
}
);
을하지만 happenning 어떤 종류의 잘못 일치를 보인다
나는 이런 식으로 코딩을 시도했다. 도와주세요.
@ maasg- 감사합니다. 이제 연결을 만들 수 있습니다. 그러나 이제 데이터는 카산드라 테이블에 삽입되지 않습니다. 로그에 연결이 표시되고 다음 초에는 연결이 끊어집니다. 전체 코드와 로그 및 종속성은 "http://stackoverflow.com/questions/27386223/spark-data-not-getting-written-into-cassandra-zerorows-inserted"에 있습니다. – aiman