2017-09-15 1 views
0

N 속성 (Atr1, Atr2, Atr3, ..., AtrN)이있는 DataFrame과 동일한 [1..N-1 ] 속성을 제외하고는 N 번째 속성을 제외합니다.필터가있는 Pyspark에서 데이터 프레임의 인스턴스를 검색하는 데 너무 많은 시간이 걸립니다.

인스턴스의 속성 [1..N-1]에 대해 동일한 값을 가진 DataFrame에 인스턴스가 있는지 확인하고 해당 인스턴스의 존재하는 경우, 내 목표는 Attributes [1..N]을 가진 DataFrame의 인스턴스. 예를 들어

, 내가있는 경우 : 나는 또한 Atr4의 값으로 DataFrame의 4 행을 얻으려면

Instance: 

[Row(Atr1=u'A', Atr2=u'B', Atr3=24)] 

Dataframe: 

+------+------+------+------+ 
| Atr1 | Atr2 | Atr3 | Atr4 | 
+------+------+------+------+ 
| 'C' | 'B' | 21 | 'H' | 
+------+------+------+------+ 
| 'D' | 'B' | 21 | 'J' | 
+------+------+------+------+ 
| 'E' | 'B' | 21 | 'K' | 
+------+------+------+------+ 
| 'A' | 'B' | 24 | 'I' | 
+------+------+------+------+ 

.

나는 그것을 시도 "필터()"이 같은 방법

df.filter("Atr1 = 'C' and Atr2 = 'B', and Atr3 = 24").take(1) 

그리고 내가 원하는 결과를 얻을 수 있지만, 많은 시간이 걸렸습니다.

내 질문은 : 같은 시간에 더 적은 시간에 할 수있는 방법이 있습니까?

감사합니다.

+0

조금 더 정보여기에 도움이 될 것입니다. 특히 : 얼마나 많은 시간이 걸리고 있습니까? 얼마나 오래 걸릴까요? 얼마나 많은 클러스터/하드웨어를 실행하고 있습니까? 일반적으로 클러스터의 크기에 관계없이 스파크 작업을 수행 할 때 약간의 오버 헤드가 발생합니다. 스파크는 데이터와 코드를 클러스터에 배포 한 다음 결과를 수집해야하기 때문입니다. pyspark에서 이와 같은 간단한 작업을 수행하는 것은 로컬 시스템의 Python에서 이와 비슷한 간단한 작업을 수행하는 것처럼 빠르지는 않습니다. –

답변

0

지역 민감성 해시 (minhashLSH)를 사용하여 가장 가까운 이웃을 찾아 동일 여부를 확인할 수 있습니다.

데이터에 문자열이 있으므로 LSH를 적용하기 전에 데이터를 처리해야합니다. 우리는 pyspark ml의의 기능 모듈 stringIndexing와

시작을 사용하고

df= spark.createDataFrame([('C','B',21,'H'),('D','B',21,'J'),('E','c',21,'K'),('A','B',24,'J')], ["attr1","attr2","attr3","attr4"]) 


for col_ in ["attr1","attr2","attr4"]: 

    stringIndexer = StringIndexer(inputCol=col_, outputCol=col_+"_") 
    model = stringIndexer.fit(df) 
    df = model.transform(df) 
    encoder = OneHotEncoder(inputCol=col_+"_", outputCol="features_"+col_, dropLast = False) 
    df = encoder.transform(df) 


df = df.drop("attr1","attr2","attr4","attr1_","attr2_","attr4_") 
df.show() 


+-----+--------------+--------------+--------------+ 
|attr3|features_attr1|features_attr2|features_attr4| 
+-----+--------------+--------------+--------------+ 
| 21| (4,[2],[1.0])| (2,[0],[1.0])| (3,[1],[1.0])| 
| 21| (4,[0],[1.0])| (2,[0],[1.0])| (3,[0],[1.0])| 
| 21| (4,[3],[1.0])| (2,[1],[1.0])| (3,[2],[1.0])| 
| 24| (4,[1],[1.0])| (2,[0],[1.0])| (3,[0],[1.0])| 
+-----+--------------+--------------+--------------+ 

이 ID를 추가하고 모든 기능을 조립 onehotencoding됩니다 는

from pyspark.sql.functions import monotonically_increasing_id 

df = df.withColumn("id", monotonically_increasing_id()) 
df.show() 


assembler = VectorAssembler(inputCols = ["features_attr1", "features_attr2", "features_attr4", "attr3"] 
          , outputCol = "features") 
df_ = assembler.transform(df) 
df_ = df_.select("id", "features") 
df_.show() 


+----------+--------------------+ 
|  id|   features| 
+----------+--------------------+ 
|   0|(10,[2,4,7,9],[1....| 
|   1|(10,[0,4,6,9],[1....| 
|8589934592|(10,[3,5,8,9],[1....| 
|8589934593|(10,[1,4,6,9],[1....| 
+----------+--------------------+ 

이 minHashLSH 모델 및 검색을 만들기 벡터 가까운 이웃의 경우

mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345) 
model = mh.fit(df_) 
model.transform(df_) 
key = df_.select("features").collect()[0]["features"] 
model.approxNearestNeighbors(df_, key, 1).collect() 

출력

[Row(id=0, features=SparseVector(10, {2: 1.0, 4: 1.0, 7: 1.0, 9: 21.0}), hashes=[DenseVector([-1272095496.0])], distCol=0.0)] 
관련 문제