2016-10-27 3 views
3

의 범위이다에서 2 스파크 RDD 2 대형 RDD (각 이상 milion 기록을 가짐)가 있는지 그 값을 만들기 위해 비교, 제이다 :첫 번째 RDD

rdd1.txt(name,value):  
chr1 10016 
chr1 10017 
chr1 10018 
chr1 20026 
chr1 20036 
chr1 25016 
chr1 26026 
chr2 40016 
chr2 40116 
chr2 50016 
chr3 70016 

rdd2.txt(name,min,max): 
chr1  10000 20000 
chr1  20000 30000 
chr2  40000 50000 
chr2  50000 60000 
chr3  70000 80000 
chr3 810001 910000 
chr3 860001 960000 
chr3 910001 1010000 

값 그 유효한

는 예를 들어 상기 걸리는 경우는 최소 및 제 RDD 이름의의 카운트 값의 최대의 범위의 뜻을 발생 더하기 1 때만 유효 상기 chr1의 7.

발생 스파크로 어떻게 스칼라 결과를 얻을 수 있습니까?

많은 감사

+0

두 번째 rdd의 열이 고유하지 않은 이유는 무엇입니까? 즉, 첫 번째 rdd의 값이 두 번째 rdd의 값과 일치하면됩니다. – jtitusj

+0

두 번째 RDD는 RDD1의 값 범위를 정의합니다. –

답변

2

시도 :

내가 알고있는 것처럼
val rdd1 = sc.parallelize(Seq(
    ("chr1", 10016), ("chr1", 10017), ("chr1", 10018))) 
val rdd2 = sc.parallelize(Seq(
    ("chr1", 10000, 20000), ("chr1",20000, 30000))) 

rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")) 
.where($"value".between($"min", $"max")) 
+0

scala> r1.toDF ("name", "value") 조인 (r2.toDF ("name", "min", "max"), Seq (" ($ "min", $ "max")) java.lang.IllegalArgumentException : 요구 사항 실패 : 열 수가 일치하지 않습니다. 이전 열 이름 (1) : _1 새 열 이름 (2) : 이름, 값 –

+0

올바르게 이해하면 'rdd1.toDF ("name", "min", "max"). groupBy (' 더 효율적으로 만들기 위해 합류하기 전에 .agg (min ('min) .as ("min"), max ('max) .as ("max"))'(나는 이것이 최적화라고 확신한다. ..) – Wilmerton

+0

나는 그것을 시도하고 그것이 내 질문에 대한 완전한 대답을 한 후 더 효율적인지 확인합니다 –

0

, 당신은 rdd2에서 최소 및 최대 사이에 가을 rdd1의 값을합니다. 당신이 파일에서 읽는 경우 아래

val rdd1 = sc.parallelize(Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018))) 
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000))) 
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 


scala> val rdd1=sc.parallelize(Seq(("chr1", 10016),("chr1", 10017),("chr1", 10018),("chr1", 20026),("chr1", 20036),("chr1", 25016),("chr1", 26026),("chr2", 40016),("chr2", 40116),("chr2", 50016),("chr3", 70016))) 
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24 

scala> val rdd2=sc.parallelize(Seq(("chr1",  10000, 20000),("chr1",  20000 , 30000),("chr2",  40000 ,50000),("chr2",  50000 ,60000),("chr3",  70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000))) 
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24 


scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 
+----+-----+ 
|name|count| 
+----+-----+ 
|chr3| 1| 
|chr1| 7| 
|chr2| 3| 
+----+-----+ 

편집 를 작동하는지 확인하시기 바랍니다, 내가 사용하는 것과

import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}; 

val sqlContext = new SQLContext(sc) 
val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true))) 
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true))) 
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv") 
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv") 
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 
이 모든 노드에서 실행됩니다

및 PARALLELIZE의 필요가 없습니다 다음 요구. 여기 documentation 인용

DEF 병렬화 [T] (서열 서열 [T], numSlices : 지능 = defaultParallelism) (암시는 arg0는 : ClassTag [T]가) RDD [T] 퍼머 로컬 스칼라 배포 RDD를 형성하는 수집.

+0

완전한 명령에 대한 많은 감사와 결과가 예상되지만 rdd1.txt와 rdd2.txt는 매우 매우 큽니다. , 하드 코딩없이 병렬 처리하는 방법은 무엇입니까? –

+0

하드 코딩하지 않고 병렬 처리하는 것을 잘 모르겠다면, 정교 할 수 있습니까? –

+0

@WuFei를 하드 코딩한다는 것은 무엇을 의미합니까? – eliasah

관련 문제