2017-12-03 5 views
2

Pyspark의 시계열에 대한 변형 탐지 알고리즘을 쓰고 있습니다. 나는 (-3,3) 또는 (-4,4) 윈도우의 가중 이동 평균을 계산하려고합니다. 지금은 래그 (lag)를 사용하여 창 함수를 넘기고 가중치 집합을 곱합니다. 내 창은 현재 (-2,2)입니다.Pyspark의 가중 이동 평균

Pyspark에서 가중 이동 평균을 계산하는 또 다른 방법이 있는지 알고 싶습니다. 내가 사용하고

현재 코드는 다음과 같습니다

data_frame_1 = spark_data_frame.withColumn("weighted_score_predicted", (weights[0] * lag(column_metric, 1).over(w) + weights[1] * lag(column_metric, 2).over(w) + weights[2] * lead(column_metric, 1).over(w) + weights[3] * lead(column_metric, 2).over(w))/2).na.drop() 

답변

3

당신은 현재의 코드를 일반화 할 수

from pyspark.sql.window import Window 

df = spark.createDataFrame([ 
    ("a", 1, 1.4), ("a", 2, 8.0), ("a", 3, -1.0), ("a", 4, 2.4), 
    ("a", 5, 99.0), ("a", 6, 3.0), ("a", 7, -1.0), ("a", 8, 0.0) 
]).toDF("id", "time", "value") 

w = Window.partitionBy("id").orderBy("time") 
offsets, delays = [-2, -1, 0, 1, 2], [0.1, 0.20, 0.4, 0.20, 0.1] 

result = df.withColumn("avg", weighted_average(
    col("value"), w, offsets, delays 
)) 
result.show() 

## +---+----+-----+-------------------+ 
## | id|time|value|    avg| 
## +---+----+-----+-------------------+ 
## | a| 1| 1.4|    2.06| 
## | a| 2| 8.0| 3.5199999999999996| 
## | a| 3| -1.0|    11.72| 
## | a| 4| 2.4|    21.66| 
## | a| 5| 99.0| 40.480000000000004| 
## | a| 6| 3.0|    21.04| 
## | a| 7| -1.0|    10.1| 
## | a| 8| 0.0|0.10000000000000003| 
## +---+----+-----+-------------------+ 

:

from pyspark.sql.functions import coalesce, lit, col, lead, lag 
from operator import add 
from functools import reduce 

def weighted_average(c, window, offsets, weights): 
    assert len(weights) == len(offsets) 

    def value(i): 
     if i < 0: return lag(c, -i).over(window) 
     if i > 0: return lead(c, i).over(window) 
     return c 

    # Create a list of Columns 
    # - `value_i * weight_i` if `value_i IS NOT NULL` 
    # - literal 0 otherwise 
    values = [coalesce(value(i) * w, lit(0)) for i, w in zip(offsets, weights)] 

    # or sum(values, lit(0)) 
    return reduce(add, values, lit(0)) 

그것은로 사용할 수 있습니다 :

누락 부분이있는 프레임의 결과를 정규화하는 것이 좋습니다.

result.withColumn(
    "normalization_factor", 
    weighted_average(lit(1), w, offsets, delays) 
).withColumn(
    "normalized_avg", 
     col("avg")/col("normalization_factor") 
).show() 

## +---+----+-----+-------------------+--------------------+------------------+ 
## | id|time|value|    avg|normalization_factor| normalized_avg| 
## +---+----+-----+-------------------+--------------------+------------------+ 
## | a| 1| 1.4|    2.06| 0.7000000000000001|2.9428571428571426| 
## | a| 2| 8.0| 3.5199999999999996|     0.9|3.9111111111111105| 
## | a| 3| -1.0|    11.72| 1.0000000000000002|11.719999999999999| 
## | a| 4| 2.4|    21.66| 1.0000000000000002|21.659999999999997| 
## | a| 5| 99.0| 40.480000000000004| 1.0000000000000002|    40.48| 
## | a| 6| 3.0|    21.04| 1.0000000000000002|21.039999999999996| 
## | a| 7| -1.0|    10.1| 0.9000000000000001| 11.22222222222222| 
## | a| 8| 0.0|0.10000000000000003| 0.7000000000000001|0.1428571428571429| 
## +---+----+-----+-------------------+--------------------+------------------+