2016-08-24 5 views
2

내가 하이브에서 쿼리를 실행하려고 여기아파치 스파크 - 하이브 내부에 가입, LIMIT 및 사용자 정의 UDF

입니다 간단한 설정 (나는이 작업을 수행 할 수 있습니다 알고 =하지만 메신저 이상의 기능을 사용자 정의 UDF를 사용하여 데이터 세트 A 및 B는 약 30,000이다 행

보다 단순한 항등 비교) 각각

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5 custom_UDF_Equals_Comparison 단순히 a.id = b.id

,536간에 동등 검사를 수행

이 쿼리를 실행하면 로그 출력에서 ​​m/r 작업이 많이 실행되고 있음을 알 수 있습니다. 가능한 모든 순열이 비교 될 때까지 두 데이터 집합을 비교하여 가정합니다. 대부분의 데이터가 각 테이블의 처음 몇 행에 참여할 수 있음을 알고 소수의 m/r 작업 만 수행하면됩니다. 왜 이런 일이 발생합니까? 어떻게 해결할 수 있습니까?

편집 : 비교를 위해 UDF를 사용하는 경우 2 RDD의 사이에 전체 비교를 수행하는 이유

안녕하세요 zero323, 즉 비슷한 질문이지만 정확한되지, 그것을 설명하지만, LIMIT가 중지되지 않습니다 이유를 설명 나던 한계가 5 일 때 비교. 예를 들어 처음 10 회의 조인 시도에서 5 개의 행이 발견되면 나머지 30,000 * 30,000 번의 시도가 필요한 이유는 무엇입니까? 모든 조인이 발생한 후에도 제한이 적용된다는 사실 때문입니까? 예 : 30,000 * 30,000 행을 합친 다음 5로 줄입니다.

EDIT2 : join_views ("Fuzzy_String", "0.1", a.col1, b.col1) LIMIT 5

def levenshtein(str1: String, str2: String): Int = { 
val lenStr1 = str1.length 
val lenStr2 = str2.length 

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1) 

for (i <- 0 to lenStr1) d(i)(0) = i 
for (j <- 0 to lenStr2) d(0)(j) = j 

for (i <- 1 to lenStr1; j <- 1 to lenStr2) { 
    val cost = if (str1(i - 1) == str2(j-1)) 0 else 1 

    d(i)(j) = min(
    d(i-1)(j ) + 1,  // deletion 
    d(i )(j-1) + 1,  // insertion 
    d(i-1)(j-1) + cost // substitution 
) 
} 

d(lenStr1)(lenStr2) 

}

def min(nums: Int*): Int = nums.min 

def join_views(joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = { 
if (joinType == "Equals") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    return col1 == col2 
} 
else if (joinType == "Fuzzy_String") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    val val1 = col1.asInstanceOf[String] 
    val val2 = col2.asInstanceOf[String] 

    val ratio = Utils.distancePercentage(val1, val2) 

    if (ratio == 1.0) { 
    return val1 == val2 
    } 

    return (ratio >= parameters.asInstanceOf[Double]) 
} 

return false; 

}

... = 20 초

... ON join_views ("Fuzzy_String", "0.9", a.col1, b.col1) LIMIT 5 = 100 초

,
+0

가 폐쇄 머물 수, 실제로 당혹 무언가를 발견 도움 –

+0

주셔서 감사합니다, 내 custom_UDF 또한 퍼지 검사를 수행하고, 내가 일치해야합니다 0.1 퍼지 값 것을 실행할 때, 그것은 매우 빠르게 돌아갑니다 조인 결과 (예 : 5 행이 매우 빠르며 반환됩니다), 0.9로 설정하면 UDF의 원본 = 비교와 유사하게됩니다. 나는 0.1의 퍼지 매치가 왜 더 빨리 돌아갈 지 궁금하다. 반드시 질문 일 필요는 없습니다. 단지 관측 –

+0

이것은 실제로 흥미 롭습니다. 구현 세부 사항을 공유 할 수 있습니까? – zero323

답변

1

그래서 여기에 세 가지 문제가 있습니다

  • 스파크가 최적화 그래서 이러한 최적화는 단지에 적용 할 수있는 해시를 사용하여 정렬하여 조인 동등 조인. UDF에 따라 다른 유형의 조인을 포함하여 다른 유형의 조인에서는 쌍으로 된 비교가 필요하므로 카디 전 곱이 필요합니다. 자세한 내용은 Why using a UDF in a SQL query leads to cartesian product?을 확인하십시오.
  • 데이터 이동 후 제한 작업, 특히 셔플을 완전히 최적화 할 수 없습니다. Sun Rui에 의해 제공되는 nice answer ~ Towards limiting the big RDD에서 좋은 설명을 찾을 수 있습니다.

    셔플이 부족하여 역설적으로 단순하지만 함께 파티션을 가져와야합니다.

  • 한계 최적화는 레코드가 아닌 파티션을 기반으로합니다. 첫 번째 파티션을 검사하는 Spark가 시작되고 조건을 충족하는 요소 수가 필요한 것보다 낮 으면 반복되는 각 파티션에 대해 증가하는 파티션 수를 반복합니다 (이 요소가 4라는 것을 기억하는 한). 희귀 한 사건을 찾고 있다면 꽤 빨리 증가 할 수 있습니다.