2014-12-10 3 views
1

내 테이블 [col1, col2, key1, col3, txn_id, dw_last_updated]에 아래 열이 있습니다. 이 txn_id 중에서 key1은 기본 키 열입니다. 내 데이터 세트에서 (txn_id, key) 조합에 대해 여러 레코드를 가질 수 있습니다. 그 기록들 중에서 dw_last_updated를 기반으로 최신 하나를 골라야합니다.Apache SPARK GroupByKey alternate

나는 이것을 논리로 사용하고 있습니다. 나는 지속적으로 메모리 문제에 부딪 히고 있으며 groupByKey() 때문에 부분적으로는 믿습니다 ... 더 좋은 대안이 있습니까?

case class Fact(col1: Int, 
    col2: Int, 
    key1: String, 
    col3: Int, 
    txn_id: Double, 
    dw_last_updated: Long) 

sc.textFile(s3path).map { row => 
      val parts = row.split("\t") 
      Fact(parts(0).toInt, 
      parts(1).toInt, 
      parts(2), 
      parts(3).toInt, 
      parts(4).toDouble, 
      parts(5).toLong) 
     }).map { t => ((t.txn_id, t.key1), t) }.groupByKey(512).map { 
      case ((txn_id, key1), sequence) => 
      val newrecord = sequence.maxBy { 
       case Fact_Cp(col1, col2, key1, col3, txn_id, dw_last_updated) => dw_last_updated.toLong 
      } 
      (newrecord.col1 + "\t" + newrecord.col2 + "\t" + newrecord.key1 + 
       "\t" + newrecord.col3 + "\t" + newrecord.txn_id + "\t" + newrecord.dw_last_updated) 
     } 

의견/제안을 감사합니다 ...

+0

나에게 좋을 것 같은데, 두렵다. 더 많은 파티션을 시도 할 수 있습니다. 하지만 아마도 더 많은 기계가 필요할 것입니다. –

+0

아직 해결되지 않았습니까? – maasg

답변

1

rdd.groupByKey는 단일 노드의 키 값의 순서를 유지하는 데 필요한 메모리를 필요로하는, 키에 따라 모든 값을 수집합니다. 그 사용은 권장하지 않습니다. [1]을 참조하십시오.

max (dw_last_updated)보다 키당 1 개의 값에만 관심이 있다면 더 많은 메모리 효율적인 방법은 rdd.reduceByKey을 사용하는 것입니다. 여기에서 reduce 함수는 동일한 키에 대한 두 레코드의 최대 값을 선택하는 것입니다. 그 타임 스탬프를 판별 자로 사용합니다.

rdd.reduceByKey{case (record1,record2) => max(record1, record2)} 

이 사건에 적용, 그것은 다음과 같이한다 : 나는 Fact 동반자 개체에 유틸리티 작업을 정의한

case class Fact(...) 
object Fact { 
    def parse(s:String):Fact = ??? 
    def maxByTs(f1:Fact, f2:Fact):Fact = if (f1.dw_last_updated.toLong > f2.dw_last_updated.toLong) f1 else f2 
} 
val factById = sc.textFile(s3path).map{row => val fact = Fact.parse(row); ((fact.txn_id, fact.key1),fact)} 
val maxFactById = factById.reduceByKey(Fact.maxByTs) 

참고 깔끔한 코드를 유지하기 위해. 또한 각 변형 단계 또는 논리적 단계 그룹에 명명 된 변수를 제공하라는 조언을 제공합니다. 프로그램을 더 읽기 쉽게 만듭니다.

+0

대답을 자세히 설명해 주시겠습니까? 어떻게해야합니까 (record1, record2)? – user3279189

+0

추가 된 예를 살펴보십시오. 그러나 그것에 익숙해지는 것은 아닙니다 :-) – maasg