2017-11-20 3 views
1

실 스파클 어플리케이션을 원사 클러스터에서 실행합니다. 내 코드에서 나는 내 데이터 세트에 파티션을 생성하기위한 큐의 수를 가능한 코어를 사용스파크 : 프로그래밍 방식으로 클러스터 코어 수를 얻으십시오.

Dataset ds = ... 
ds.coalesce(config.getNumberOfCores()); 

내 질문 : 어떻게 프로그래밍 방식으로 큐의 수를 가능한 코어를 얻을 수 없습니다 구성에 따라 할 수 있습니까?

+0

어떤 리소스 관리자를 사용하고 있습니까? yarn 또는 mesos –

+0

나는 원사를 사용하고 있습니다. – Rougher

+0

[yarn cluster API] (https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API)에서 필요한 대기열 매개 변수를 추출한 다음 병합에 사용합니다 –

답변

1

Spark에서 클러스터의 실행 프로그램 수와 코어 수를 모두 얻는 방법이 있습니다. 과거에 사용한 스칼라 유틸리티 코드는 다음과 같습니다. Java에 쉽게 적응할 수 있어야합니다.

  1. 근로자의 수는 집행을 뺀 또는 sc.getExecutorStorageStatus.length - 1의 수 : 두 가지 핵심 아이디어가있다.

  2. 작업자 당 java.lang.Runtime.getRuntime.availableProcessors을 실행하여 작업자 당 코어 수를 얻을 수 있습니다.

나머지 코드 스칼라 implicits를 사용 SparkContext 편의 수단을 추가 상투적이다. 1.x 년 전 코드를 작성 했으므로 SparkSession을 사용하지 않습니다.

하나의 마지막 점 : 왜곡 된 데이터의 경우 성능을 향상시킬 수 있으므로 여러 코어로 병합하는 것이 좋습니다. 실제로 데이터 크기와 작업이 공유 클러스터에서 실행 중인지 여부에 따라 1.5 배에서 4 배 사이의 아무 곳이나 사용합니다.

import org.apache.spark.SparkContext 

import scala.language.implicitConversions 


class RichSparkContext(val sc: SparkContext) { 

    def executorCount: Int = 
    sc.getExecutorStorageStatus.length - 1 // one is the driver 

    def coresPerExecutor: Int = 
    RichSparkContext.coresPerExecutor(sc) 

    def coreCount: Int = 
    executorCount * coresPerExecutor 

    def coreCount(coresPerExecutor: Int): Int = 
    executorCount * coresPerExecutor 

} 


object RichSparkContext { 

    trait Enrichment { 
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext = 
     new RichSparkContext(sc) 
    } 

    object implicits extends Enrichment 

    private var _coresPerExecutor: Int = 0 

    def coresPerExecutor(sc: SparkContext): Int = 
    synchronized { 
     if (_coresPerExecutor == 0) 
     sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head 
     else _coresPerExecutor 
    } 

} 
+1

sc. getExecutorStorageStatus.length - 1은 나에게 좋다. 고맙습니다 – Rougher

관련 문제