2017-05-16 2 views
4

내가 아래 샘플 조각 (아니 정확히 같은 코드)으로있는 스파크 응용 프로그램을 구현하고있다 :캐시와 지속성은 (동작처럼 보이지 않으므로) 언제 실행됩니까?

val rdd1 = sc.textfile(HDFS_PATH) 
val rdd2 = rdd1.map(func) 
rdd2.persist(StorageLevel.MEMORY_AND_DISK) 
println(rdd2.count) 

스파크 UI에서이 코드의 성능을 확인하는 일을, 나는 count에 대한 항목을 참조 하지만 persist에는 해당되지 않습니다. 이 카운트 동작에 대한 DAG에는 '지도'변환을위한 노드도 있습니다 (위 코드의 2 번째 줄).

count (마지막 행)이 발생했을 때지도 변환이 실행되고 persist이 발생하지 않았다고 결론 지어도 안전한가요?

또한 어떤 시점에서 실제로 rdd2가 지속됩니까? RDD에서는 변환과 작업이라는 두 가지 유형의 작업 만 호출 할 수 있다는 것을 알고 있습니다. count 동작이 호출 될 때 RDD가 지연 지속되면 변환 또는 동작으로 간주 될 것인가?

답변

1

마지막 행에서 count가 발생할 때 맵 변형이 실행되고 지속성이 발생할 때가 아니라고 결론 내릴 수 있습니까?

또한

, 어느 시점에서 rdd2 실제로 유지됩니다?

데이터 판독 매핑 및 변환 또는 동작 또는도 간주 될 것이다 영속화 카운트 문

실행 중에 한번에 지속 되는가?

실제로는 아니지만 처리 작업의 측면에서 보면 변환과 같은 것으로 간주 할 수 있습니다. 스파크는 게으르며 결과를 요구할 때만 작동합니다. 데이터 프레임을 지속 할 때 결과가 필요하지 않으므로 Spark는 아무런 작업도 수행하지 않습니다. 그런 식으로 persist은 변형과 같습니다

7

데이터 집합의 cachepersist 연산자는 지연이 있으며 작업을 호출 할 때까지 효과가 없습니다 (캐싱이 완료 될 때까지 기다렸다가 나중에 성능이 향상되는 추가 비용 임).

스파크의 공식 문서 RDD Persistence에서 (굵게 내에서 문장과) : 스파크에서 가장 중요한 기능의

가 지속 (또는 캐시)되어 작업을 통해 메모리에 데이터 집합. RDD를 지속하면 각 노드는 메모리에 계산 된 모든 파티션을 저장하고 해당 데이터 세트 (또는 그로부터 파생 된 데이터 세트)의 다른 작업에서 다시 사용합니다. 이렇게하면 향후 작업을 훨씬 빠르게 (종종 10 배 이상) 수행 할 수 있습니다. 캐싱은 반복 알고리즘 및 빠른 대화식 사용을위한 핵심 도구입니다.

persist() 또는 cache() 방법을 사용하여 지속성을 유지하도록 RDD를 표시 할 수 있습니다. 액션에서 처음 계산되면 노드의 메모리에 유지됩니다. Spark의 캐시는 내결함성이 있습니다. RDD의 파티션이 손실되면 원래 만든 변환을 사용하여 자동으로 다시 계산됩니다. (! 및 SQL 자체 불꽃) :

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count 

캐싱을 유발하는 어떤 사람들은 이유가 정확히

는 다음 트릭을 할.

count 연산자가 상당히 저렴하므로 캐시가 비동기 적으로 실행되기 전에 캐싱이 완료되기 전에 캐싱이 거의 즉시 실행됩니다.

당신은 질문 그래서 때 persist 변환 또는 행동이나도 간주 될 수

?

나는 그것이 어느 쪽의 말을하고 그것을 최적화 힌트를 (즉, 실행 또는 계정 로 촬영되지 않을 수 있습니다 또는) 생각 하는데요.


웹 UI의 저장소 탭을 사용하여 기본 RDD 인 데이터 집합이 이미 보존 된 것을 확인하십시오.

enter image description here

또한 cache 또는 persist 사업자의 출력은 explain (또는 단순히 QueryExecution.optimizedPlan)를 사용하여 볼 수 있습니다. (InMemoryRelation 논리적 계획)

val q1 = spark.range(10).groupBy('id % 5).count.cache 
scala> q1.explain 
== Physical Plan == 
InMemoryTableScan [(id % 5)#84L, count#83L] 
    +- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
     +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)]) 
      +- Exchange hashpartitioning((id#77L % 5)#88L, 200) 
       +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)]) 
        +- *Range (0, 10, step=1, splits=8) 

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString) 
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
01 +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L]) 
02  +- Exchange hashpartitioning((id#77L % 5)#88L, 200) 
03   +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L]) 
04    +- *Range (0, 10, step=1, splits=8) 

// Cache sample table range5 using pure SQL 
// That registers range5 to contain the output of range(5) function 
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)") 
val q2 = spark.sql("SELECT * FROM range5") 
scala> q2.explain 
== Physical Plan == 
InMemoryTableScan [id#0L] 
    +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5` 
     +- *Range (0, 5, step=1, splits=8) 

InMemoryTableScan 물리 연산자는 쿼리가 인 메모리 캐시 따라서 재사용되어 있는지 확인 할 수있는 방법입니다.


또한 스파크 SQL 자체 SQL의 CACHE TABLE query (RDD 캐싱 달리 기본 열망에 의해 인) 미국 DataFrame 캐싱을 트리거하기 위해 동일한 패턴을 사용

if (!isLazy) { 
    // Performs eager caching 
    sparkSession.table(tableIdent).count() 
} 

수단이 조작자에 따라 캐싱에 관한 한 다른 결과가 나타날 수 있습니다. cachepersist 연산자는 기본적으로 게으르며 SQL의 CACHE TABLE은 열의입니다.

관련 문제