2016-09-28 2 views
12

평평하게하려는 데이터 프레임이 있습니다. 프로세스의 일부로 분해하려고합니다. 따라서 배열의 열이있는 경우 배열의 각 값을 사용하여 별도의 행을 만듭니다. 예를 들어,Spark sql null 값을 잃지 않고 폭발하는 방법

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 

는이되어야

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

이 내 코드

private DataFrame explodeDataFrame(DataFrame df) { 
    DataFrame resultDf = df; 
    for (StructField field : df.schema().fields()) { 
     if (field.dataType() instanceof ArrayType) { 
      resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name()))); 
      resultDf.show(); 
     } 
    } 
    return resultDf; 
} 

문제는 내 데이터 배열 열의 일부가 널 (null)을 가지고입니다. 이 경우 전체 행이 삭제됩니다. 그래서이 dataframe :

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 
2 | Lucy | null 

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

대신

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 
2 | Lucy | null 

내가 널 행을 잃지 않도록 어떻게 내 배열이 폭발 할 수의이됩니까?

나는 스파크 1.5.2와 자바를 사용하고 8

답변

20

당신은 explode_outer 기능을 사용할 수 있습니다 2.2

스파크 :

import org.apache.spark.sql.functions.explode_outer 

df.withColumn("likes", explode_outer($"likes")).show 

// +---+----+--------+ 
// | id|name| likes| 
// +---+----+--------+ 
// | 1|Luke|baseball| 
// | 1|Luke| soccer| 
// | 2|Lucy| null| 
// +---+----+--------+ 

스파크 < = 2.1

을 스칼라에서는 Java와 동등해야합니다. 거의 동일합니다 (개별 기능을 가져 오기 위해 import static 사용).

import org.apache.spark.sql.functions.{array, col, explode, lit, when} 

val df = Seq(
    (1, "Luke", Some(Array("baseball", "soccer"))), 
    (2, "Lucy", None) 
).toDF("id", "name", "likes") 

df.withColumn("likes", explode(
    when(col("likes").isNotNull, col("likes")) 
    // If null explode an array<string> with a single null 
    .otherwise(array(lit(null).cast("string"))))) 

생각이 여기에 원하는 유형의 array(NULL)NULL을 대체 할 기본적이다.

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y") 

val st = StructType(Seq(
    StructField("_1", IntegerType, false), StructField("_2", StringType, true) 
)) 

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast(st))))) 

또는

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>"))))) 

: 복합 형 (일명 structs)의 경우는 전체 스키마를 제공해야

배열 ColumnfalsecontainsNull 세트로 생성 된 경우 다음을 수행해야 먼저 변경 (Spark 2.1 테스트) :

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true))) 
+0

멋지다, 감사합니다!나는 다음과 같은 질문을 가지고있다. 만약 내 컬럼 타입이 StructType이라면? 캐스트 (새로운 StructType())를 사용하여 시도했지만'데이터 형식 불일치 : THEN 및 ELSE식이 모두 같은 형식이거나 공통 형식과 호환 가능해야합니다. '가능한 한 제네릭을 만들기 위해 노력하고 있습니다. 모든 열 유형에 적합합니다. – alexgbelov

+0

또한 열 유형을 가져 오려면 DataFrame.dtypes()를 사용하고 있습니다. 열 유형을 가져 오는 더 좋은 방법이 있습니까? – alexgbelov

+1

a) 모든 필드와 함께 전체 스키마를 제공해야합니다. b)'dtypes' 또는'schema'. – zero323

0

허용되는 대답에 따라 배열 요소가 복잡한 유형 인 경우 손으로 (예 : 대형 구조체로) 정의하기가 어려울 수 있습니다.

가 자동으로 작업을 수행하려면 나는 다음과 같은 헬퍼 메소드 썼다 :

def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = { 
     val arrayFields = df.schema.fields 
      .map(field => field.name -> field.dataType) 
      .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])} 
      .toMap 

     columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) => 
     dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol)) 
     .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))  
} 
관련 문제