2017-02-17 1 views
1

간단한 예제를 통해 내가하려는 일을 설명하겠습니다. 우리가 두 개의 매우 간단합니다 아래 dataframes 있다고 가정 해 봅시다 : DF1에서pyspark 데이터 프레임의 복수 열의 데카르트 곱을 기반으로 새 열을 만드는 방법

Df1 
+---+---+---+ 
| a1| a2| a3| 
+---+---+---+ 
| 2| 3| 7| 
| 1| 9| 6| 
+---+---+---+ 

Df2 
+---+---+ 
| b1| b2| 
+---+---+ 
| 10| 2| 
| 9| 3| 
+---+---+ 

을, DF2, 우리는 DF1, DF2에서 원래 열 직교 제품입니다 열 df 명령을 새로 만들어야합니다. 특히 새 df는 'a1b1', 'a1b2', 'a2b1', 'a2b2', 'a3b1', 'a3b2'를 가지며 행은 df1, df2의 해당 열을 곱합니다. 결과 DF는 다음과 같이한다 :

Df3 
+----+----+----+----+----+----+ 
|a1b1|a1b2|a2b1|a2b2|a3b1|a3b2| 
+----+----+----+----+----+----+ 
| 20| 4| 30| 6| 70| 14| 
| 9| 3| 81| 27| 54| 18| 
+----+----+----+----+----+----+ 

내가 검색 한 스파크 온라인 문서뿐만 아니라 질문은 여기에 게시하지만, 그들이 행이 아닌 열 직교 제품에 대한 모든 것 같다.

r = sc.parallelize([1, 2]) 
r.cartesian(r).toDF().show() 

+---+---+ 
| _1| _2| 
+---+---+ 
| 1| 1| 
| 1| 2| 
| 2| 1| 
| 2| 2| 
+---+---+ 

을하지만 내가 필요하지 않습니다 예를 들어, rdd.cartesian()는 다음 코드와 같은 행에있는 값의 다른 조합의 직교 제품을 제공합니다. 다시 말하지만, 행 대신 새로운 열을 작성해야합니다. 내 문제에서 행 수가 동일하게 유지됩니다. 나는 udf가 궁극적으로 문제를 해결할 수 있음을 이해합니다. 그러나 실제 응용 프로그램에서는 모든 열 (모든 가능한 열의 조합으로 약 500 개의 새로운 열)을 만드는 데 너무 오래 걸리는 방대한 데이터 집합이 있습니다. 우리는 효율성을 증가시킬 수있는 일종의 벡터 연산을 선호합니다. 나는 틀릴 수도 있지만 spud udf는 행 작업을 기반으로하는 것처럼 보입니다. 행 작업이 너무 오래 걸리는 이유 일 수 있습니다.

제안/피드백/의견을 보내 주셔서 감사합니다. 여러분의 편의를 위해

, 나는 위 dataframes 예제를 만들려면 여기 간단한 코드를 첨부 :

df1 = sqlContext.createDataFrame([[2,3,7],[1,9,6]],['a1','a2','a3']) 
df1.show() 

df2 = sqlContext.createDataFrame([[10,2],[9,3]],['b1','b2']) 
df2.show() 
+0

행을 어떻게 연결합니까? 질서는 일반적으로 의존할만한 것이 아닙니다. – zero323

+0

안녕하세요. Zero323, 메시지를 보내 주셔서 감사합니다. 행을 연결하는 기본 키가 있습니다. 여기서는 단순히 행이 정수 인덱스로 일치하고 모든 데이터 프레임의 행 수가 같다고 가정합니다. – spectrum

+0

OK, 프로 팁 : 명시적인 키를 사용하는 것이 좋습니다. 인덱스에 의존하지 않는다 :) 일반적으로'df1.join (df2, [ 'id']) .df2.columns에있는 y에 대해 df1.columns에서 x에 대해 ([df1 [x] * df2 [y] = 'id'및 y! = 'id'])'id '가 연결 열일 때. – zero323

답변

-1

아니고 간단 내가 아는 한. 당신이 spark.ml.feature에 Elementwise 제품이지만 더 덜 복잡 수 없습니다에 도움이 될 수있는 다른

# function to add rownumbers in a dataframe 
def addrownum(df): 
    dff = df.rdd.zipWithIndex().toDF(['features','rownum']) 
    odf = dff.map(lambda x : tuple(x.features)+tuple([x.rownum])).toDF(df.columns+['rownum']) 
    return odf 

df1_ = addrownum(df1) 
df2_ = addrownum(df2) 
# Join based on rownumbers 
outputdf = df1_.rownum.join(df2_,df1_.rownum==df2_.rownum).drop(df1_.rownum).drop(df2_.rownum) 

n1 = ['a1','a2','a3'] # columns in set1 
n2 = ['b1','b2']  # columns in set2 

# I create a string of expression that I want to execute 
eval_list = ['x.'+l1+'*'+'x.'+l2 for l1 in n1 for l2 in n2] 
eval_str = '('+','.join(eval_list)+')' 
col_list = [l1+l2 for l1 in n1 for l2 in n2] 

dfcartesian = outputdf.map(lambda x:eval(eval_str)).toDF(col_list) 

뭔가 : 여기에 평가를 사용의 기회입니다. 여러 요소를 하나의 목록에서 다른 목록으로 가져 와서 특성 벡터를 다시 데이터 프레임으로 확장합니다.

+0

안녕하세요. 답장을 보내 주셔서 감사합니다. 다시 말하지만, 사용중인 메소드는 거대한 데이터 세트에 대해 매우 느린 행 조작입니다. 또한 별도의 가중치 벡터를 사용하여 행의 배열 셀을 곱하기 때문에 mllib의 Elementwise 제품이 작동하지 않습니다. – spectrum

관련 문제