2016-10-31 4 views
1

나는 인 - 애플 리케이션 사용자 행동을 추적하기 위해 코호트 연구를 시도하고 있는데, 나는 .join을 사용할 때 pyspark에서 조건을 어떻게 지정할 수 있는지에 대해 알고 싶다.) 감안할 때 :코호트 연구를위한 Pyspark와 내부 조인

rdd1 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8', 
    ((u'service1', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'2016-02-08', 
     u'2016-39', 
     u'2016-6', 
     u'2016-2', 
     '2016-10-19'), 
    (u'service2', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'1', 
     u'67.0', 
     u'2016-293', 
     u'2016-42', 
     u'2016-10', 
     '2016-10-19')))]) 


rdd2 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8', 
    ((u'serice1', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'2016-02-08', 
     u'2016-39', 
     u'2016-6', 
     u'2016-2', 
     '2016-10-20'), 
    (u'service2', 
     u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', 
     u'10', 
     u'3346.0', 
     u'2016-294', 
     u'2016-42', 
     u'2016-10', 
     '2016-10-20')))]) 

그 두 rdds이 ID로 '6df99638e4584a618f92a9cfdf318cf8'로, 사용자에 대한 정보를 나타내며, 누가 2016년 10월 19일 및 2016년 10월 20일에 서비스 1, 서비스 2에 기록했다. 내 목표는 두 개의 rdd에 참여하는 것인데 각각은 최소 20,000 개의 행을 포함합니다. 따라서 내부 조인이어야합니다. 진짜 목표는 2016-10-19 '에 이미 로그인 한 모든 사용자를 얻고 2016-10-20에 로그인 한 것입니다. 그래서 좀 더 구체적으로 말하자면, 최종 목표는 rxemple에 대해 내부 조인 후 rdd2의 내용을 결과로 가져 오는 것입니다.

예상 출력 :

[(u'6df99638e4584a618f92a9cfdf318cf8', 
((u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')) 
) ] 

간단한 rdd1.join (rdd2) 날 수득 가입 논리적 두 rdds 일치하는 모든 요소를 ​​포함하는 쌍 RDD. leftOuterJoin 또는 rightOuterJoin은 내부 조인 (rdd1과 rdd2에 이미있는 ID)을 원하기 때문에 내 토지에도 적합하지 않습니다.

예상 출력 : dict1 = { 'a '남자', 'b': 여자, 'c': '아기'} 그리고 dict2 = { 'a': 'Zara', 'x': 망고, 'y': 'Celio'}. 예상 출력은 다음과 같아야합니다 : output_dict = { 'a': 'Zara'}. 'a'(키)는 이미 dict 1에 존재하며, 원하는 것은 dict2의 핵심입니다.

그것은이 작업을 수행하려고 :

rdd1.map(lambda (k, v) : k).join(rdd2) 

이 코드는 나에게 빈 RDD을 제공합니다.

enter image description here

무엇을 할 수 ? 추신 : 나는 데이터 프레임이 아닌 rdds를 다루어야합니다! 그래서 나는 내 rdd를 DataFrames로 변환하고 싶지 않습니다 : D 도움을 주시면 감사하겠습니다. 고마워 !

+0

예상 출력은 무엇인가? – Yaron

+0

@Yaron [(u'6df99638e4584a618f92a9cfdf318cf8 ' ((u'serice1' u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A ' u'2016-02-08' u'2016-39 ' U를, '2016년 10월 20일') (u'service2 ' u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A' ' u'2016-2'u'2016-6 '10', u'3346.0 ' u'2016-294' u'2016-42 ' u'2016-10' '2016년 10월 20일'))) ] – DataAddicted

+0

@Yaron : rdd2의 내용. rdd1 (2016-10-19) 및 rdd2 (2016-10-20)에있는 사용자를 찾고 있습니다. – DataAddicted

답변

2

그래서, 당신은 단지 rdd2에서 키와 값을 것이다, rdd1 및 rdd2의 참여를 찾고 있습니다 :

rdd_output = rdd1.join(rdd2).map(lambda (k,(v1,v2)):(k,v2)) 

결과는 다음과 같습니다

print rdd_output.take(1) 

[(u'6df99638e4584a618f92a9cfdf318cf8', (
(u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20') 
))] 
+0

소리가 좋습니다! 고맙다 @ 야론! – DataAddicted