2014-07-11 6 views
1
)

나는 Spark에로드하는 HDFS에서 간단한 데이터 세트를 가지고 있습니다. 이것은 다음과 같습니다 :Spark : RDD 맵에서 반복자 람다 함수 사용하기 (

1 1 1 1 1 1 1 
1 1 1 1 1 1 1 
1 1 1 1 1 1 1 
... 

기본적으로 매트릭스입니다. 나는 그룹 행렬 행을 필요로 뭔가를 구현하기 위해 노력하고있어, 그래서 나는과 같이 모든 행에 대한 고유 키를 추가하기 위해 노력하고있어 : 전역 변수를 설정하고 쓰기 :

(1, [1 1 1 1 1 ... ]) 
(2, [1 1 1 1 1 ... ]) 
(3, [1 1 1 1 1 ... ]) 
... 

내가 다소 순진 뭔가를 시도 lambda 함수를 사용하여 전역 변수를 반복 할 수 있습니다.

# initialize global index 
global global_index 
global_index = 0 

# function to generate keys 
def generateKeys(x): 
    global_index+=1 
    return (global_index,x) 

# read in data and operate on it 
data = sc.textFile("/data.txt") 

...some preprocessing... 

data.map(generateKeys) 

그리고 전역 변수의 존재를 인식하지 못하는 것처럼 보입니다.

쉬운 방법이 있나요?

감사합니다, 잭

+0

신경 쓰지 마세요. 글로벌 변수 솔루션이 정상적으로 작동하는 것 같습니다. 그러나, 나는 이것을하는 더 우아한 방법을 보는 것에 흥미가있을 것이다. – Jack

+1

하나의 Spark worker 만 사용하지 않는 한 글로벌 변수 솔루션은 올바르지 않습니다. 왜냐하면 각 worker는 자신의 'global_index' 복사본을 가지고있을 것이고 중복 키 할당을 취소 할 것이기 때문입니다. –

+0

또한 https://stackoverflow.com/questions/23939153/how-to-assign-unique-contiguous-numbers-to-elements-in-a-spark-rdd에서 확인하십시오 –

답변

2
>>> lsts = [ 
...  [1, 1, 1, 1, 1, 1], 
...  [1, 1, 1, 1, 1, 1], 
...  [1, 1, 1, 1, 1, 1], 
...  [1, 1, 1, 1, 1, 1], 
...  [1, 1, 1, 1, 1, 1], 
...  [1, 1, 1, 1, 1, 1], 
...  [1, 1, 1, 1, 1, 2], 
...  [1, 1, 1, 2, 1, 2] 
...  ] 
... 
>>> list(enumerate(lsts)) 
[(0, [1, 1, 1, 1, 1, 1]), 
(1, [1, 1, 1, 1, 1, 1]), 
(2, [1, 1, 1, 1, 1, 1]), 
(3, [1, 1, 1, 1, 1, 1]), 
(4, [1, 1, 1, 1, 1, 1]), 
(5, [1, 1, 1, 1, 1, 1]), 
(6, [1, 1, 1, 1, 1, 2]), 
(7, [1, 1, 1, 2, 1, 2])] 

enumerate 값이 0 이외의 번호 매기기를 시작하려면 (index, original_item)

와 반복 가능한 수율 튜플의 각 항목에 대해 고유 인덱스를 생성, 시작 값을 전달 제 2 파라미터로서 enumerate으로 설정한다.

>>> list(enumerate(lsts, 1)) 
[(1, [1, 1, 1, 1, 1, 1]), 
(2, [1, 1, 1, 1, 1, 1]), 
(3, [1, 1, 1, 1, 1, 1]), 
(4, [1, 1, 1, 1, 1, 1]), 
(5, [1, 1, 1, 1, 1, 1]), 
(6, [1, 1, 1, 1, 1, 1]), 
(7, [1, 1, 1, 1, 1, 2]), 
(8, [1, 1, 1, 2, 1, 2])] 

참고 list이 반복자와 목록을 반환하지 않는 함수입니다 enumerate에서 실제 값을 얻기 위해 사용된다.

대안 : 전 세계적으로 사용 가능한 ID를 지정 기

enumerate를 사용하기 쉽습니다,하지만 당신은 코드의 diferrent 조각 ID를 assing 할 필요가 있다면, 그것은 어렵거나 불가능하게 될 것입니다. 이 경우 전 세계적으로 사용 가능한 발전기 ( OP의 기안자)가 될 수 있습니다.

itertools은 우리의 필요를 제공 할 수 count 제공 :

>>> from itertools import count 
>>> idgen = count() 

이제 우리는 (세계적으로 사용 가능) 고유 ID를 생성 할 idgen 발전기 준비.

우리는 기능 prid (프린트 ID)하여 테스트 할 수

>>> def prid(): 
...  id = idgen.next() 
...  print id 
... 
>>> prid() 
0 
>>> prid() 
1 
>>> prid() 
2 
>>> prid() 
3 

우리가 값 목록에서 테스트 할 수 있습니다 작동과 같이 실제 기능을

>>> lst = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109'] 

및 정의하는 경우 값으로 불리는 값은 튜플을 반환합니다. (id, value)

>>> def assignId(val): 
...  return (idgen.next(), val) 
... 

idgen을 전역으로 선언 할 필요가 없으므로 값을 변경하지 않습니다 (idgen은 호출시 내부 상태 만 변경하지만 동일한 생성기로 유지됩니다).

>>> assignId("ahahah") 
(4, 'ahahah') 

를하고 목록에 그것을 시도 :

테스트, 그것은 작동하는 경우

>>> map(assignId, lst) 
[(5, '100'), 
(6, '101'), 
(7, '102'), 
(8, '103'), 
(9, '104'), 
(10, '105'), 
(11, '106'), 
(12, '107'), 
(13, '108'), 
(14, '109')] 

주요 diferrence enumerate에 솔루션입니다, 우리는 어디 코드에 의해 식별자를 하나씩 할당 할 수 있습니다 모든 것을 처리하지 않고 모든 처리 enumerate.

>>> assignId("lonely line") 
(15, 'lonely line') 
+0

네,이게 효과가 있다고 생각합니다! 대단히 감사합니다 Jan. – Jack

+0

이렇게하려면'collect()'가 필요합니다 - RDD를 다시 드라이버에 연결해야합니까? 작은 데이터 세트의 경우이 작업은 가능하지만 여기에서 사용 된'enumerate()'는 병렬 처리가 가능하지 않기 때문에 이것이 확장 될 것이라고 생각하지 않습니다. –

+0

@NickChammas'enumerate'는 모든 결과를 수집 할 필요가 없습니다. 'itertools import count' 다음에 itr = enumerate (count())'를 시도하고 마지막으로'itr.next()'를 반복합니다. 'count()'iterable이 거의 무한한 수의 숫자를 제공 할 수있는 반면, 결과는 하나씩 반환합니다. 병렬화 관련 - 질문은'map'을 사용하고 있으므로 비슷한 시나리오가 예상됩니다. 여러분은 병렬 처리를 위해 일부 전역 ID 생성기가 필요할 것입니다. –

0

dataRdd.zipWithIndex을 시도하고 첫번째 인덱스를 갖는 것은 필수입니다 경우 결국 결과 튜플을 교환합니다.