2016-06-17 4 views
2

스파크 (버전 1.6.1)를 사용하여 두 개의 json 파일을 생성하기 위해 json 파일을 처리 중입니다. 입력 파일의 크기는 약 30 ~ 40G (100M 레코드)입니다. 생성 된 파일의 경우 큰 파일은 10G ~ 15G (30M 레코드), 작은 파일은 약 500M ~ 750M (1.5M 레코드)입니다. 두 결과 파일은 아래의 문제에 직면하고있다 : 그 수행 "재분할는"하나의 파일로 결과를 병합 한 후 정렬 후 스파크 데이터 프레임이 정렬되지 않습니다.

가 나는 dataframe의 "종류"방법을 호출. 그런 다음 생성 된 파일을 검사하여 기록이 정렬 된 간격으로 이 발견되었지만 전체 파일이 전체적으로 정렬되지 않았습니다. 예 : 파일의 마지막 레코드 (라인 1.9M)의 키 (3 열로 구성)는 "(ou7QDj48c, 014, 075)"이지만 파일의 중간 레코드 키 (라인 375K)는 " pzwzh5vm8, 003, 023) "나는 로컬 상대적으로 작은 입력 소스 (입력 파일 400K 라인), 이러한 경우가 전혀 발생하지 않습니다를 사용하여 코드를 테스트

pzwzh5vm8 003 023 
... 
ou7QDj48c 014 075 

.

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4") 
big_json.repartition(1).write.mode("overwrite").json("filepath") 

누군가가 조언을 줄 수 :

내 구체적인 코드는 다음과 같습니다? 고맙습니다.

(나는 또한 this thread에서 유사한 문제에 대해 논의 했음에도 불구하고 지금까지는 좋은 해결책이 없다고 생각합니다.이 현상이 실제로 재분할 연산에서 비롯된 것이라면 누구나 데이터 프레임을 단일 json 파일로 효과적으로 변환 할 수 있습니까? 정렬 순서를 유지하면서없이하는 것은, RDD로 변환? 감사)

=========================== 솔루션 ==== =========================

정말 @manos @eliasah 및 @pkrishna에서 도움을 주셔서 감사합니다. 나는 당신의 코멘트를 읽은 후에 합체를 사용하는 것에 대해 생각해 보았습니다.하지만 성능을 조사한 후에 나는 그 아이디어를 포기했습니다.

최종 솔루션은 다음과 같습니다 dataframe를 정렬하고 다시 분할하지 않고, JSON으로 작성하거나 뭉쳤다. 모든 작업이 완료되면

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json 

아래 HDFS 명령을 호출이 명령은 내 상상보다 훨씬 낫다. 너무 많은 시간과 공간이 필요하지 않으며 좋은 단일 파일을 제공합니다. 방금 거대한 결과 파일에 headtail을 사용했으며 완전히 주문한 것처럼 보입니다. 그의 의견에 명시된 MarioS는로

+0

다시 분할하지 마십시오. 여러 개의 파일을 만든 다음 올바른 순서로 정렬해야합니다. – marios

답변

5

, 당신은 정렬 작업 후 다시 분할된다.

다시 파티션하기 : RDD의 데이터를 무작위로 재구성하여 더 많거나 적은 수의 파티션을 만들고 그 파티션간에 균형을 맞 춥니 다. 이것은 항상 네트워크를 통해 모든 데이터를 섞습니다.

병합 상태에서 병합 및 셔플을 사용하여 데이터를 재배포합니다. [Reference]

따라서 데이터가 더 이상 정렬되지 않습니다! 분할 횟수가 분할은 RDD에 파티션의 수를 줄이기 위해 1

감소 수단하여 예 1로 설정되어 있기 때문에

1

, 스파크 변환 병합 제공 (셔플 = 거짓) 주문을 보존합니다.

eliasah로서, coalesce를 사용하여 후드 아래의 파티션을 다시 언급했다. shuffle = true와 함께 coalesce를 호출합니다. 따라서 병합 변환은 shuffle = false로 다시 분할하는 대신 사용될 수 있습니다.

관련 문제