2017-03-27 2 views
1

I는 다음 데이터대하여 반복 열 값에 기초

{ID:"1",CNT:"2", Age:"21", Class:"3"} 
{ID:"2",CNT:"3", Age:"24", Class:"5"} 

I는 아래와 같은 출력을 CNT 값에 기초하여 데이터 프레임을 반복하고 생성 할 스파크에 dataframe 가지고

{ID:"1",CNT:"1", Age:"21", Class:"3"} 
{ID:"1",CNT:"2", Age:"21", Class:"3"} 
{ID:"2",CNT:"1", Age:"24", Class:"5"} 
{ID:"2",CNT:"2", Age:"24", Class:"5"} 
{ID:"2",CNT:"3", Age:"24", Class:"5"} 

이 문제를 해결하는 방법을 알고 계신 분도 계실 수 있습니다.

답변

5

당신은, rdd에 데이터 프레임 변환을 확장 한 다음 데이터 프레임으로 변환 할 flatMap를 사용할 수 있습니다

val df = Seq((1,2,21,3),(2,3,24,5)).toDF("ID", "CNT", "Age", "Class") 

case class Person(ID: Int, CNT: Int, Age: Int, Class: Int) 

df.as[Person].rdd.flatMap(p => (1 to p.CNT).map(Person(p.ID, _, p.Age, p.Class))).toDF.show 
+---+---+---+-----+ 
| ID|CNT|Age|Class| 
+---+---+---+-----+ 
| 1| 1| 21| 3| 
| 1| 2| 21| 3| 
| 2| 1| 24| 5| 
| 2| 2| 24| 5| 
| 2| 3| 24| 5| 
+---+---+---+-----+ 
2

그냥 경우에 당신이 dataframe를 사용하여 솔루션을 선호 단지, 여기에 우리가 간다 :

case class Person(ID: Int, CNT: Int, Age: Int, Class: Int) 

val iterations: (Int => Array[Int]) = (input: Int) => { 
    (1 to input).toArray[Int] 
} 
val udf_iterations = udf(iterations) 

val p1 = Person(1, 2, 21, 3) 
val p2 = Person(2, 3, 24, 5) 

val records = Seq(p1, p2) 
val df = spark.createDataFrame(records) 

df.withColumn("CNT-NEW", explode(udf_iterations(col("CNT")))) 
    .drop(col("CNT")) 
    .withColumnRenamed("CNT-NEW", "CNT") 
    .select(df.columns.map(col): _*) 
    .show(false) 

+---+---+---+-----+ 
|ID |CNT|Age|Class| 
+---+---+---+-----+ 
|1 |1 |21 |3 | 
|1 |2 |21 |3 | 
|2 |1 |24 |5 | 
|2 |2 |24 |5 | 
|2 |3 |24 |5 | 
+---+---+---+-----+