2016-07-16 2 views
2

특정 값 (필터 창)이다 필터 행 :pyspark 데이터 프레임 API : 리드/래그 I이 같은 데이터 프레임이

id x y 
1 a 1 P 
2 a 2 S 
3 b 3 P 
4 b 4 S 

I 행을 유지하려는 어디의 우위 가치 Y는 'S'내 결과 데이터 프레임이 될 수 있도록 우리가 가정 해 봅시다 : 나는 그것을 할 수 있어요

 id  x  y 
1  a  1  P 
2  b  3  P 

을 pyspark으로 다음과 같이

getLeadPoint = udf(lambda x: 'S' if (y == 'S') else 'NOTS', StringType()) 
windowSpec = Window.partitionBy(df['id']) 
df = df.withColumn('lead_point', getLeadPoint(lead(df.y).over(windowSpec))) 
dfNew = df.filter(df.lead_point == 'S') 

하지만 여기서는 불필요한 열을 돌연변이 한 다음 필터링하는 중입니다. 내가 대신 수행 할 작업을

내가 리드를()를 사용하여 필터링 할 경우 이런 일이지만,이 동작하지 않습니다 : 내가 직접 필터가있는 결과를 얻을 수있는 방법에

dfNew = df.filter(lead(df.y).over(windowSpec) == 'S') 

어떤 아이디어 창을 사용하고 있습니까?

library(dplyr) 
df %>% group_by(id) %>% filter(lead(y) == 'S') 
+0

죄송합니다 .... BTW - 주문 부는 장소 홀더입니다. 내 데이터에는 주문해야하는 '타임 스탬프'열이 있습니다. – Gopala

+1

실제로 버그가 있습니다. 가장 간단한 해결 방법은'withColumn'을 사용하여 열을 추가하고 필터링에 사용하는 것입니다. – zero323

+0

확인해 주셔서 감사합니다. 그래서 현재 가능한 해결책은 내가 할 수있는 최선의 방법인가? – Gopala

답변

2

가정 데이터는 다음과 같습니다

(df 
    .withColumn("lead_y", lead("y").over(w)) 
    .where(col("lead_y") == "S").drop("lead_y")) 

그것을 :

df = sc.parallelize([ 
    ("a", 1, 1, "P"), ("a", 2, 2, "S"), 
    ("b", 4, 2, "S"), ("b", 3, 1, "P"), ("b", 2, 3, "P"), ("b", 3, 3, "S") 
]).toDF(["id", "x", "timestamp", "y"]) 

창 사양은 단순히 열을 추가 및 필터링에 사용할 수

from pyspark.sql.functions import lead, col 
from pyspark.sql import Window 

w = Window.partitionBy("id").orderBy("timestamp") 

에 해당 꽤 아니지만 UDF 호출보다 더 효율적입니다.

0

효율적이지,하지만 당신은 당신이 다음 인덱스 조인 인덱스에 1을 추가 할 경우 다음 새 RDD을, 색인 압축 할 수 있으며 다음은 간단한로 바뀝니다 :

R 상당이다 필터 작동.

+0

고마워요! 현재 제가하고있는 것처럼 새로운 가치의 리드 값을 돌연변이시키는 것보다 더 나쁜 해결책이라고 생각합니다. 또한 코드가 없으면 솔루션이 실제로 어떻게 보이는지 또는 말하기 어렵습니다. – Gopala

관련 문제