2017-10-02 1 views
1

두 개 이상의 요소 행을 선택 후 :스파크 - withColumn ("뉴콜", collect_list (...)) 나는이 JSON에서 만든 DataFrame 함께 일하고

{"id" : "1201", "name" : "satish", "age" : "25"}, 
{"id" : "1202", "name" : "krishna", "age" : "28"}, 
{"id" : "1203", "name" : "amith", "age" : "39"}, 
{"id" : "1204", "name" : "javed", "age" : "23"}, 
{"id" : "1205", "name" : "mendy", "age" : "25"}, 
{"id" : "1206", "name" : "rob", "age" : "24"}, 
{"id" : "1207", "name" : "prudvi", "age" : "23"} 

는 처음에는 Dataframe의 모습 이 : 나는 그룹에 자신의 아이디에 따라 그 주문 같은 나이를 가진 모든 학생입니다 무엇이 필요

+---+----+-------+ 
|age| id| name| 
+---+----+-------+ 
| 25|1201| satish| 
| 28|1202|krishna| 
| 39|1203| amith| 
| 23|1204| javed| 
| 25|1205| mendy| 
| 24|1206| rob| 
| 23|1207| prudvi| 
+---+----+-------+ 

. 이것은 지금까지 제가 접근하고있는 방법입니다 :

* 참고 : 나는 withColumn("newCol", ..)을 사용하여 새로운 열을 추가하는 것보다 더 효율적이라고 확신하지만 select("newCol")을 사용합니다. 그러나 어떻게 해결 해야할지 모르겠습니다.

[WrappedArray([25,1201,satish], [25,1205,mendy])] 
[WrappedArray([24,1206,rob])] 
[WrappedArray([23,1204,javed])] 
[WrappedArray([23,1204,javed], [23,1207,prudvi])] 
[WrappedArray([28,1202,krishna])] 
[WrappedArray([39,1203,amith])] 

지금, 가 어떻게 두 개 이상의 요소을 가지고 행을 필터링 할 수 있습니다 : 그것은

val conf = new SparkConf().setAppName("SimpleApp").set("spark.driver.allowMultipleContexts", "true").setMaster("local[*]") 
    val sc = new SparkContext(conf) 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sqlContext.read.json("students.json") 

    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.expressions._ 

    val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("List") 

더 나은 내가 얻고 출력이 무엇입니까? 결과가 예상 한 아니기 때문에,

val mergedDF = df.withColumn("newCol", collect_list(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))) 

val filterd = mergedDF.withColumn("count", count("age").over(Window.partitionBy("age"))).filter($"count" > 1).select("newCol") 

하지만 내가 모르는 뭔가가 있어야합니다 :

[WrappedArray([25,1201,satish], [25,1205,mendy])] 
[WrappedArray([23,1204,javed], [23,1207,prudvi])] 

내 가장 좋은 방법은 지금까지입니다 : 그건 내 마지막 dataframe 될 것을 원이다

[WrappedArray([23,1204,javed], [23,1207,prudvi])] 
[WrappedArray([25,1201,satish])] 
[WrappedArray([25,1201,satish], [25,1205,mendy])] 

답변

2

당신은 당신의 데이터를 필터링 할 size()를 사용할 수 있습니다

import org.apache.spark.sql.functions.{col,size} 

mergedDF.filter(size(col("newCol"))>1).show(false) 

+---+----+------+-----------------------------------+ 
|age|id |name |newCol        | 
+---+----+------+-----------------------------------+ 
|23 |1207|prudvi|[[23,1204,javed], [23,1207,prudvi]]| 
|25 |1205|mendy |[[25,1201,satish], [25,1205,mendy]]| 
+---+----+------+-----------------------------------+ 
관련 문제