2017-09-06 3 views
0

I는 유사한 구조를 갖는 DataFrame이 각 그룹에 액세스하는 column.field에 따라 npaNumber은 나중에 둘 이상의 요소 (동일한 npaNumber를 갖는 행 그룹)가있는 그룹에서 작동합니다. 그래서 나는 다음과 같은 쓴 : 나도 같은 npaNumber으로 행 그룹을 표시 할 것으로 예상 된 foreach 문으로스파크 DataFrame 족 행 독립적

df.groupBy($"npaHeaderData.npaNumber") 
    .count() 
    .filter("count > 1") 
    .foreach { x => println(x) } 

, 대신 요소가 I이었다 디스플레이하는 한을 바로 npaNumber 카운트 값 :

[3487208122633,2] 
[5668207771332,3] 
[3567207579910,4] 
[5768207822303,2] 
[9868207960414,7] 

는 또한 성공하지 않고 다음과 같은 시도 :

val groupedDF = df.groupBy($"npaHeaderData.npaNumber").agg($"npaDetails", $"npaHeaderData") 

하지만 오류 메시지가 얻을 :

을 0

"main"스레드의 예외 org.apache.spark.sql.AnalysisException : 표현 'npaHeaderData'이 (가) 그룹에 없거나 이라는 표현식이 집계 함수입니다. 얻은 가치에 신경 쓰지 않는다면 그룹으로 추가하거나 첫 번째()로 마무리하십시오 (또는 first_value). ;; 집계 [npaHeaderData # 6.npaNumber], [npaHeaderData # 6.npaNumber npaNumber 번호 36, npaHeaderData # 6]이 I /에 의해 그룹화 된 각 행의 그룹과 독립적으로 동작에 액세스 할 수있는 방법

데이터 프레임의 column.attribute? 이 관련이 이런 경우에

, 나는 행이 그룹을 통해 수행 할 계획입니다 다음 작업은 시간 npaHeaderData.npaIssuanceDate

감사를 기반으로하여 순서입니다

답변

1

Aggregations보존하지 것 대신 원래 행을 사용하면 그룹화 된 데이터의 모든 행을 집계하고 하나의 집계 된 행만 제공합니다. 그리고 groupByagg 함수에 사용 된 열만 있습니다.

모든 행과 모든 열을 보존하려는 경우 withColumn api로 이동하고 Window 기능을 사용해야합니다.

df.withColumn("count", count($"npaHeaderData").over(Window.partitionBy("npaHeaderData.npaNumber"))) 
    .filter($"count">1) 

이 당신이 할 수있는, 당신에게 filter로 계산하면 count 열을 원하지 않는 경우> 1. 모든 행을 것입니다 + count 열 그룹화 dataframe의 모든 열이있는 모든 행을 제공해야 .drop("count")

+0

완벽하게 작동합니다. 감사. 이 솔루션을 추가하고자하는 사용자는 다음 두 가지 가져 오기가 필요합니다. import org.apache.spark.sql.expressions.Window AND import org.apache.spark.sql.functions._ –

+0

네, 맞습니다 :) 감사합니다. 수락 및 upvote에 대해 :) –

+0

내가 확인하고 싶은 한가지 추가. 나는 하나의 필드가 아닌 두 개의 필드에 따라 행을 그룹화하고, 질문의 스키마를 따르는 경우 Window.partitionBy ("npaHeaderData.npaNumber", "npaDetails.npa Service ") npaService의 값과 npaService의 값을 가진 행을 그룹화하고 싶다면 Window.partitionBy가 작동하는 방식을 올바르게 이해했는지 확인해 주시겠습니까 –