데이터 집합의 cache
과 persist
연산자는 지연이 있으며 작업을 호출 할 때까지 효과가 없습니다 (캐싱이 완료 될 때까지 기다렸다가 나중에 성능이 향상되는 추가 비용 임).
스파크의 공식 문서 RDD Persistence에서 (굵게 내에서 문장과) : 스파크에서 가장 중요한 기능의
한
가 지속 (또는 캐시)되어 작업을 통해 메모리에 데이터 집합. RDD를 지속하면 각 노드는 메모리에 계산 된 모든 파티션을 저장하고 해당 데이터 세트 (또는 그로부터 파생 된 데이터 세트)의 다른 작업에서 다시 사용합니다. 이렇게하면 향후 작업을 훨씬 빠르게 (종종 10 배 이상) 수행 할 수 있습니다. 캐싱은 반복 알고리즘 및 빠른 대화식 사용을위한 핵심 도구입니다.
persist()
또는 cache()
방법을 사용하여 지속성을 유지하도록 RDD를 표시 할 수 있습니다. 액션에서 처음 계산되면 노드의 메모리에 유지됩니다. Spark의 캐시는 내결함성이 있습니다. RDD의 파티션이 손실되면 원래 만든 변환을 사용하여 자동으로 다시 계산됩니다. (! 및 SQL 자체 불꽃) :
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count
캐싱을 유발하는 어떤 사람들은 이유가 정확히
는 다음 트릭을 할.
count
연산자가 상당히 저렴하므로 캐시가 비동기 적으로 실행되기 전에 캐싱이 완료되기 전에 캐싱이 거의 즉시 실행됩니다.
당신은 질문 그래서 때 persist
변환 또는 행동이나도 간주 될 수
?
나는 그것이 어느 쪽의 말을하고 그것을 최적화 힌트를 (즉, 실행 또는 계정 적로 촬영되지 않을 수 있습니다 또는) 생각 하는데요.
웹 UI의 저장소 탭을 사용하여 기본 RDD 인 데이터 집합이 이미 보존 된 것을 확인하십시오.
또한 cache
또는 persist
사업자의 출력은 explain
(또는 단순히 QueryExecution.optimizedPlan
)를 사용하여 볼 수 있습니다. (InMemoryRelation
논리적 계획)
val q1 = spark.range(10).groupBy('id % 5).count.cache
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [(id % 5)#84L, count#83L]
+- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)])
+- Exchange hashpartitioning((id#77L % 5)#88L, 200)
+- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)])
+- *Range (0, 10, step=1, splits=8)
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01 +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02 +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03 +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04 +- *Range (0, 10, step=1, splits=8)
// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
+- *Range (0, 5, step=1, splits=8)
InMemoryTableScan
물리 연산자는 쿼리가 인 메모리 캐시 따라서 재사용되어 있는지 확인 할 수있는 방법입니다.
또한 스파크 SQL 자체 SQL의 CACHE TABLE query (RDD 캐싱 달리 기본 열망에 의해 인) 미국 DataFrame 캐싱을 트리거하기 위해 동일한 패턴을 사용
if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
}
수단이 조작자에 따라 캐싱에 관한 한 다른 결과가 나타날 수 있습니다. cache
및 persist
연산자는 기본적으로 게으르며 SQL의 CACHE TABLE
은 열의입니다.