2017-11-23 1 views
0

저는 DataFramesSpark 2.2.0Scala 2.11.8에 있습니다. 내가 df1에서 rank을 찾으려면, df2Spark 2에서 struct를 검색하는 방법은 무엇입니까?

df2 = 

+----------+-------------+ 
|itemA  | itemB  | 
+----------+-------------+ 
| 111  | 333   | 
| 222  | 444   | 
| 333  | 555   | 
| 444  | 777   | 
+----------+-------------+ 

각 쌍의 경우 :

|-- item: string (nullable = true) 
|-- other_items: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- item: string (nullable = true) 
| | |-- rank: double (nullable = true) 

그리고 :

df1 = 

+----------+-------------------------------+ 
|item  |  other_items   | 
+----------+-------------------------------+ 
| 111  |[[444,1.0],[333,0.5],[666,0.4]]| 
| 222  |[[444,1.0],[333,0.5]]   | 
| 333  |[]        | 
| 444  |[[111,2.0],[555,0.5],[777,0.2]]| 
+----------+-------------------------------+ 

printScheme는 다음과 같은 출력을 제공합니다. 이렇게하려면 df1에서 같은 쌍을 찾아서 df1.itemdf2.itemA이고 other_items.struct.[item]df2.itemB과 같아야합니다. 같은 쌍을 찾을 수없는 경우, 순위는 0

이 결과는이 일해야해야한다 :

+----------+-------------+-------------+ 
|itemA  | itemB  | rank  | 
+----------+-------------+-------------+ 
| 111  | 333   | 0.5  | 
| 222  | 444   | 1.0  | 
| 333  | 555   | 0.0  | 
| 444  | 777   | 0.2  | 
+----------+-------------+-------------+ 

내가 어떻게 할 수 있습니까?

+1

안녕하세요. 질문에 무슨 문제가 있습니까? – Markus

답변

1

원하는대로해야합니다.

df2.as("df2").join(
    df1.select($"item", explode($"other_items").as("other_items")).as("df1"), 
    $"df2.itemA" === $"df1.item" and $"df2.itemB" === $"df1.other_items.item" 
    , "left" 
) 
.select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank")) 
.show() 
1

당신은 udf 함수를 정의하여 요구 사항을 달성 할 수 udf 기능을 사용하면 join 모두 dataframe의 후

import org.apache.spark.sql.functions._ 
def findRank = udf((items: mutable.WrappedArray[String], ranks: mutable.WrappedArray[Double], itemB: String) => { 
    val index = items.indexOf(itemB) 
    if(index != -1) ranks(index) else 0.0 
}) 
df1.join(df2, df1("item") === df2("itemA"), "right") 
    .select(df2("itemA"), df2("itemB"), findRank(df1("other_items.item"), df1("other_items.rank"), df2("itemB")).as("rank")) 
    .show(false) 

당신이 dataframe을받을 것을 전화 : 조인하기 전에 트릭은 other_items을 폭발하는 것입니다 as

+-----+-----+----+ 
|itemA|itemB|rank| 
+-----+-----+----+ 
|111 |333 |0.5 | 
|222 |444 |1.0 | 
|333 |555 |0.0 | 
|444 |777 |0.2 | 
+-----+-----+----+ 
+0

스칼라를 사용하고 있습니다. – Markus

+0

이 답변은 Scala에도 있습니다. :) –

+0

스칼라에서'def'를 사용할 수 있습니까? 나는 그것을 몰랐다. – Markus

관련 문제