
The Spark cluster driver fails with the following error:

cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

 JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
      jssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class,
      kafkaParams, topicsSet);

 JavaDStream<CustomerActivityRequestModel> customerActivityStream = messages.map(new Function<Tuple2<String, byte[]>, CustomerActivityRequestModel>() { 
      /** 
     * 
     */ 
     private static final long serialVersionUID = -75093981513752762L; 

      @Override 
      public CustomerActivityRequestModel call(Tuple2<String, byte[]> tuple2) throws IOException, ClassNotFoundException { 

       CustomerActivityRequestModel x = NearbuySessionWorkerHelper.unmarshal(CustomerActivityRequestModel.class, tuple2._2); 
       LOGGER.info(x.getActionLink()); 
       LOGGER.info(x.getAppVersion()); 
       return x; 
      } 
     }); 




    customerActivityStream.foreachRDD(new VoidFunction<JavaRDD<CustomerActivityRequestModel>>() { 



     /** 
     * 
     */ 
     private static final long serialVersionUID = -9045343297759771559L; 

     @Override 
     public void call(JavaRDD<CustomerActivityRequestModel> customerRDD) throws Exception { 
      Configuration hconf = HBaseConfiguration.create(); 
      hconf.set("hbase.zookeeper.quorum", "localhost"); 
      hconf.set("hbase.zookeeper.property.clientPort", "2181"); 
      //hconf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); 
      hconf.set(TableInputFormat.INPUT_TABLE, hbaseTableName); 
      Job newAPIJobConfiguration1 = Job.getInstance(hconf); 
      newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); 
      newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); 

      JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts= customerRDD.mapToPair(new PairFunction<CustomerActivityRequestModel, ImmutableBytesWritable, Put>() { 


       /** 
       * 
       */ 
       private static final long serialVersionUID = -6574479136167252295L; 

       @Override 
        public Tuple2<ImmutableBytesWritable, Put> call(CustomerActivityRequestModel customer) throws Exception { 
          // NOTE: row key and column family below are placeholders; the original statement was truncated
          Put put = new Put(Bytes.toBytes(customer.getActionLink()));
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("long"), Bytes.toBytes(customer.getLongitude()));
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put); 
       } 
      }); 
      hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration()); 

     } 
    }); 

Answer


The jar file you are running must be on the classpath of every node in the cluster; putting it there resolved this same problem in my case.
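As a minimal sketch of one way to do this, the application jar can be shipped to the executors programmatically via SparkConf.setJars when the streaming context is created (the jar path and application name below are placeholders, not values from the question):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class StreamingJob {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("customer-activity-stream")
                    // Ship the application jar to every executor so that classes such as
                    // CustomerActivityRequestModel and the anonymous function classes can be
                    // deserialized on the worker nodes. The path is a placeholder.
                    .setJars(new String[] { "/path/to/your-application.jar" });

            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
            // ... build the Kafka direct stream and the foreachRDD logic shown in the question ...
        }
    }

Alternatively, the same effect can usually be achieved by submitting a single assembled (fat) jar with spark-submit, or by listing additional dependency jars with the --jars option so Spark distributes them to the worker nodes.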
