3

Pyspark에서 분류 자의 입력 데이터를 준비 중입니다. SparkSQL에서 aggregate 함수를 사용하여 평균 및 분산과 같은 기능을 추출했습니다. 이들은 활동, 이름 및 창별로 그룹화됩니다. Unix 시간 소인을 10000으로 나눈 값으로 창을 계산하여 10 초의 시간 창으로 구분합니다.Pyspark 사용자 정의 열 계산

sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window") 

이것의 결과가 보일 것이다 내가 지금하고 싶은 것은, 내가 타임 스탬프를 필요로 이에

X.

에서 각 지점의 평균 기울기를 계산하는 것이다
Activity Name   Window  AvgX  VarX 
Walk accelerometer 95875  2.0   1.0 

처럼, 창 및 X를 사용합니다. 파이썬에서 배열을 사용하여 논리를 구현했습니다. 이것은 각 점 사이의 기울기를 계산 한 다음 평균 기울기를 얻는 것과 같습니다. 이상적으로, 나는 Pyspark에서 아직 지원되지 않는 UDAF에서 이것을하고 싶다. . (이 기능은 아래의 기울기 호출 된 경우 다음 SQL 당신이 slope(timestamp, X) as avgSlopeX

편집을 할 수있는 말과 같을 것이다 - 그것은 명확하므로 입력을 변경 그래서, 내가 정확히 뭐하는 거지하는 사이의 기울기를 계산이다. 각 포인트는 다음 나는 각 윈도우의 평균과 분산을 얻고, 나는 또한 평균 기울기를 얻으려면, 그래서. 그 창에 슬로프의 평균을 반환.

#sample input 
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529] 

values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02] 

i = 0; 
slope = 0.0; 
totalSlope = 0.0; 

while (i < len(timestamp) - 1): 
    y2 = values[i+1]; 
    y1 = values[i]; 

    x2 = timestamp[i + 1]; 
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope; 
    i=i+1 

avgSlope = (totalSlope/len(x_values)) 

를 내가이를 구현 수있는 방법 pandas 데이터 프레임으로 변환 한 다음 numpy 배열로 변환해야합니까? 그렇다면 어떻게 데이터가 여전히 올바르게 매핑되는지 확인하고 GROUP BY 동작을 염두에 두십시오 ivity, SQL 쿼리의 이름 창

+0

이것은 확실히 UDAF의 직업이 아닙니다. – zero323

+0

@ zero323 어떻게 접근할까요? – other15

+0

연속 점에 대한 기울기를 계산 한 다음 간단한 평균을 취하십시오. 그러나 여기에 입력 된 설명은 다소 모호합니다. 예상 결과와 함께 예제 데이터를 게시 할 수 있습니까? – zero323

답변

4

UDAF는 순서를 정의 할 수있는 수단을 제공하지 않기 때문에 일반적으로 UDAF의 역할을하지 못합니다. 여기에 실제로 필요한 것은 윈도우 함수와 표준 집계의 조합입니다.

from pyspark.sql.functions import col, lag, avg 
from pyspark.sql.window import Window 

df = ... 
## DataFrame[activity: string, name: string, window: bigint, 
## timestamp: bigint, value: float] 

group = ["activity", "name", "window"] 

w = (Window() 
    .partitionBy(*group) 
    .orderBy("timestamp")) 

v_diff = col("value") - lag("value", 1).over(w) 
t_diff = col("timestamp") - lag("timestamp", 1).over(w) 

slope = v_diff/t_diff 

df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope"))) 
+0

이것은 좋은 접근법처럼 보입니다. 그러나 withColumn은 "기울기"를 무시하고 단지 값의 평균을 반환합니다 :/기울기가 정규 기능을 사용할 수 있습니까? – other15

+0

오타 (Typo). – zero323