
I am trying to create a new DataFrame (in Spark SQL 1.6.2), using the Java API and a JavaRDD&lt;Row&gt;, after applying the following mapPartitions function:

FlatMapFunction<Iterator<Row>, Row> mapPartitonstoTTF = rows ->
{
    List<Row> mappedRows = new ArrayList<Row>();
    while (rows.hasNext())
    {
        Row row = rows.next();
        // copy the ten existing values and append 0L for the new TTF column
        Row mappedRow = RowFactory.create(row.getDouble(0), row.getString(1), row.getLong(2), row.getDouble(3),
                row.getInt(4), row.getString(5), row.getString(6), row.getInt(7), row.getInt(8),
                row.getString(9), 0L);
        mappedRows.add(mappedRow);
    }
    return mappedRows;
};


JavaRDD<Row> sensorDataDoubleRDD = oldsensorDataDoubleDF.toJavaRDD().mapPartitions(mapPartitonstoTTF);

// extend the old schema with the new non-nullable TTF column
StructType oldSchema = oldsensorDataDoubleDF.schema();
StructType newSchema = oldSchema.add("TTF", DataTypes.LongType, false);

System.out.println("The new schema is: ");
newSchema.printTreeString();

System.out.println("The old schema is: ");
oldSchema.printTreeString();

DataFrame sensorDataDoubleDF = hc.createDataFrame(sensorDataDoubleRDD, newSchema);
sensorDataDoubleDF.show();

Above, I am trying to add a new LongType column, with its value set to 0L inside the RowFactory.create() call.

However, when I run sensorDataDoubleDF.show() I get the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 117 in stage 26.0 failed 4 times, most recent failure: Lost task 117.3 in stage 26.0 (TID 3249, AUPER01-01-20-08-0.prod.vroc.com.au): scala.MatchError: 1435766400001 (of class java.lang.Long) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

The new schema is the same as the old one, with the TTF column added as long:

root 
|-- data_quality: double (nullable = false) 
|-- data_sensor: string (nullable = true) 
|-- data_timestamp: long (nullable = false) 
|-- data_valueDouble: double (nullable = false) 
|-- day: integer (nullable = false) 
|-- dpnode: string (nullable = true) 
|-- dsnode: string (nullable = true) 
|-- month: integer (nullable = false) 
|-- year: integer (nullable = false) 
|-- nodeid: string (nullable = true) 
|-- nodename: string (nullable = true) 
|-- TTF: long (nullable = false) 

and the old schema, i.e. the same as above without the TTF column, is:

root 
|-- data_quality: double (nullable = false) 
|-- data_sensor: string (nullable = true) 
|-- data_timestamp: long (nullable = false) 
|-- data_valueDouble: double (nullable = false) 
|-- day: integer (nullable = false) 
|-- dpnode: string (nullable = true) 
|-- dsnode: string (nullable = true) 
|-- month: integer (nullable = false) 
|-- year: integer (nullable = false) 
|-- nodeid: string (nullable = true) 
|-- nodename: string (nullable = true) 

I would appreciate any help figuring out where I am making a mistake.

Answer


The old schema has 11 columns, but you map only 10 of them: the last column (nodename, a string at index 10) is never copied, so each Row ends up one value short of the 12-field new schema, which is what triggers the scala.MatchError. Add row.getString(10) to the RowFactory.create call:

Row mappedRow = RowFactory.create(row.getDouble(0), row.getString(1), row.getLong(2), row.getDouble(3),
        row.getInt(4), row.getString(5), row.getString(6), row.getInt(7), row.getInt(8),
        row.getString(9), row.getString(10), 0L);
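
As a side note, if the goal is simply to append a constant 0L column, the field-by-field mapping can be avoided altogether. The sketch below is an alternative approach, not the asker's code; it assumes the same oldsensorDataDoubleDF as above and uses DataFrame.withColumn with functions.lit, which keeps the rows and the schema in sync automatically:

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.lit;

// withColumn derives the new schema and the new rows together,
// so the value count can never drift out of step with the schema
DataFrame sensorDataDoubleDF = oldsensorDataDoubleDF.withColumn("TTF", lit(0L));
sensorDataDoubleDF.printSchema();
sensorDataDoubleDF.show();

This trades the explicit getter calls for a single column expression, so adding or reordering source columns later cannot silently misalign the Row values.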