Pyspark에서 분류 자의 입력 데이터를 준비 중입니다. SparkSQL에서 aggregate 함수를 사용하여 평균 및 분산과 같은 기능을 추출했습니다. 이들은 활동, 이름 및 창별로 그룹화됩니다. Unix 시간 소인을 10000으로 나눈 값으로 창을 계산하여 10 초의 시간 창으로 구분합니다.Pyspark 사용자 정의 열 계산
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
이것의 결과가 보일 것이다 내가 지금하고 싶은 것은, 내가 타임 스탬프를 필요로 이에
X.
에서 각 지점의 평균 기울기를 계산하는 것이다Activity Name Window AvgX VarX
Walk accelerometer 95875 2.0 1.0
처럼, 창 및 X를 사용합니다. 파이썬에서 배열을 사용하여 논리를 구현했습니다. 이것은 각 점 사이의 기울기를 계산 한 다음 평균 기울기를 얻는 것과 같습니다. 이상적으로, 나는 Pyspark에서 아직 지원되지 않는 UDAF에서 이것을하고 싶다. . (이 기능은 아래의 기울기 호출 된 경우 다음 SQL 당신이
slope(timestamp, X) as avgSlopeX
편집을 할 수있는 말과 같을 것이다 - 그것은 명확하므로 입력을 변경 그래서, 내가 정확히 뭐하는 거지하는 사이의 기울기를 계산이다. 각 포인트는 다음 나는 각 윈도우의 평균과 분산을 얻고, 나는 또한 평균 기울기를 얻으려면, 그래서. 그 창에 슬로프의 평균을 반환.
#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]
values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]
i = 0;
slope = 0.0;
totalSlope = 0.0;
while (i < len(timestamp) - 1):
y2 = values[i+1];
y1 = values[i];
x2 = timestamp[i + 1];
x1 = timestamp[i];
slope = ((y2-y1)/(x2-x1));
totalSlope = totalSlope + slope;
i=i+1
avgSlope = (totalSlope/len(x_values))
를 내가이를 구현 수있는 방법 pandas 데이터 프레임으로 변환 한 다음 numpy 배열로 변환해야합니까? 그렇다면 어떻게 데이터가 여전히 올바르게 매핑되는지 확인하고 GROUP BY 동작을 염두에 두십시오 ivity, SQL 쿼리의 이름 창
이것은 확실히 UDAF의 직업이 아닙니다. – zero323
@ zero323 어떻게 접근할까요? – other15
연속 점에 대한 기울기를 계산 한 다음 간단한 평균을 취하십시오. 그러나 여기에 입력 된 설명은 다소 모호합니다. 예상 결과와 함께 예제 데이터를 게시 할 수 있습니까? – zero323