spark.sql과 결합하려는 두 개의 큰 하이브 테이블이 있습니다. 표 1에 500 만 개의 행이 있고 표 2에 7 천만 개의 행이있는 표 1과 표 2가 있다고 가정 해 보겠습니다. 표는 기이 한 형식으로 하이브에 마루 파일로 저장되어 있습니다.두 개의 테이블 조인에서 스파크 성능 문제가 발생했습니다.
두 개의 조건으로 필터링하는 동안 모든 열과 평균 (예 : doubleColumn)의 수를 계산할 수 있도록 일부 열에서 일부 집계를 가져오고 싶습니다 (col1, col2에서 말하기).
참고 : 나는 하나의 컴퓨터에서 테스트 설치를 진행합니다 (매우 강력합니다). 클러스터에서 성능이 다를 것으로 예상됩니다.
내 첫 번째 시도는 같은 스파크 SQL을 사용하는 것입니다
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
불행하게도이 실행 매우 가난 오분에 대해 내가 집행 및 드라이버 당 최소 8 GB 메모리를 제공하는 경우에도. 또한 같은 더 나은 선택하도록 dataframe 구문을 사용하고 먼저 행을 필터링하려고 특정 열을 선택하려고 :
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
을하지만이 유의 한 성능 이득이 없습니다. 조인의 성능을 어떻게 향상시킬 수 있습니까?
이 spark.sql 또는 데이터 프레임 구문을 수행하는 가장 좋은 방법은 무엇입니까?
더 많은 실행 프로그램이나 메모리를 제공하면 도움이 될 것입니다.
캐시를 사용해야합니까?
데이터 프레임 tab1, tab2 및 조인 집계를 모두 캐시했는데 많은 이득을 얻었지만 많은 분석 쿼리를 동시에 요구하는 동시성에 관심이 많아서 데이터 프레임을 캐시하는 것이 실용적이지 않다고 생각합니다.단일 노드에서 작업하고 클러스터의 프로덕션 환경으로 이동하면 문제가 해결되지 않으므로 아무 것도 할 수 없습니까?
보너스 질문 : 나는 임팔라이 쿼리를 시도하고 약 40 초했지만 그것은 spark.sql보다 훨씬 더 나았다된다. 임팔라는 어떻게 불꽃을 피우는 것보다 낫을 수 있습니까?!
단일 노드에 몇 개의 코어가 있습니까? –
나는 10을 시도했다 - 얼마나 사용해야합니까? –