2016-12-09 3 views
1

거대한 RDD (소스)가 있고 그 중 BloomFilter 데이터를 만들어야하므로 사용자 데이터의 후속 업데이트는 중복이없는 진정한 "diffs"만 고려합니다. 블룸 필터의 구현의 대부분이 비 직렬화처럼Spark and BloomFilter sharing

(즉 쉽게하지만 고정 할 수 있습니다) 같은데,하지만 난 약간 다른 워크 플로우 원하는 :

  1. 프로세스의 모든 파티션을 적절한 블룸 필터의 인스턴스를 생성 각 파티션에 대해 BloomFilter 객체 각각에 대해 - 어딘가에 바이너리 파일에 작성하십시오. 실제로 전체 파티션을 처리하는 방법을 모르겠습니다. mapPartition RDD에서 사용할 수있는 기능이 있지만 반복기를 반환 할 것으로 예상됩니다. 아마도 전달 된 반복자를 소비하고 BloomFilter의 인스턴스를 만들고 어딘가에 작성하고 링크를 생성 파일에 Iterator.singleton[PathToFile]으로 반환할까요?
  2. 마스터 노드에서 - consume 처리 결과 (File에 대한 경로 목록)는 해당 파일을 읽고 메모리에 BloomFilters를 집계합니다. 그런 다음 이진 파일에 응답을 작성하십시오.

나는 올바른 방법을 알고하지 않습니다

  • 가 전달되는 함수 내에서 (이 HDFS, S3N 또는 로컬 파일 수) 클러스터에서 지원되는 FS에서 파일을 만듭니다 ~ mapPartitions
  • 두 번째 단계에서 파일의 내용을 consume (파일 경로가있는 RDD가 있고 읽을 때 SparkContext을 사용하면 읽을 수있는 방법을 모르지만) 내용을 읽습니다.

감사합니다.

답변

1

breeze 구현은 가장 빠른 아니지만 평소 스파크 종속성 함께 제공하고 simple aggregate 사용할 수 있습니다 :

import breeze.util.BloomFilter 

// Adjust values to fit your case 
val numBuckets: Int = 100 
val numHashFunctions: Int = 30 

val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4) 
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
    _ += _, _ |= _ 
) 

bf.contains("a") 
Boolean = true 
bf.contains("n") 
스파크에서
Boolean = false 

가 2.0 이상이 DataFrameStatFunctions.bloomFilter을 사용할 수 있습니다 :

val df = rdd.toDF 

val expectedNumItems: Long = 1000 
val fpp: Double = 0.005 

val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp) 

sbf.mightContain("a") 
Boolean = true 
sbf.mightContain("n") 
Boolean = false 

Algebird 구현은 잘 작동하고 breeze 구현과 유사하게 사용할 수 있습니다.