2016-09-02 3 views
1

큰 마루 파일을 HDFS의 다른 폴더에있는 여러 쪽 마루 파일로 나누고 싶습니다. 분할 된 테이블을 만들 수 있습니다 (Hive/Drill/Spark SQL) 그 위에.하나의 큰 마루 파일을 여러 개의 마루 파일로 키로 나누십시오.

데이터 예 :

+-----+------+ 
|model| num1| 
+-----+------+ 
| V80| 195.0| 
| V80| 750.0| 
| V80| 101.0| 
| V80| 0.0| 
| V80| 0.0| 
| V80| 720.0| 
| V80|1360.0| 
| V80| 162.0| 
| V80| 150.0| 
| V90| 450.0| 
| V90| 189.0| 
| V90| 400.0| 
| V90| 120.0| 
| V90| 20.3| 
| V90| 0.0| 
| V90| 84.0| 
| V90| 555.0| 
| V90| 0.0| 
| V90| 9.0| 
| V90| 75.6| 
+-----+------+ 

결과 폴더 구조는 "모델"필드로 그룹화해야합니다

def main(args: Array[String]): Unit = { 
    val conf = new SparkConf() 
    case class Infos(name:String, name1:String) 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val rdd = sqlContext.read.load("hdfs://nameservice1/user/hive/warehouse/a_e550_parquet").select("model", "num1").limit(10000) 

    val tmpRDD = rdd.map { item => (item(0), Infos(item.getString(0), item.getString(1))) }.groupByKey() 

    for (item <- tmpRDD) { 
     import sqlContext.implicits._ 
     val df = item._2.toSeq.toDF() 
     df.write.mode(SaveMode.Overwrite).parquet("hdfs://nameservice1/tmp/model=" + item._1) 
    } 
    } 

그냥 던졌다 :이 같은 스크립트를 시도

+ 
| 
+-----model=V80 
|  | 
|  +----- XXX.parquet 
+-----model=V90 
|  | 
|  +----- XXX.parquet 

예외 : 널 포인트 예외.

답변

1

DataFrame에서 partitionBy를 사용해야합니다. 당신은 groupBy가 필요하지 않습니다. 아래와 같은 것은 당신이 원하는 것을 줄 것입니다.

val df = sqlContext.read.parquet("hdfs://nameservice1/user/hive/warehouse/a_e550_parquet").select("model", "num1").limit(10000) 
df.write.partitionBy("model").mode(SaveMode.Overwrite) 
+0

정확합니다. 사용해 보았습니다. 그러나 그것은 매우 느리게 보입니다, 약 500M의 기록이 있습니다. 어떤 효율적인 방법이 있습니까? –

+0

왜 이렇게 느린 지 많은 이유가있을 수 있습니다. 데이터 스큐 등이있는 경우, 체크해야 할 것들이 있습니다. 작업이 너무 많습니다 (셔플 파티션 크기를 늘리거나 줄일 수 있는지 확인하십시오). 데이터를 모른 채 말하기가 어렵습니다. 당신은 이것을 확인하는 올바른 사람입니다 :-) – Jegan

관련 문제