2017-10-23 1 views
0

각 날짜의 최신 데이터 프레임이 있습니다. 매일 새 qte와 새 ca를 이전 날짜에 추가하고 날짜를 업데이트해야합니다. 그래서 이미 존재하는 것을 업데이트하고 새로운 것을 추가해야합니다. 여기Spark : 조인 작업을 기반으로 Dataframe 업데이트

내가 마지막에하고 싶은 것을 예 :

val histocaisse = spark.read 
     .format("csv") 
     .option("header", "true") //reading the headers 
     .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv") 

    val hist = histocaisse 
     .withColumn("pos_id", 'pos_id.cast(LongType)) 
     .withColumn("article_id", 'pos_id.cast(LongType)) 
     .withColumn("date", 'date.cast(DateType)) 
     .withColumn("qte", 'qte.cast(DoubleType)) 
     .withColumn("ca", 'ca.cast(DoubleType)) 



    val histocaisse2 = spark.read 
     .format("csv") 
     .option("header", "true") //reading the headers 

     .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv") 

    val hist2 = histocaisse2.withColumn("pos_id", 'pos_id.cast(LongType)) 
     .withColumn("article_id", 'pos_id.cast(LongType)) 
     .withColumn("date", 'date.cast(DateType)) 
     .withColumn("qte", 'qte.cast(DoubleType)) 
     .withColumn("ca", 'ca.cast(DoubleType)) 
    hist2.show(false) 

+------+----------+----------+----+----+ 
|pos_id|article_id|date  |qte |ca | 
+------+----------+----------+----+----+ 
|1  |1   |2000-01-07|2.5 |3.5 | 
|2  |2   |2000-01-07|14.7|12.0| 
|3  |3   |2000-01-07|3.5 |1.2 | 
+------+----------+----------+----+----+ 

+------+----------+----------+----+----+ 
|pos_id|article_id|date  |qte |ca | 
+------+----------+----------+----+----+ 
|1  |1   |2000-01-08|2.5 |3.5 | 
|2  |2   |2000-01-08|14.7|12.0| 
|3  |3   |2000-01-08|3.5 |1.2 | 
|4  |4   |2000-01-08|3.5 |1.2 | 
|5  |5   |2000-01-08|14.5|1.2 | 
|6  |6   |2000-01-08|2.0 |1.25| 
+------+----------+----------+----+----+ 

+------+----------+----------+----+----+ 
|pos_id|article_id|date  |qte |ca | 
+------+----------+----------+----+----+ 
|1  |1   |2000-01-08|5.0 |7.0 | 
|2  |2   |2000-01-08|39.4|24.0| 
|3  |3   |2000-01-08|7.0 |2.4 | 
|4  |4   |2000-01-08|3.5 |1.2 | 
|5  |5   |2000-01-08|14.5|1.2 | 
|6  |6   |2000-01-08|2.0 |1.25| 
+------+----------+----------+----+----+ 

내가이

val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left") 
    .select($"pos_id", $"article_id", 
    coalesce(hist2("date"), hist1("date")).alias("date"), 
    (coalesce(hist2("qte"), lit(0)) + coalesce(hist1("qte"), lit(0))).alias("qte"), 
    (coalesce(hist2("ca"), lit(0)) + coalesce(hist1("ca"), lit(0))).alias("ca")) 
    .orderBy("pos_id", "article_id") 

// df.show() 
|pos_id|article_id|  date| qte| ca| 
+------+----------+----------+----+----+ 
|  1|   1|2000-01-08| 5.0| 7.0| 
|  2|   2|2000-01-08|29.4|24.0| 
|  3|   3|2000-01-08| 7.0| 2.4| 
|  4|   4|2000-01-08| 3.5| 1.2| 
|  5|   5|2000-01-08|14.5| 1.2| 
|  6|   6|2000-01-08| 2.0|1.25| 
+------+----------+----------+----+----+ 

목표를했다 이렇게하려면이 존재하고 추가 할 경우 정보를 정기적으로 업데이트하는 것입니다 새로운 것들.하지만 내가 사건을 시도하면 빈은 다음과 같은 문제가 발생했습니다

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection 
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1321) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 

취해야 할 조치 첫 번째 테이블이 비어있는 경우조차도 고려하십시오.

+0

'histocaisse_dte1.csv'에 헤더 행이 있습니까? – MaxU

답변

0

그 이유는 schema을 정의하고 csv 파일을 읽을 때 적용해야합니다. 이렇게하면 캐스팅 코드가 필요하지 않습니다. 그런 다음 마지막으로 당신이없이 최종 논리를 적용 할 수있는 schema

val hist1 = spark.read 
    .format("csv") 
    .option("header", "true") //reading the headers 
    .schema(schema) 
    .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv") 

val hist2 = spark.read 
    .format("csv") 
    .option("header", "true") //reading the headers 
    .schema(schema) 
    .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv") 

import org.apache.spark.sql.types._ 
val schema = StructType(Seq(
    StructField("pos_id", LongType, true), 
    StructField("article_id", LongType, true), 
    StructField("date", DateType, true), 
    StructField("qte", LongType, true), 
    StructField("ca", DoubleType, true) 
)) 

후 사용할 수있는대로 스키마를 생성 할 수 있습니다 : 귀하의 경우

모두 dataframes 같은 보인다 오류