스파크에서 카산드라에 데이터를 쓰는 동안 데이터가 쓰여지지 않습니다.
플래시백 :
kafka-sparkStreaming-cassandra 통합을하고 있습니다.
카프카 메시지를 읽고 카산드라 테이블에 넣으려고합니다 CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT)
.
스파크 스트리밍을하는 카프카는 시원하게 달리고 있지만 카산드라에게 불꽃이 날 때, 데이터가 테이블에 쓰여지지 않는 문제가 있습니다.
cassandra와의 연결을 만들 수 있지만 데이터가 cassandra 테이블에 삽입되지 않습니다. 출력은 연결이되고 다음 연결이 끊어짐을 보여줍니다.
System.out.print()
의 문자열이 모두 출력됩니다.카프카 스파크 스트리밍 데이터가 카산드라에 기록되지 않습니다. 제로 행이 삽입되었습니다.
+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
카산드라 쉘은 0 행을 표시합니다.
전체 코드 및 로그 및 종속성은 다음과 같습니다 :
public class SparkStream {
static int key=0;
public static void main(String args[]) throws Exception
{
if(args.length != 3)
{
System.out.println("parameters not given properly");
System.exit(1);
}
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
Map<String,Integer> topicMap = new HashMap<String,Integer>();
String[] topic = args[2].split(",");
for(String t: topic)
{
topicMap.put(t, new Integer(3));
}
/* Connection to Spark */
SparkConf conf = new SparkConf();
conf.set("spark.cassandra.connection.host", "localhost");
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++");
/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++");
/* Create DStream */
JavaDStream<TestTable> data = messages.map(new Function< Tuple2<String,String>, TestTable >()
{
public TestTable call(Tuple2<String, String> message)
{
return new TestTable(new Integer(++key), message._2());
}
}
);
System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++");
/* Write to cassandra */
javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
jssc.start();
jssc.awaitTermination();
}
}
class TestTable implements Serializable
{
Integer key;
String value;
public TestTable() {}
public TestTable(Integer k, String v)
{
key=k;
value=v;
}
public Integer getKey(){
return key;
}
public void setKey(Integer k){
key=k;
}
public String getValue(){
return value;
}
public void setValue(String v){
value=v;
}
public String toString(){
return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
}
}
로그는 다음과 같습니다
+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
pom.xml 파일 종속성은 다음과 같습니다
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.msiops.footing</groupId>
<artifactId>footing-tuple</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.1.3</version>
</dependency>
가에 문제가 코드? 또는 카산드라 구성?
당신이 데이터를 실제로 아무것도가 있는지 여부를 확인 했 카산드라 테이블에 저장하기로 클래스와 다른 자바 파일 TestTable.java을 만들어? data.print() 함수를 호출하여 테스트 할 수 있습니다. – RussS
@RussS - 네, 그런 식으로 인쇄하는 것이 지쳤습니다. 그것의 일 벌금. TestTable.toString() 메서드에서 단일 String을 반환하는 것과 같은 것이 있지만 cassandra 테이블에는 두 개의 열 (키 INT 및 값 TEXT)이 있습니다. ?? – aiman