2016-10-13 1 views
3

나는 하이브 테이블에서 스파크 데이터 프레임으로 데이터를로드하는 중입니다. 이제는 1 개의 데이터 프레임에 모든 고유 한 accts를, 또 다른 하나에는 모든 중복을 원합니다. 예를 들어 acct id 1,1,2,3,4가있는 경우입니다. 하나의 데이터 프레임에서 2,3,4, 다른 데이터 프레임에서 1,1을 얻고 싶습니다. 어떻게해야합니까?Spark Dataframe에서 두 개의 데이터 프레임에 중복 레코드와 개별 ​​레코드를 얻는 방법은 무엇입니까?

+1

논리 : ID &에 의해 집계, 계산 ', 원래 dataframe에 다시 id_count = 1','df_dup = DF id_count가> 1' – David

+0

안녕하세요 데이비드 .. 나는 그것을 얻을 타격을받지 df_unique = 안양를 카운트 가입 .. df.agg ("acctid")를 얻어야 만할까요? count()? –

답변

5
val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details") 
scala> acctDF.show() 
+------+-------+ 
|AcctId|Details| 
+------+-------+ 
|  1| Acc1| 
|  1| Acc1| 
|  1| Acc1| 
|  2| Acc2| 
|  2| Acc2| 
|  3| Acc3| 
+------+-------+ 

val countsDF = acctDF.map(rec => (rec(0), 1)).reduceByKey(_+_).map(rec=> (rec._1.toString, rec._2)).toDF("AcctId", "AcctCount") 

val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount")) 

scala> accJoinedDF.show() 
+------+-------+---------+ 
|AcctId|Details|AcctCount| 
+------+-------+---------+ 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  2| Acc2|  2| 
|  2| Acc2|  2| 
|  3| Acc3|  1| 
+------+-------+---------+ 


val distAcctDF = accJoinedDF.filter($"AcctCount"===1) 
scala> distAcctDF.show() 
+------+-------+---------+ 
|AcctId|Details|AcctCount| 
+------+-------+---------+ 
|  3| Acc3|  1| 
+------+-------+---------+ 

val duplAcctDF = accJoinedDF.filter($"AcctCount">1) 
scala> duplAcctDF.show() 
+------+-------+---------+     
|AcctId|Details|AcctCount| 
+------+-------+---------+ 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  1| Acc1|  3| 
|  2| Acc2|  2| 
|  2| Acc2|  2| 
+------+-------+---------+ 

(OR scala> duplAcctDF.distinct.show()) 
+0

내가 틀렸을 때 정정 해주세요. 그러나 데이터 프레임에서 map 또는 reduceByKey를 사용할 수 없습니다. ir를 rdd로 먼저 변환하고 마지막으로 DF로 다시 변환해야합니다. 이런 식으로 :'val countsDF = acctDF.rdd.map .... toDF()' – Emiliano

1

당신이 가지고있는 스파크의 버전에 따라, 당신은/SQL 아래와 같은 데이터 세트에서 윈도우 기능을 사용할 수

Dataset<Row> New = df.withColumn("Duplicate", count("*").over(Window.partitionBy("id"))); 

Dataset<Row> Dups = New.filter(col("Duplicate").gt(1)); 

Dataset<Row> Uniques = New.filter(col("Duplicate").equalTo(1)); 

위의 자바로 작성된 것입니다. scala에서 유사해야하며 파이썬에서하는 방법을 읽어야합니다. https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

관련 문제