Spark 1.6.2, Scala 2.10.5 및 Java 1.7을 사용하고 있습니다.Spark dense_rank 윈도우 함수 - partitionBy 절없이
우리는 partitionBy 절을 사용하지 않고 2 억 개가 넘는 행의 데이터 세트에서 dense_rank()를 수행해야하며 orderBy 절만 사용합니다. 이것은 현재 MSSQL에서 실행되며 완료하는 데 약 30 분이 걸립니다. 아래 그림과 같이
나는 불꽃에 해당하는 로직을 구현 한 : 아래 그림과 같이
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
나는 원사 클러스터 모드에서 작업을 제출하고있다. 나는 2 노드의 Hadoop 2.6 클러스터를 가지고 있으며, 각각은 4 개의 vCores와 32GB의 메모리를 가지고있다. 로그에서
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
, 나는 MSSQL에서 약 200 만 달러 행의 테이블이 & 15 분에 스파크에 캐시 수입지고 있음을 볼 수 있습니다. 이 단계까지는 약 5GB의 메모리가 사용되고 있고 집행자 중 한 명에게는 약 6.2GB의 메모리가 여전히 무료이며 다른 집행자에게는 11GB의 메모리가 무료라는 것을 알았습니다.
그러나 dense_rank()의 단계는 몇 분 후에 "GC Overhead limit exceeded"오류로 인해 항상 실패합니다. 위의 spark-submit 명령에서 알 수 있듯이 드라이버 메모리를 7g 정도로 높게 설정했습니다. 그러나, 아무 소용 없다!. 물론, PartyBy 절의 부족이 실제로 Spark에서 문제를 일으키는 것으로 알고 있습니다. 하지만 유감스럽게도 그것이 우리가 처리해야 할 유스 케이스입니다.
여기에 더 많은 빛을 비출 수 있습니까? 내가 놓친 게 있니? Spark에서 dense_rank 창 함수를 사용하는 다른 방법이 있습니까? 예를 들어이 포럼의 다른 전문가 중 한 사람이 제안한 "zipWithIndex"함수를 사용하는 경우처럼? dense_rank와 반대로 "zipWithIndex"메소드가 row_number() 함수를 복제한다는 점을 이해하면 dense_rank와 동일한 결과를 얻을 수 있습니까?
유용한 조언을 보내 주시면 감사하겠습니다. 감사합니다.
덕분에 많이 찾을 수 있습니다! JDBC 데이터 소스의 "partitionColumn"옵션을 사용하여 MSSQL에서 데이터 가져 오기 시간을 단축 할 수있었습니다. 그러나 partitionBy가없는 고밀도 랭킹에 대한 권장 사항은 내가 스칼라를 처음 접했을 때 소화하기 위해 더 많은 시간이 필요합니다. 그러나, 나를 안내 해주셔서 너무 고마워! – Prash