2017-01-02 1 views
2

Spark 1.6.2, Scala 2.10.5 및 Java 1.7을 사용하고 있습니다.Spark dense_rank 윈도우 함수 - partitionBy 절없이

우리는 partitionBy 절을 사용하지 않고 2 억 개가 넘는 행의 데이터 세트에서 dense_rank()를 수행해야하며 orderBy 절만 사용합니다. 이것은 현재 MSSQL에서 실행되며 완료하는 데 약 30 분이 걸립니다. 아래 그림과 같이

나는 불꽃에 해당하는 로직을 구현 한 : 아래 그림과 같이

val df1 = hqlContext.read.format("jdbc").options(
    Map("url" -> url, "driver" -> driver, 
    "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load() 

df1.cache() 

val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId"))) 

나는 원사 클러스터 모드에서 작업을 제출하고있다. 나는 2 노드의 Hadoop 2.6 클러스터를 가지고 있으며, 각각은 4 개의 vCores와 32GB의 메모리를 가지고있다. 로그에서

spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar 

, 나는 MSSQL에서 약 200 만 달러 행의 테이블이 & 15 분에 스파크에 캐시 수입지고 있음을 볼 수 있습니다. 이 단계까지는 약 5GB의 메모리가 사용되고 있고 집행자 중 한 명에게는 약 6.2GB의 메모리가 여전히 무료이며 다른 집행자에게는 11GB의 메모리가 무료라는 것을 알았습니다.

그러나 dense_rank()의 ​​단계는 몇 분 후에 "GC Overhead limit exceeded"오류로 인해 항상 실패합니다. 위의 spark-submit 명령에서 알 수 있듯이 드라이버 메모리를 7g 정도로 높게 설정했습니다. 그러나, 아무 소용 없다!. 물론, PartyBy 절의 부족이 실제로 Spark에서 문제를 일으키는 것으로 알고 있습니다. 하지만 유감스럽게도 그것이 우리가 처리해야 할 유스 케이스입니다.

여기에 더 많은 빛을 비출 수 있습니까? 내가 놓친 게 있니? Spark에서 dense_rank 창 함수를 사용하는 다른 방법이 있습니까? 예를 들어이 포럼의 다른 전문가 중 한 사람이 제안한 "zipWithIndex"함수를 사용하는 경우처럼? dense_rank와 반대로 "zipWithIndex"메소드가 row_number() 함수를 복제한다는 점을 이해하면 dense_rank와 동일한 결과를 얻을 수 있습니까?

유용한 조언을 보내 주시면 감사하겠습니다. 감사합니다.

답변

2

여기에 두 가지 문제가 있습니다 : 분할 열 또는 파티션 조건을 제공하지 않고 JDBC 연결을 통해

  • 당신 부하 데이터. 단일 실행 프로그램 스레드를 사용하여 모든 데이터를로드합니다.

    이 문제는 대개 기존 열 중 하나를 사용하거나 인공 키를 제공하여 해결하기가 쉽습니다.

  • partitionBy없이 창 기능을 사용합니다. 결과적으로 모든 데이터는 단일 파티션으로 재구성되고 로컬로 정렬되며 단일 스레드를 사용하여 처리됩니다.

    • 이 필요 기록 순서를 반영 인공 파티션 생성 : 일반적으로

      Dataset API를 사용하지만, 몇 가지 트릭 사용할 수 있다는 것을 해결할 수있는 보편적 인 해결책이 없다. 내 대답에이 방법을 설명했습니다 Avoid performance impact of a single partition mode in Spark window functions

      비슷한 방법을 사용할 수도 있지만 여러 단계의 프로세스가 필요합니다 (아래 설명 된 것과 동일).당신이 분류 RDD (그것뿐만 아니라 Dataset에서 변환하지 않고 비슷한 일을 할 수 있어야한다) 및 추가 작업을 통해 두 개의 스캔을 사용할 수있는 연관 방법으로

    • : 각 파티션

      • 계산 부분 결과 (주어진 파티션에 대한 귀하의 경우 순위).
      • 필요한 요약을 수집하십시오 (각 파티션의 파티션 경계와 누적 된 순위 값).
      • 이전 스캔에서 부분 집계를 수정하려면 두 번째 스캔을 수행하십시오. 쉽게 귀하의 경우에 맞게 조정할 수 있습니다이 방법의

    한 예는, 당신의 제안을 How to compute cumulative sum using Spark

+0

덕분에 많이 찾을 수 있습니다! JDBC 데이터 소스의 "partitionColumn"옵션을 사용하여 MSSQL에서 데이터 가져 오기 시간을 단축 할 수있었습니다. 그러나 partitionBy가없는 고밀도 랭킹에 대한 권장 사항은 내가 스칼라를 처음 접했을 때 소화하기 위해 더 많은 시간이 필요합니다. 그러나, 나를 안내 해주셔서 너무 고마워! – Prash