2017-09-14 4 views
-1

REPL에서 다음 코드를 사용하여 hfiles를 만들고 hbase로 대량로드합니다. 동일한 코드를 사용하고 spark-submit을 실행하여 오류하지만 난 REPL에서 실행할 때 그것은 내가 rdd.I을 만들려고 할 때 오류가 발생합니다대량로드를 수행하기 위해 spark scala 코드를 실행하는 동안 오류가 발생했습니다.

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2067) 
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:333) 
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:332) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:332) 
    at org.apache.spark.sql.DataFrame.flatMap(DataFrame.scala:1418) 

다음과 같은 오류를 던지고있다

import org.apache.spark._ 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.hbase.client.{ConnectionFactory, HTable} 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.hbase.KeyValue 
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat 
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types.StringType 

import scala.collection.mutable.ArrayBuffer 
import org.apache.hadoop.hbase.KeyValue 
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions 


val cdt = "dt".getBytes 
val ctemp="temp".getBytes 
val ctemp_min="temp_min".getBytes 
val ctemp_max="temp_max".getBytes 
val cpressure="pressure".getBytes 
val csea_level="sea_level".getBytes 
val cgrnd_level="grnd_level".getBytes 
val chumidity="humidity".getBytes 
val ctemp_kf="temp_kf".getBytes 
val cid="id".getBytes 
val cweather_main="weather_main".getBytes 
val cweather_description="weather_description".getBytes 
val cweather_icon="weather_icon".getBytes 
val cclouds_all="clouds_all".getBytes 
val cwind_speed="wind_speed".getBytes 
val cwind_deg="wind_deg".getBytes 
val csys_pod="sys_pod".getBytes 
val cdt_txt="dt_txt".getBytes 
val crain="rain".getBytes 
val COLUMN_FAMILY = "data".getBytes 
val cols = ArrayBuffer(cdt,ctemp,ctemp_min,ctemp_max,cpressure,csea_level,cgrnd_level,chumidity,ctemp_kf,cid,cweather_main,cweather_description,cweather_icon,cclouds_all,cwind_speed,cwind_deg,csys_pod,cdt_txt,crain) 
val rowKey = new ImmutableBytesWritable() 

val conf = HBaseConfiguration.create() 

val ZOOKEEPER_QUORUM = "address" 

conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM); 

val connection = ConnectionFactory.createConnection(conf) 

val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferschema","true").load("Hbasedata/Weatherdata.csv") 

val rdd = df.flatMap(x => {      //Error when i run this 
     rowKey.set(x(0).toString.getBytes) 
     for(i <- 0 to cols.length - 1) yield { 
      val index = x.fieldIndex(new String(cols(i))) 
      val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes 
      (rowKey,new KeyValue(rowKey.get, COLUMN_FAMILY, cols(i), value)) 
     } 
     }) 

오류를 던지고는 동일하게 사용했다 spark-submit의 코드가 잘 작동하고있었습니다.

val rowKey = new ImmutableBytesWritable() 

ImmutableBytesWritable에서

+0

스파크 제출에서 그것을 사용했을 때, 그것을 개체 안에 넣었습니까? –

답변

0

문제는 직렬화하지 않고, 외부 "flatMap"기능에 있습니다. 예외 전체 스택 추적을 확인하십시오.

"flatMap"함수 내에서 언급 된 명령문을 적어도 검사 할 수 있습니다.

관련 문제