2014-12-09 4 views
0

스파크에서 카산드라에 데이터를 쓰는 동안 데이터가 쓰여지지 않습니다.
플래시백 :
kafka-sparkStreaming-cassandra 통합을하고 있습니다.
카프카 메시지를 읽고 카산드라 테이블에 넣으려고합니다 CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT).
스파크 스트리밍을하는 카프카는 시원하게 달리고 있지만 카산드라에게 불꽃이 날 때, 데이터가 테이블에 쓰여지지 않는 문제가 있습니다.
cassandra와의 연결을 만들 수 있지만 데이터가 cassandra 테이블에 삽입되지 않습니다. 출력은 연결이되고 다음 연결이 끊어짐을 보여줍니다.
System.out.print()의 문자열이 모두 출력됩니다.카프카 스파크 스트리밍 데이터가 카산드라에 기록되지 않습니다. 제로 행이 삽입되었습니다.

+++++++++++cassandra connector created++++++++++++++++++++++++++++ 
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++ 
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++ 

카산드라 쉘은 0 행을 표시합니다.
전체 코드 및 로그 및 종속성은 다음과 같습니다 :

public class SparkStream { 
    static int key=0; 
    public static void main(String args[]) throws Exception 
    { 

     if(args.length != 3) 
     { 
      System.out.println("parameters not given properly"); 
      System.exit(1); 
     } 

     Logger.getLogger("org").setLevel(Level.OFF); 
     Logger.getLogger("akka").setLevel(Level.OFF); 
     Map<String,Integer> topicMap = new HashMap<String,Integer>(); 
     String[] topic = args[2].split(","); 
     for(String t: topic) 
     { 
      topicMap.put(t, new Integer(3)); 
     } 

     /* Connection to Spark */ 
     SparkConf conf = new SparkConf(); 
     conf.set("spark.cassandra.connection.host", "localhost"); 
     JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf); 
     JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000)); 


     /* connection to cassandra */ 
     CassandraConnector connector = CassandraConnector.apply(sc.getConf()); 
     System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++"); 


     /* Receive Kafka streaming inputs */ 
     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 
     System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++"); 


     /* Create DStream */     
     JavaDStream<TestTable> data = messages.map(new Function< Tuple2<String,String>, TestTable >() 
     { 
      public TestTable call(Tuple2<String, String> message) 
      { 
       return new TestTable(new Integer(++key), message._2()); 
      } 
     } 
     ); 
     System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++"); 


     /* Write to cassandra */ 
     javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra(); 


     jssc.start(); 
     jssc.awaitTermination(); 

    } 
} 

class TestTable implements Serializable 
{ 
    Integer key; 
    String value; 

    public TestTable() {} 

    public TestTable(Integer k, String v) 
    { 
     key=k; 
     value=v; 
    } 

    public Integer getKey(){ 
     return key; 
    } 

    public void setKey(Integer k){ 
     key=k; 
    } 

    public String getValue(){ 
     return value; 
    } 

    public void setValue(String v){ 
     value=v; 
    } 

    public String toString(){ 
     return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value); 

    } 
} 

로그는 다음과 같습니다

+++++++++++cassandra connector created++++++++++++++++++++++++++++ 
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++ 
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++ 
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added 
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster 
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added 
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster 
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 

pom.xml 파일 종속성은 다음과 같습니다

<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.1.0</version> 
    </dependency> 

<dependency> 
    <groupId>com.datastax.spark</groupId> 
    <artifactId>spark-cassandra-connector_2.10</artifactId> 
    <version>1.1.0</version> 
</dependency> 
<dependency> 
    <groupId>com.datastax.spark</groupId> 
    <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
    <version>1.1.0</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.1.1</version> 
</dependency> 


    <dependency> 
     <groupId>com.msiops.footing</groupId> 
     <artifactId>footing-tuple</artifactId> 
     <version>0.2</version> 
    </dependency> 

<dependency> 
    <groupId>com.datastax.cassandra</groupId> 
    <artifactId>cassandra-driver-core</artifactId> 
    <version>2.1.3</version> 
</dependency> 

가에 문제가 코드? 또는 카산드라 구성?

+1

당신이 데이터를 실제로 아무것도가 있는지 여부를 확인 했 카산드라 테이블에 저장하기로 클래스와 다른 자바 파일 TestTable.java을 만들어? data.print() 함수를 호출하여 테스트 할 수 있습니다. – RussS

+0

@RussS - 네, 그런 식으로 인쇄하는 것이 지쳤습니다. 그것의 일 벌금. TestTable.toString() 메서드에서 단일 String을 반환하는 것과 같은 것이 있지만 cassandra 테이블에는 두 개의 열 (키 INT 및 값 TEXT)이 있습니다. ?? – aiman

답변

1

이 문제를 해결했습니다. columnMapper는 TestTable 클래스의 getter 및 setter에 액세스 할 수 없습니다. 이렇게 액세스 수정자가 public으로 변경되었습니다. 하지만 지금은 하나의 파일에 2 개의 공개 클래스가 있습니다. 이것은 오류입니다. 그렇게

public class TestTable implements Serializable { 
//code 
} 

이제 메시지가 카프카에서 읽을되고 있으며

관련 문제