2
org.apache.spark.rdd.MapPartitionsRDD 인스턴스에서 scala.collection.immutable.List$SerializationProxy 인스턴스를 scala.collection.Seq 타입의 org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ 필드에 할당할 수 없습니다.
스파크 클러스터에서 드라이버가 오류와 함께 실패합니다 -
JavaPairInputDStream 메시지 = KafkaUtils.createDirectStream ( JSSC, String.class, 바이트 []. 클래스 StringDecoder.class, DefaultDecoder의 인스턴스 .collection.Seq. 클래스, kafkaParams, topicsSet );
// Turn each Kafka (key, value) record into a CustomerActivityRequestModel by unmarshalling the value bytes.
JavaDStream<CustomerActivityRequestModel> customerActivityStream = messages.map(
        new Function<Tuple2<String, byte[]>, CustomerActivityRequestModel>() {

            /** Serialization id of this anonymous function class (value unchanged). */
            private static final long serialVersionUID = -75093981513752762L;

            /**
             * Unmarshals the value half of one Kafka record and logs its action link and app version.
             *
             * @param kafkaRecord (key, value) pair read from Kafka; only the value is used
             * @return the unmarshalled activity model
             * @throws IOException            declared for the unmarshal step
             * @throws ClassNotFoundException declared for the unmarshal step
             */
            @Override
            public CustomerActivityRequestModel call(Tuple2<String, byte[]> kafkaRecord)
                    throws IOException, ClassNotFoundException {
                final byte[] payload = kafkaRecord._2();
                final CustomerActivityRequestModel activity =
                        NearbuySessionWorkerHelper.unmarshal(CustomerActivityRequestModel.class, payload);

                LOGGER.info(activity.getActionLink());
                LOGGER.info(activity.getAppVersion());
                return activity;
            }
        });
// For every RDD produced by the stream: build an HBase job configuration, convert each
// CustomerActivityRequestModel into an (ImmutableBytesWritable, Put) pair, and save the pairs
// through TableOutputFormat via saveAsNewAPIHadoopDataset.
customerActivityStream.foreachRDD(new VoidFunction<JavaRDD<CustomerActivityRequestModel>>() {
/** Serialization id of this anonymous VoidFunction class. */
private static final long serialVersionUID = -9045343297759771559L;
/**
 * Writes one batch of customer activity records to the HBase table named by hbaseTableName.
 *
 * @param customerRDD the records of the current batch
 * @throws Exception propagated from job setup or from the save
 */
@Override
public void call(JavaRDD<CustomerActivityRequestModel> customerRDD) throws Exception {
// A fresh HBase configuration is created on every invocation; ZooKeeper is hard-coded to localhost:2181.
Configuration hconf = HBaseConfiguration.create();
hconf.set("hbase.zookeeper.quorum", "localhost");
hconf.set("hbase.zookeeper.property.clientPort", "2181");
//hconf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
// hbaseTableName is defined outside this snippet.
// NOTE(review): only a write is visible in this block — confirm INPUT_TABLE is actually needed.
hconf.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
// The output table and output format are set on the Job's own configuration, which is the one passed to the save below.
Job newAPIJobConfiguration1 = Job.getInstance(hconf);
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
// Map each model object to the (key, Put) pair shape that is saved below.
JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts= customerRDD.mapToPair(new PairFunction<CustomerActivityRequestModel, ImmutableBytesWritable, Put>() {
/** Serialization id of this anonymous PairFunction class. */
private static final long serialVersionUID = -6574479136167252295L;
/**
 * Converts one customer activity record into an HBase write.
 *
 * @param customer the record to convert
 * @return a pair of an empty ImmutableBytesWritable key and the Put for this record
 */
@Override
public Tuple2<ImmutableBytesWritable, Put> call(CustomerActivityRequestModel customer) throws Exception {
// NOTE(review): this snippet is incomplete here. `put` is never declared in the visible code and the
// next line begins mid-expression (it looks like the tail of a call that stores the longitude under
// the qualifier "long"). The statement(s) creating the Put and opening that call are missing —
// restore them (row key and column family) from the original source before compiling.
Bytes.toBytes("long"),Bytes.toBytes(customer.getLongitude()));
// The key is an empty ImmutableBytesWritable.
return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
}
});
// Save the pairs using the output format and table configured on the Job above.
hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
}
});