2017-05-02 2 views
4

Spark 2.1을 사용하여 HDFS 2.7에 데이터를 쓰는 방법을 테스트하려고합니다. 내 데이터는 더미 값의 간단한 시퀀스이며 출력은 id으로 분할되어야합니다.Spark 2.1에 분할 된 쪽모 세공 파일을 저장하는 방법은 무엇입니까?

// Simple case class to cast the data 
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int) 

// Actual data to be stored 
val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1), 
    SimpleTest("test", 12, 13.5.toFloat, 2), 
    SimpleTest("test", 12, 13.5.toFloat, 3), 
    SimpleTest("simple", 12, 13.5.toFloat, 1), 
    SimpleTest("simple", 12, 13.5.toFloat, 2), 
    SimpleTest("simple", 12, 13.5.toFloat, 3) 
) 

// Spark's workflow to distribute, partition and store 
// sc and sql are the SparkContext and SparkSession, respectively 
val testDataP = sc.parallelize(testData, 6) 
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key") 
testDf.write.partitionBy("id", "key").parquet("/path/to/file") 

나는 HDFS에서 다음과 같은 트리 구조 얻을 것으로 기대하고있다 :

- /path/to/file 
    |- /id=test/key=1/part-01.parquet 
    |- /id=test/key=2/part-02.parquet 
    |- /id=test/key=3/part-03.parquet 
    |- /id=simple/key=1/part-04.parquet 
    |- /id=simple/key=2/part-05.parquet 
    |- /id=simple/key=3/part-06.parquet 

을하지만 나는 다음과 같은 출력을 얻을 이전 코드 실행하면

/path/to/file/id=/key=24/ 
|-/part-01.parquet 
|-/part-02.parquet 
|-/part-03.parquet 
|-/part-04.parquet 
|-/part-05.parquet 
|-/part-06.parquet 

을 모르겠어요 코드에 문제가 있거나 Spark에서 수행중인 다른 작업이있는 경우

I는 다음과 같이 spark-submit을 실행하고있어 :

스파크 제출 --driver 로컬 메모리에 30g --executor 메모리 30G의 --executor 코어 --master --name APP 8을 --num -executors 8 --conf spark.io.compression.codec = lzf --conf spark.akka.frameSize = 1024 --conf spark.driver.maxResultSize = 1g --conf spark.sql.orc.compression.codec = 압축 해제 - -conf spark.sql.parquet.filterPushdown = 잘 ... 이후 재미있는 myClass가 myFatJar.jar

답변

1

나는 해결책을 발견! Cloudera에 따르면 mapred-site.xml 구성 문제입니다 (아래 링크 참조). 또한 데이터 프레임을 쓰는 대신 testDf.write.partitionBy("id", "key").parquet("/path/to/file")

다음과 같이 작성했습니다. testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file"). <namenode><port>을 각각 HDFS의 마스터 노드 이름 및 포트로 바꿀 수 있습니다.

@ jacek-laskowski에게 깊은 감사를드립니다.

참고 :

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

Writing to HDFS in Spark/Scala

5

... --class 사실 "나를 위해 작동합니다."

SimpleTest 사례 클래스를 사용하여 Spark 2.1에서 데이터 집합을 설명하면 import spark.implicits._이 입력되어 Dataset이됩니다.

내 경우에는 sparksql입니다.

즉, testDataPtestDf (sql.createDataFrame 사용)을 만들 필요가 없습니다.

다른 터미널에서
import spark.implicits._ 
... 
val testDf = testData.toDS 
testDf.write.partitionBy("id", "key").parquet("/path/to/file") 

(/tmp/testDf 디렉토리에 저장 한 후) :

$ tree /tmp/testDf/ 
/tmp/testDf/ 
├── _SUCCESS 
├── id=simple 
│   ├── key=1 
│   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
│   ├── key=2 
│   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
│   └── key=3 
│    └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
└── id=test 
    ├── key=1 
    │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
    ├── key=2 
    │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
    └── key=3 
     └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 

8 directories, 7 files 
관련 문제