Below is my code. It fails with com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year

directKafkaStream.foreachRDD(rdd -> 
    { 
     rdd.foreach(record -> 
      { 
       messages1.add(record._2); 
      }); 
       JavaRDD<String> lines = sc.parallelize(messages1); 
       JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>() 
       { 
        @Override 
        public Tuple2<Integer, String> call(String a) 
        { 
         String[] tokens = StringUtil.split(a, '%'); 
         return new Tuple2<Integer, String>(Integer.getInteger(tokens[3]),tokens[2]); 
        } 
       }); // map to get year and name of the movie 
       Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n)); // function for reduce 
       JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc); // reduceByKey to count 
       javaFunctions(yearCount).writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class)).withColumnSelector(someColumns("year","list_of_movies")).saveToCassandra(); // this is the error line 
      }); 

Here is the error:

com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year 
    at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) 
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47) 
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56) 
    at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745)   

Notes:

1) Spark lets me save a JavaRDD, but I cannot save a JavaPairRDD to Cassandra.

2) I am using Spark to connect Kafka to Cassandra.

3) I have marked the line that raises the error with a comment.

Answer

One of your year values is null, which is not allowed for a key column. Check your data and find the part of the code that produces a null Integer.
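One likely source of the null in the question's code is `Integer.getInteger(tokens[3])`. That method does not parse its argument. It looks up a JVM system property with that name and returns null when no such property exists, so a token such as "1995" normally yields null. The sketch below contrasts it with `Integer.parseInt`. The class `YearParsing` and the helper `parseYear` are hypothetical names introduced for illustration, and the sample tokens are made up.

```java
class YearParsing {
    // Hypothetical helper: parses a year token and returns null
    // when the token is missing or is not a valid integer.
    public static Integer parseYear(String token) {
        if (token == null) {
            return null;
        }
        try {
            return Integer.parseInt(token.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String token = "1995"; // made-up sample value

        // Reads the system property named "1995"; null unless that property is set.
        System.out.println(Integer.getInteger(token));

        // Parses the text itself.
        System.out.println(parseYear(token));

        // A malformed token gives null instead of throwing.
        System.out.println(parseYear("n/a"));
    }
}
```

If some records can still have a missing or malformed year, one option is to drop them before the write, for example with `data.filter(t -> t._1() != null)` ahead of `reduceByKey`, so that no row reaches `saveToCassandra()` with a null `year`.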