, 당신은 rdd2에서 최소 및 최대 사이에 가을 rdd1의 값을합니다. 당신이 파일에서 읽는 경우 아래
val rdd1 = sc.parallelize(Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000)))
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
scala> val rdd1=sc.parallelize(Seq(("chr1", 10016),("chr1", 10017),("chr1", 10018),("chr1", 20026),("chr1", 20036),("chr1", 25016),("chr1", 26026),("chr2", 40016),("chr2", 40116),("chr2", 50016),("chr3", 70016)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Seq(("chr1", 10000, 20000),("chr1", 20000 , 30000),("chr2", 40000 ,50000),("chr2", 50000 ,60000),("chr3", 70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
+----+-----+
|name|count|
+----+-----+
|chr3| 1|
|chr1| 7|
|chr2| 3|
+----+-----+
편집 를 작동하는지 확인하시기 바랍니다, 내가 사용하는 것과
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val sqlContext = new SQLContext(sc)
val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true)))
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true)))
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv")
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv")
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
이 모든 노드에서 실행됩니다
및 PARALLELIZE의 필요가 없습니다 다음 요구. 여기 documentation 인용
DEF 병렬화 [T] (서열 서열 [T], numSlices : 지능 = defaultParallelism) (암시는 arg0는 : ClassTag [T]가) RDD [T] 퍼머 로컬 스칼라 배포 RDD를 형성하는 수집.
두 번째 rdd의 열이 고유하지 않은 이유는 무엇입니까? 즉, 첫 번째 rdd의 값이 두 번째 rdd의 값과 일치하면됩니다. – jtitusj
두 번째 RDD는 RDD1의 값 범위를 정의합니다. –