2016-09-28 2 views
1

RDD의 키로 Array [Byte]를 사용하고 싶습니다. 예를 들어 :RDD에서 바이트 배열을 키로 사용하는 방법은 무엇입니까?

val rdd1:RDD[((Array[Byte]), (String, Int)] = from src rdd 
val rdd2:RDD[((Array[Byte]), (String, Int)] = from dest rdd 

val resultRdd = rdd1.join(rdd2) 
내가 키 으로 배열 [바이트]을 사용하여 rdd1 및 rdd2에 조인 작업을 수행 할

하지만 항상 받고 resultRdd.count() =

0 그래서 나는 배열을 직렬화하려고 [Byte] 그리고 작은 Dataset에 대해서만보고 싶으면 잘 작동합니다. java.lang.OutOfMemoryError와 점점 큰 데이터 세트 용

val serRdd1= rdd1.map { case (k,v) => (new SerByteArr(k), v) } 
val serRdd2= rdd2.map { case (k,v) => (new SerByteArr(k), v) } 

class SerByteArr(val bytes: Array[Byte]) extends Serializable { 
    override val hashCode = bytes.deep.hashCode 
    override def equals(obj:Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep 
} 

: GC 오버 헤드 제한, 초과 문제는 객체 (새 SerByteArr (K))를 만드는로 발생된다.

GC 제한 초과 오류를 방지하는 방법. 누구가 나를 도울까요?

+1

배열의 크기는 어느 정도입니까? 그것들을 해시로 대체하는 것이 합리적일까요? – maasg

답변

1

배열에 내장 된 스칼라 래퍼 WrappedArray[Byte]을 사용할 수 있습니다. 배열은 toSeq 메서드를 사용하여 WrappedArray로 변환 할 수 있습니다. WrappedArray는 equalshashCode을 올바르게 구현 했으므로 같은 요소를 가진 두 개의 다른 배열은 동등한 것으로 간주됩니다.

scala> val a = Array(1,2,3,4,5) 
a: Array[Int] = Array(1, 2, 3, 4, 5) 

scala> val b = Array(1,2,3,4,5) 
b: Array[Int] = Array(1, 2, 3, 4, 5) 

scala> a == b 
res0: Boolean = false 

scala> a.toSeq 
res1: Seq[Int] = WrappedArray(1, 2, 3, 4, 5) 

scala> a.toSeq == b.toSeq 
res2: Boolean = true 
관련 문제