2017-10-16 1 views
1

spoke로 데이터를 처리 할 때 java.io.NotSerializableException 문제가 많이 발생합니다. 이 java.io.NotSerializableException의 원인을 실패하고 따라야하지만java.io.NotSerializableException Spark Streaming을 사용하여 Hbase에 rdd를 저장할 때

val hbase_conf = HBaseConfiguration.create() 
hbase_conf.set("hbase.zookeeper.property.clientPort", "2181") 
hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com") 
val newAPIJobConfiguration = Job.getInstance(hbase_conf); 
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table"); 
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]]) 
newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp") 
mydata.foreachRDD(rdd => { 
    val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan]) 
    .map(Scan.transformScanRestult _) 
    .filter(_.nonEmpty) 
    .map(_.get) 
    .map(Scan.convertForHbase _) 
    json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration) 
}) 

object mytest_config{ 
    val hbase_conf = HBaseConfiguration.create() 
    hbase_conf.set("hbase.zookeeper.property.clientPort", "2181") 
    hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2") 
    val newAPIJobConfiguration = Job.getInstance(hbase_conf); 
    newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table"); 
    newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]]) 
    newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp") 
    } 

mydata.foreachRDD(rdd => { 
     val json_rdd = rdd.map(Json.parse _) 
     .map(_.validate[Scan]) 
     .map(Scan.transformScanRestult _) 
     .filter(_.nonEmpty) 
     .map(_.get) 
     .map(Scan.convertForHbase _) 

    json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration) 
}) 

을 다음과 그리고이 일할 수 그래서 난 내 코드를 변경 오류 정보

17/10/16 18:56:50 ERROR Utils: Exception encountered 
     java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 

입니다! 누군가이 작업이 왜 필요한지 아이디어를 얻었으며 공식적으로 권장되는 방법은 무엇입니까?

답변

2

그 오류는

newAPIJobConfiguration에 어쩌면 나는 매우 명확하게 내 질문에 설명하지 않은 내부 직원 (foreach)

json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration) 
+0

를 사용하고 드라이버

val newAPIJobConfiguration = Job.getInstance(hbase_conf); 

에 초기화됩니다. 나는 왜 mytest-config가 작동 할 수 있는지에 대한 예를 들어 모든 설정을 스칼라 객체에 넣는 지 알고 싶다. 드라이버에서 객체가 초기화 되었습니까? –

+0

'mytest_config'는'serializable'이고 다른 하나는 – mrsrinivas

+0

이 아닙니다. 객체가 드라이버에서 작업자로 unserialzable 객체를 변환 할 수 있다는 것을 알았습니다. 정말 고마워. 게으른 발이 사용되어야 할 때가 하나 더 있습니다. 예를 들어 주시겠습니까? –

관련 문제