2016-07-21 2 views
0

pyspark를 처음 사용합니다. 나는 쌍 RDD (키, 값)가 있습니다. 각 키에 대해 n 개의 버킷 막대 그래프를 만들고 싶습니다. 결과는 다음과 같을 것이다 : I가 최대 값 또는 각각의 키의 합을 검색하는 예를 보이면서Pyspark : Pair RDD의 각 키에 대한 히스토그램 작성

[(key1, [...buckets...], [...counts...]), 
(key2, [...buckets...], [...counts...])] 

하지만 히스토그램 (n)의 함수를 전달하는 방법은 각 키의 적용이있을 것이다 가치?

답변

-1

시도 :

>>> import numpy as np 
>>> 
>>> rdd.groupByKey().map(lambda (x, y): np.histogram(list(y))) 
+0

이것은 나를 위해 작동하지 않았다. np.histogram은 groupByKey가 생성 한 'ResultIterable'을 허용하지 않습니다. – Petrichor

+0

도움이 될 것이라고 생각하는 이유를 설명하십시오. 코드 만의 대답은 종종별로 유용하지 않습니다. – Pureferret

0

나는이 게시물 오히려 오래된,하지만 여전히 PySpark 솔루션을 추구하는 사람들을 위해, 여기에 문제에 대한 내 두 센트를 알고있다.

(키, 값) 쌍 RDD를 고려해 보자. "히스토그램"으로 우리가 각 키에 대해 얼마나 많은 다른 값을 갖고 있는지, 그리고 각각의 카디널리티가 있는지에 대한 평범한 카운터를 보자.

aggregateByKey()은 좋은 방법입니다. aggregateByKey()에서 기본적으로 애그리 게이터 기본값, 파티션 내 집계 함수, 파티션 간 집계 함수라는 세 가지 입력 값을 선언합니다.

것은 우리가 양식 본인이 아는

[(124, 2), 
(124, 2), 
(124, 2), 
(125, 2), 
(125, 2), 
(125, 2), 
(126, 2), 
(126, 2), 
(126, 2), 
(127, 2), 
(127, 2), 
(127, 2), 
(128, 2), 
(128, 2), 
(128, 2), 
(129, 2), 
(129, 2), 
(129, 2), 
(130, 2), 
(130, 2), 
(130, 2), 
(131, 2), 
(131, 2), 
(131, 2), 
(132, 2), 
(132, 2), 
(132, 2), 
(133, 2), 
(133, 2), 
(133, 2), 
(134, 2), 
(134, 2), 
(134, 2), 
(135, 2), 
(135, 2), 
(135, 2), 
(136, 2), 
(136, 1), 
(136, 2), 
(137, 2), 
(137, 2), 
(137, 2), 
(138, 2), 
(138, 2), 
(138, 2), 
(139, 2), 
(139, 2), 
(139, 2), 
(140, 2), 
(140, 2), 
(140, 2), 
(141, 2), 
(141, 1), 
(141, 1), 
(142, 2), 
(142, 2), 
(142, 2), 
(143, 2), 
(143, 2), 
(143, 2), 
(144, 1), 
(144, 1), 
(144, 2), 
(145, 1), 
(145, 1), 
(145, 1), 
(146, 2), 
(146, 2), 
(146, 2), 
(147, 2), 
(147, 2), 
(147, 2), 
(148, 2), 
(148, 2), 
(148, 2), 
(149, 2), 
(149, 2), 
(149, 2), 
(150, 2), 
(150, 2), 
(150, 2), 
(151, 2), 
(151, 2), 
(151, 2), 
(152, 2), 
(152, 2), 
(152, 2), 
(153, 2), 
(153, 1), 
(153, 2), 
(154, 2), 
(154, 2), 
(154, 2), 
(155, 2), 
(155, 1), 
(155, 2), 
(156, 2), 
(156, 2), 
(156, 2), 
(157, 1), 
(157, 2), 
(157, 2), 
(158, 2), 
(158, 2), 
(158, 2), 
(159, 2), 
(159, 2), 
(159, 2), 
(160, 2), 
(160, 2), 
(160, 2), 
(161, 2), 
(161, 1), 
(161, 2), 
(162, 2), 
(162, 2), 
(162, 2), 
(163, 2), 
(163, 1), 
(163, 2), 
(164, 2), 
(164, 2), 
(164, 2), 
(165, 2), 
(165, 2), 
(165, 2), 
(166, 2), 
(166, 1), 
(166, 2), 
(167, 2), 
(167, 2), 
(167, 2), 
(168, 2), 
(168, 1), 
(168, 1), 
(169, 2), 
(169, 2), 
(169, 2), 
(170, 2), 
(170, 2), 
(170, 2), 
(171, 2), 
(171, 2), 
(171, 2), 
(172, 2), 
(172, 2), 
(172, 2), 
(173, 2), 
(173, 2), 
(173, 1), 
(174, 2), 
(174, 1), 
(174, 1), 
(175, 1), 
(175, 1), 
(175, 1), 
(176, 1), 
(176, 1), 
(176, 1), 
(177, 2), 
(177, 2), 
(177, 2)] 

