2017-11-30 1 views
1

spark.sql과 결합하려는 두 개의 큰 하이브 테이블이 있습니다. 표 1에 500 만 개의 행이 있고 표 2에 7 천만 개의 행이있는 표 1과 표 2가 있다고 가정 해 보겠습니다. 표는 기이 한 형식으로 하이브에 마루 파일로 저장되어 있습니다.두 개의 테이블 조인에서 스파크 성능 문제가 발생했습니다.

두 개의 조건으로 필터링하는 동안 모든 열과 평균 (예 : doubleColumn)의 수를 계산할 수 있도록 일부 열에서 일부 집계를 가져오고 싶습니다 (col1, col2에서 말하기).

참고 : 나는 하나의 컴퓨터에서 테스트 설치를 진행합니다 (매우 강력합니다). 클러스터에서 성능이 다를 것으로 예상됩니다.

내 첫 번째 시도는 같은 스파크 SQL을 사용하는 것입니다

val stat = sqlContext.sql("select count(id), avg(doubleColumn) " + 
           " FROM db.table1 as t1 JOIN db.table2 " + 
           " ON t1.id = t2.id " + 
           " WHERE col1 = val1 AND col2 = val2").collect 

불행하게도이 실행 매우 가난 오분에 대해 내가 집행 및 드라이버 당 최소 8 GB 메모리를 제공하는 경우에도. 또한 같은 더 나은 선택하도록 dataframe 구문을 사용하고 먼저 행을 필터링하려고 특정 열을 선택하려고 :

//Filter first and select only needed column 
val df = spark.sql("SELECT * FROM db.tab1") 
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id") 

val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2") 
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id") 

//Take the aggregations on the joined df 
import org.apache.spark.sql.functions; 

joined.agg(
    functions.count("id").as("count"), 
    functions.avg("doubleColumn").as("average") 
).show(); 

을하지만이 유의 한 성능 이득이 없습니다. 조인의 성능을 어떻게 향상시킬 수 있습니까?

  • 이 spark.sql 또는 데이터 프레임 구문을 수행하는 가장 좋은 방법은 무엇입니까?

  • 더 많은 실행 프로그램이나 메모리를 제공하면 도움이 될 것입니다.

  • 캐시를 사용해야합니까?
    데이터 프레임 tab1, tab2 및 조인 집계를 모두 캐시했는데 많은 이득을 얻었지만 많은 분석 쿼리를 동시에 요구하는 동시성에 관심이 많아서 데이터 프레임을 캐시하는 것이 실용적이지 않다고 생각합니다.

  • 단일 노드에서 작업하고 클러스터의 프로덕션 환경으로 이동하면 문제가 해결되지 않으므로 아무 것도 할 수 없습니까?

보너스 질문 : 나는 임팔라이 쿼리를 시도하고 약 40 초했지만 그것은 spark.sql보다 훨씬 더 나았다된다. 임팔라는 어떻게 불꽃을 피우는 것보다 낫을 수 있습니까?!

+0

단일 노드에 몇 개의 코어가 있습니까? –

+0

나는 10을 시도했다 - 얼마나 사용해야합니까? –

답변

2

이 spark.sql 또는 데이터 프레임 구문을 수행하는 가장 좋은 방법은 무엇입니까?

아무런 차이가 없습니다.

더 많은 실행 프로그램이나 메모리를 제공하면 도움이 될 것입니다.

데이터 왜곡으로 인해 문제가 발생하지 않고 구성을 올바르게 조정 한 경우에만.

캐시를 사용해야합니까?

입력 데이터가 여러 번 재사용되는 경우 성능면에서 (이미 결정한대로) 권장 할 만합니다.

단일 노드에서 작업하고 클러스터의 프로덕션 환경으로 이동하면 문제가 해결되지 않으므로 아무 것도 할 수 없습니까?

일반적으로 단일 노드에서의 성능 테스트는 완전히 쓸모가 없습니다. 병목 현상 (네트워크 IO/통신)과 장점 (디스크 I/O 및 자원 사용량 상환)을 놓치게됩니다.

그러나 parallelsm (spark.sql.shuffle.partitions, sql.default.parallelism 및 입력 분할 크기 증가)을 크게 줄일 수 있습니다. Counterintuitiv 부하 분산을 위해 설계된 스파크 스타일 병렬 처리는 자산보다 단일 시스템에서 더 많은 책임이 있습니다. 그것은 공유 메모리에 비해 매우 느린 통신을하기위한 셔플 (디스크 쓰기!)에 달려 있으며 스케줄링 오버 헤드가 중요합니다.

어떻게 임팔라가 불꽃보다 더 뛰어날 수 있습니까?!

낮은 지연 동시 쿼리를 위해 특별히 설계 되었기 때문에. Spark의 목표였던 것은 아닙니다 (데이터베이스 대 ETL 프레임 워크).

우리가 동시성에 관심이있는 당신

는 많은 사용자가 동시에 몇 가지 분석 질의를 요구한다.

스파크는 올바른 선택으로 들리지 않습니다.

+0

spark.sql.shuffle.partitions, sql.default.parallelism 구성 옵션에 대해 좀 더 자세히 설명 할 수 있습니까? –

1

구성을 변경할 수 있으므로 큰 클러스터에서 구성을 변경해야합니다. 나는 곧 두 가지를 생각할 수있다. spark.executor.cores을 5로 설정하고 메모리에 따라 spark.executor.instancesspark.executor.memory을 사용하여 더 많은 실행 프로그램과 메모리를 제공하십시오. 또한 하이브 테이블을 버켓으로 정렬 할 수 있습니까? 테이블을 버켓에 넣으면 테이블을 정렬하기 전에 테이블을 정렬 할 필요가 없어집니다.

촉매가 집계 쿼리를 처리하는 방식에 따라 조인 후에 데이터 프레임을 캐시하면 더 빠를 수도 있습니다. 쿼리가 끝난 후에도 unpersist()이 될 수 있지만 GC가 그만한 가치가 없다는 것에 동의합니다.

SQL 또는 scala dsl을 사용하면 어떤 이점도 나타나지 않습니다. 둘 다 전 단계 코드 생성을 사용하므로 본질적으로 동일합니다.

임팔라가 항상 더 빠르다는 이유 중 하나는 복제에 대해 걱정할 필요가 없기 때문입니다.하지만 한 노드는 그다지 신경 쓰지 않아야하지만 복제를 위해 데이터를 사전에 알리는 것과 뒤로 젖히다.

관련 문제