나는 하이브 테이블에서 스파크 데이터 프레임으로 데이터를로드하는 중입니다. 이제는 1 개의 데이터 프레임에 모든 고유 한 accts를, 또 다른 하나에는 모든 중복을 원합니다. 예를 들어 acct id 1,1,2,3,4가있는 경우입니다. 하나의 데이터 프레임에서 2,3,4, 다른 데이터 프레임에서 1,1을 얻고 싶습니다. 어떻게해야합니까?Spark Dataframe에서 두 개의 데이터 프레임에 중복 레코드와 개별 레코드를 얻는 방법은 무엇입니까?
3
A
답변
5
val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")
scala> acctDF.show()
+------+-------+
|AcctId|Details|
+------+-------+
| 1| Acc1|
| 1| Acc1|
| 1| Acc1|
| 2| Acc2|
| 2| Acc2|
| 3| Acc3|
+------+-------+
val countsDF = acctDF.map(rec => (rec(0), 1)).reduceByKey(_+_).map(rec=> (rec._1.toString, rec._2)).toDF("AcctId", "AcctCount")
val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount"))
scala> accJoinedDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
| 1| Acc1| 3|
| 1| Acc1| 3|
| 1| Acc1| 3|
| 2| Acc2| 2|
| 2| Acc2| 2|
| 3| Acc3| 1|
+------+-------+---------+
val distAcctDF = accJoinedDF.filter($"AcctCount"===1)
scala> distAcctDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
| 3| Acc3| 1|
+------+-------+---------+
val duplAcctDF = accJoinedDF.filter($"AcctCount">1)
scala> duplAcctDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
| 1| Acc1| 3|
| 1| Acc1| 3|
| 1| Acc1| 3|
| 2| Acc2| 2|
| 2| Acc2| 2|
+------+-------+---------+
(OR scala> duplAcctDF.distinct.show())
+0
내가 틀렸을 때 정정 해주세요. 그러나 데이터 프레임에서 map 또는 reduceByKey를 사용할 수 없습니다. ir를 rdd로 먼저 변환하고 마지막으로 DF로 다시 변환해야합니다. 이런 식으로 :'val countsDF = acctDF.rdd.map .... toDF()' – Emiliano
1
당신이 가지고있는 스파크의 버전에 따라, 당신은/SQL 아래와 같은 데이터 세트에서 윈도우 기능을 사용할 수
Dataset<Row> New = df.withColumn("Duplicate", count("*").over(Window.partitionBy("id")));
Dataset<Row> Dups = New.filter(col("Duplicate").gt(1));
Dataset<Row> Uniques = New.filter(col("Duplicate").equalTo(1));
위의 자바로 작성된 것입니다. scala에서 유사해야하며 파이썬에서하는 방법을 읽어야합니다. https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
관련 문제
- 1. spark Dataframe에서 병합 작업을 수행하는 방법은 무엇입니까?
- 2. Spark Dataframe에서 numpy 배열을 수정하는 방법은 무엇입니까?
- 3. 두 개의 개별 페이지를 두 개의 개별 iframe에로드하는 방법은 무엇입니까?
- 4. 데이터 테이블에서 고유 레코드를 얻는 방법은 무엇입니까?
- 5. spark에서 데이터 프레임에 레코드를 삽입하는 방법
- 6. 오라클에 일치하는 고유 레코드와 중복 레코드를 식별하십시오.
- 7. SQL에서 레코드와 가까운 레코드를 선택하는 방법은 무엇입니까?
- 8. Spark DataFrame에서 na.fill 스칼라
- 9. 팬더가있는 DataFrame에서 열을 두 개의 개별 열로 분할하는 방법
- 10. 데이터 프레임에 두 개의 열을 플롯하고 범례를 추가하는 방법은 무엇입니까?
- 11. Pandas DataFrame에서 두 개의 열을 효율적으로 추가하는 방법은 무엇입니까?
- 12. Spark Dataframe에서 Python 함수 실행
- 13. 데이터 테이블에서 중복 레코드를 삭제하는 방법은 무엇입니까?
- 14. 두 개의 개별 프로젝트를 만드는 방법은 무엇입니까?
- 15. Spark DataFrame에서 VectorUDT 열의 요소에 액세스하는 방법은 무엇입니까?
- 16. 두 개의 개별 시트에서 두 개의 개별 열 비교
- 17. Spark 데이터 프레임에 누락 된 값을 입력하십시오.
- 18. Spark 스칼라 : DataFrame에서 필터 식을 대체하는 문제
- 19. 두 개의 개별 데이터 배열을 사용하여 두 개의 드롭 다운리스트로드
- 20. LINQ - 두 테이블에서 n 개의 레코드를 가져 오는 방법은 무엇입니까?
- 21. 2 개의 개별 VC에서 두 개의 개별 UIGestureRecognizer 대리자
- 22. ddply를 사용하여 데이터 프레임에 클래스의 가중 평균을 얻는 방법은 무엇입니까?
- 23. 두 레코드를 동시에 두 개의 데이터베이스 테이블에 삽입하는 방법은 무엇입니까?
- 24. Spark Dataframe에서 파티션 분할을 보장하는 방법
- 25. SparkSQL Dataframe에서 MatrixUDT를 열로 사용했습니다.
- 26. java를 사용하여 .csv 파일에서 마지막 두 레코드를 얻는 방법은 무엇입니까?
- 27. coobol의 중복 레코드와 파일 비교
- 28. 두 개의 데이터 프레임 열을 비교하여 데이터 프레임에 추가하십시오.
- 29. 두 개의 개별 화면에 두 개의 창로드
- 30. 집계 루트 테이블의 레코드와 관련된 모든 레코드를 얻는 방법
논리 : ID &에 의해 집계, 계산 ', 원래 dataframe에 다시 id_count = 1','df_dup = DF id_count가> 1' – David
안녕하세요 데이비드 .. 나는 그것을 얻을 타격을받지 df_unique = 안양를 카운트 가입 .. df.agg ("acctid")를 얻어야 만할까요? count()? –