2017-02-22 3 views
0

스파크 DataFrame을 응집 해제 (deaggregation) DF 스키마와 함께 제공됩니다 :스파크

나는 "deaggdegated"열을 생산하기 위해 필요
id, agg_values 
432, 11 3.14 45 4.322 
984, 1 9.22 45 22.17 

:

id, item_id, value 
432, 11, 3.14 
432, 45, 4.322 
984, 1, 98.22 
984, 45, 222.1 

구문 분석 기능 :

private def parse_agg_scores(line: String): List[(String, String)] = { 
    val items: Array[String] = line.split(' ') 
    val ids = Iterator.from(0, 2).takeWhile(_ < items.size).map(items(_)) 
    val scores = Iterator.from(1, 2).takeWhile(_ < items.size).map(items(_)) 
    ids.zip(scores).toList 
    } 

flatMap을 시도했지만 작동하지 않습니다.

val res = df.flatMap{ row => 
     val line = row.getString(1) 

     parse_agg_scores(line) 
} 
+0

단지 말을하지 마십시오 무엇 그것은 작동하지 않습니다. 컴파일 타임 오류, 런타임 오류, 예기치 않은 결과 (그렇다면 무엇을 얻었으며 무엇을 기대 했습니까?) –

답변

0

그것은이 내가 생각해 솔루션입니다 sqlContext.implicits._ 수입하지 관련된 컴파일시 에러이었다에 말도없이 "가 작동하지 않습니다"

def unpivote_scores(df: DataFrame, colName: String, sqlContext: SQLContext): DataFrame = { 
    import sqlContext.implicits._ 

    df.flatMap { row => 
     val video_id = row.getInt(0) 
     val features = row.getString(1) 

     val items: Array[String] = features.split(' ') 
     val ids_string = Iterator.from(0, 2).takeWhile(_ < items.size).map(items(_)) 
     val scores_string = Iterator.from(1, 2).takeWhile(_ < items.size).map(items(_)) 

     val ids = ids_string.map(_.toInt) 
     val scores = scores_string.map(_.toDouble) 

     ids.zip(scores).toList.map(t => (video_id, t._1, t._2)) 
    }.toDF("VideoId", "FeatureId", "Score") 
    } 
2

당신은이 요소의 배열로 분할 4 요소의 배열에 새 열을 만들고, 그 열에 explode 수 - 후 별도의 컬럼으로이 두 요소의 배열을 분할 :

val result = dataFrame.withColumn("tuples", explode(array(
    array($"agg_values"(0), $"agg_values"(1)), 
    array($"agg_values"(2), $"agg_values"(3)) 
))) 
    .select($"id", $"tuples"(0) as "item_id", $"tuples"(1) as "value") 

result.show() 
// +---+-------+-----+ 
// | id|item_id|value| 
// +---+-------+-----+ 
// |432| 11.0| 3.14| 
// |432| 45.0|4.322| 
// |984| 1.0| 9.22| 
// |984| 45.0|22.17| 
// +---+-------+-----+ 
각 레코드 agg_values에 "쌍"의 다른 번호가있을 수 있습니다 경우에

편집, 우리는 폭발하기 전에 쌍 오전 배열로 배열을 변환하는 UDF가 필요합니다

// UDF to turn an array of Doubles into an Array of 2-item Arrays 
val groupPairs = udf { 
    arr: mutable.WrappedArray[Double] => arr.grouped(2).toArray 
} 

val result = dataFrame 
    .withColumn("pair", explode(groupPairs($"agg_values"))) 
    .select($"id", $"pair"(0) as "item_id", $"pair"(1) as "value") 
+0

이것은 흥미로운 해결책이지만 (item_id, value) 쌍의 길이는 알 수 없습니다. 그래서, 나는 "agg_values"(1)과 (2)와 같은 인덱스를 사용할 수 없다. –

+0

알겠습니다. 그래서 우리는 'agg_values'가 그러한 "쌍들"로 구성되어 있음을 (즉 길이가 짝수 임) 알지만 그 쌍이 몇 개 있는지를 모릅니다. 각 레코드에는 쌍의 수가 다를 수 있습니다. –

+1

내 답변이 업데이트되었습니다. 도움이 되었기를 바랍니다. –