지역 민감성 해시 (minhashLSH)를 사용하여 가장 가까운 이웃을 찾아 동일 여부를 확인할 수 있습니다.
데이터에 문자열이 있으므로 LSH를 적용하기 전에 데이터를 처리해야합니다. 우리는 pyspark ml의의 기능 모듈 stringIndexing와
시작을 사용하고
df= spark.createDataFrame([('C','B',21,'H'),('D','B',21,'J'),('E','c',21,'K'),('A','B',24,'J')], ["attr1","attr2","attr3","attr4"])
for col_ in ["attr1","attr2","attr4"]:
stringIndexer = StringIndexer(inputCol=col_, outputCol=col_+"_")
model = stringIndexer.fit(df)
df = model.transform(df)
encoder = OneHotEncoder(inputCol=col_+"_", outputCol="features_"+col_, dropLast = False)
df = encoder.transform(df)
df = df.drop("attr1","attr2","attr4","attr1_","attr2_","attr4_")
df.show()
+-----+--------------+--------------+--------------+
|attr3|features_attr1|features_attr2|features_attr4|
+-----+--------------+--------------+--------------+
| 21| (4,[2],[1.0])| (2,[0],[1.0])| (3,[1],[1.0])|
| 21| (4,[0],[1.0])| (2,[0],[1.0])| (3,[0],[1.0])|
| 21| (4,[3],[1.0])| (2,[1],[1.0])| (3,[2],[1.0])|
| 24| (4,[1],[1.0])| (2,[0],[1.0])| (3,[0],[1.0])|
+-----+--------------+--------------+--------------+
이 ID를 추가하고 모든 기능을 조립 onehotencoding됩니다 는
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("id", monotonically_increasing_id())
df.show()
assembler = VectorAssembler(inputCols = ["features_attr1", "features_attr2", "features_attr4", "attr3"]
, outputCol = "features")
df_ = assembler.transform(df)
df_ = df_.select("id", "features")
df_.show()
+----------+--------------------+
| id| features|
+----------+--------------------+
| 0|(10,[2,4,7,9],[1....|
| 1|(10,[0,4,6,9],[1....|
|8589934592|(10,[3,5,8,9],[1....|
|8589934593|(10,[1,4,6,9],[1....|
+----------+--------------------+
이 minHashLSH 모델 및 검색을 만들기 벡터 가까운 이웃의 경우
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model = mh.fit(df_)
model.transform(df_)
key = df_.select("features").collect()[0]["features"]
model.approxNearestNeighbors(df_, key, 1).collect()
출력
[Row(id=0, features=SparseVector(10, {2: 1.0, 4: 1.0, 7: 1.0, 9: 21.0}), hashes=[DenseVector([-1272095496.0])], distCol=0.0)]
조금 더 정보여기에 도움이 될 것입니다. 특히 : 얼마나 많은 시간이 걸리고 있습니까? 얼마나 오래 걸릴까요? 얼마나 많은 클러스터/하드웨어를 실행하고 있습니까? 일반적으로 클러스터의 크기에 관계없이 스파크 작업을 수행 할 때 약간의 오버 헤드가 발생합니다. 스파크는 데이터와 코드를 클러스터에 배포 한 다음 결과를 수집해야하기 때문입니다. pyspark에서 이와 같은 간단한 작업을 수행하는 것은 로컬 시스템의 Python에서 이와 비슷한 간단한 작업을 수행하는 것처럼 빠르지는 않습니다. –