2017-01-25 1 views
0

한다고 가정 벡터 열에서 계산하는 NaN은 I는 features 열의 각 행 플로트 NaN 데이터 유형들의 조합을 포함하는 DenseVector 곳이스파크 :

+-----------+------------------+ 
| id  |   features| 
+-----------+------------------+ 
|   1|[57.0,1.0,0.0,0.0]| 
|   2|[63.0,NaN,0.0,0.0]| 
|   3|[74.0,1.0,3.0,NaN]| 
|   4|[67.0,NaN,0.0,0.0]| 
|   5|[NaN,1.0,NaN,NaN] | 

같다 스파크 DataFrame있다. DenseVector 또는 임의의 열의 첫 번째 열에서 NaN의 수를 계산하는 방법이 있습니까? 예를 들어 첫 번째 열에는 1 NaN, 2 번째 열에는 3, 4 번째 열에는 2가 반환되는 것이 좋습니다.

답변

1

Spark SQL은 이와 같은 방법을 제공하지는 않지만 알고 있습니다. 약간 RDD과 약간의 NumPy.

from pyspark.ml.linalg import DenseVector, Vector 
import numpy as np 

df = sc.parallelize([ 
    (1, DenseVector([57.0, 1.0, 0.0, 0.0])), 
    (2, DenseVector([63.0, float("NaN"), 0.0, 0.0])), 
    (3, DenseVector([74.0, 1.0, 3.0, float("NaN")])), 
    (4, DenseVector([67.0, float("NaN"), 0.0, 0.0])), 
    (5, DenseVector([float("NaN"), 1.0, float("NaN"), float("NaN")])), 
]).toDF(["id", "features"]) 

(df 
    .select("features") 
    .rdd 
    .map(lambda x: np.isnan(x.features.array)) 
    .sum()) 
array([1, 2, 1, 2]) 

당신은 할 수 SQL과 유사한 것은 그것은 훨씬 더 많은 노력이 필요하지만. 도우미 기능 : 마지막 select

from pyspark.sql.functions import col, isnan, sum as sum_ 

feature_array = as_array("features").alias("features") 

:

(df 
    .na.drop(subset=["features"]) 
    .select([sum_(isnan(feature_array[i]).cast("bigint")) for i in range(vlen)])) 

from pyspark.sql.functions import col, size 

(vlen,) = df.na.drop().select(size(as_array(col("features")))).first() 

가 식을 만듭니다

from pyspark.sql.functions import udf 
from pyspark.sql.types import ArrayType, DoubleType 
from pyspark.sql import Column 
from typing import List 

def as_array(c: Column) -> Column: 
    def as_array_(v: Vector) -> List[float]: 
     return v.array.tolist() 
    return udf(as_array_, ArrayType(DoubleType()))(c) 

는 벡터의 크기를 결정