에 대한 RDD을 가지고 생각해 보자, 할 수있는 가장 쉬운 방법은 사전 키입니다 파이썬 사전에 따라 각 키의 집계 값이다 RDD 값 및 각 사전 키와 관련된 값은 각 RDD 값에 대해 얼마나 많은 RDD 값이 있는지에 대한 카운터입니다. aggregateByKey() 기능이 자동으로 RDD 키를 처리하므로 RDD 키를 고려할 필요가 없습니다.

집계 호출의 형식은 우리가 빈 사전으로 모든 축전지를 초기화

myRDD.aggregateByKey(dict(), withinPartition, betweenPartition) 

있습니다.

집계 기능, 즉, 다음과 같은 형태 dictionary은 당 RDD 값 카운터가

def withinPartition(dictionary, record): 
    if record in dictionary.keys(): 
     dictionary[record] += 1 
    else: 
     dictionary[record] = 1 
    return dictionary 

, record 반면 소정 RDD 값 (정수이고, 본 실시 예에서 참조한다 있습니다 내에 파티션

위의 RDD 예제). 기본적으로 주어진 RDD 값이 이미 사전에 존재하면 +1 카운터가 증가합니다. 그렇지 않으면 카운터를 초기화합니다.

사이에 파티션 함수는 주어진 RDD 키를, 우리가 두 개의 사전을 가지고 생각해 보자, 같은

def betweenPartition(dictionary1, dictionary2): 
    return {k: dictionary1.get(k, 0) + dictionary2.get(k, 0) for k in set(dictionary1) | set(dictionary2)} 

기본적으로 거의 작동합니다. 주어진 두 키의 값을 합산하거나 두 개의 사전 중 하나에 존재하지 않으면 주어진 키를 추가하여 (logic OR) 두 사전을 고유 한 사전에 병합합니다. 사전 병합을위한 georg's solution in this post의 크레딧

결과 RDD는

[(162, {2: 3}), 
(132, {2: 3}), 
(168, {1: 2, 2: 1}), 
(138, {2: 3}), 
(174, {1: 2, 2: 1}), 
(144, {1: 2, 2: 1}), 
(150, {2: 3}), 
(156, {2: 3}), 
(126, {2: 3}), 
(163, {1: 1, 2: 2}), 
(133, {2: 3}), 
(169, {2: 3}), 
(139, {2: 3}), 
(175, {1: 3}), 
(145, {1: 3}), 
(151, {2: 3}), 
(157, {1: 1, 2: 2}), 
(127, {2: 3}), 
(128, {2: 3}), 
(164, {2: 3}), 
(134, {2: 3}), 
(170, {2: 3}), 
(140, {2: 3}), 
(176, {1: 3}), 
(146, {2: 3}), 
(152, {2: 3}), 
(158, {2: 3}), 
(129, {2: 3}), 
(165, {2: 3}), 
(135, {2: 3}), 
(171, {2: 3}), 
(141, {1: 2, 2: 1}), 
(177, {2: 3}), 
(147, {2: 3}), 
(153, {1: 1, 2: 2}), 
(159, {2: 3}), 
(160, {2: 3}), 
(130, {2: 3}), 
(166, {1: 1, 2: 2}), 
(136, {1: 1, 2: 2}), 
(172, {2: 3}), 
(142, {2: 3}), 
(148, {2: 3}), 
(154, {2: 3}), 
(124, {2: 3}), 
(161, {1: 1, 2: 2}), 
(131, {2: 3}), 
(167, {2: 3}), 
(137, {2: 3}), 
(173, {1: 1, 2: 2}), 
(143, {2: 3}), 
(149, {2: 3}), 
(155, {1: 1, 2: 2}), 
(125, {2: 3})] 

원래 RDD 키는 여전히이 새로운 RDD에서 찾을 수 있습니다 양식을해야합니다. 각각의 새 RDD 값은 사전입니다.차례로 각 사전 키는 가능한 RDD 값 중 하나에 해당하는 반면 각 사전 값은 주어진 RDD 값이 각 RDD 키에 대해 몇 번 존재하는지의 계수기입니다.

관련 문제