2016-06-13 2 views
1

take(5)을 다른 RDD로 호출 한 후 반환 된 컬렉션을 변환하려면 어떻게해야합니까? 출력 파일에 처음 5 개의 레코드를 저장할 수 있습니까?Spark : scala - RDD에서 다른 RDD로 컬렉션을 변환하는 방법

saveAsTextfile을 사용하는 경우 takesaveAsTextFile을 함께 사용하지 않게됩니다. (그 이유는 아래에서 그 줄을 주석으로 표시 한 이유입니다). RDD의 모든 레코드를 정렬 된 순서로 저장하므로 처음 5 개 레코드가 상위 5 개 국가이지만 처음 5 개 레코드 만 저장하려는 경우 - RDD에서 콜렉션 [take (5)]을 변환 할 수 있습니까?

val Strips = txtFileLines.map(_.split(",")) 
         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt))) 
         .sortBy(x => x.split(",")(1).trim().toInt, ascending=false) 
         .take(5) 
         //.saveAsTextFile("output\\country\\byStripsBar") 

솔루션 : 당신이 절대적으로 saveAsTextFile 서식을 필요로하지 않는 sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")

답변

2
val rowsArray: Array[Row] = rdd.take(5) 
val slicedRdd = sparkContext.parallelize(rowsArray, 1) 

slicedRdd.savesTextFile("specify path here") 
:

그렇지 않으면, 여기에 말의 RDD 유일한 해결책이다

1

, 난 그냥 간단한 IO를 사용하여 파일 (같은 File)에 take(5) 출력을 인쇄합니다.

scala> val rdd = sc.parallelize(5 to 1 by -1 map{x => (x, x*x)}) 
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at <console>:27 

scala> rdd.collect 
res1: Array[(Int, Int)] = Array((5,25), (4,16), (3,9), (2,4), (1,1)) 

scala> val top2 = rdd.sortBy(_._1).zipWithIndex.collect{case x if (x._2 < 2) => x._1} 
top2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at collect at <console>:29 

scala> top2.collect 
res2: Array[(Int, Int)] = Array((1,1), (2,4)) 
관련 문제