2013-10-24 5 views
10

파이썬의 multiprocessing.Manager을 사용하여 하나의 프로세스가 생성하고 다른 프로세스가 볼 수있는 데이터 세트에 대한 액세스를 공유합니다. 그러나 manager.dict()에 의해 반환 된 dict 프록시가 iteritems()을 지원하지 않는 문제에 직면하고 있습니다.파이썬에서 dict 프록시를 반복하는 방법은 무엇입니까?

나는 items()을 반복 할 수 있지만, 이는 큰 수인 dict의 모든 항목의 새로운 튜플을 만드는 것을 의미합니다. 중간 목록/튜플을 생성하지 않고이를 수행하는 방법이 있습니까? 따라서 일정량의 추가 메모리 만 사용하면됩니까?

참고 : 솔루션에서 반복을 위해 생성 프로세스가 일시 중지되어야하는 것이면 문제가되지 않습니다.

+0

당신은'SyncManager'를 사용하고'iteritems'가 노출 된 자신의 프록시를 등록했다고 생각 했습니까? – oleg

+1

@oleg iterators가 리턴하는 dict iterator는 pickleable이 아니기 때문에 단순히 iteritem을 노출 할 수는 없습니다. 그게 기본 dict 프록시가 그것을 노출하고 따라서 질문을 노출하지 않는 이유입니다. – otus

+0

나는 "간단하게"폭로한다고 말하지 않았다. :)'iteritems'을 노출하기 위해'IteratorProxy'를 사용할 수 있습니까? – oleg

답변

2

keys()을 반복하여 메모리 사용 공간을 줄일 수 있습니다. 키가 삭제되지 않도록주의해야합니다.

그렇지 않은 경우 다음은 두 가지 방법으로 예제를 작성하는 예제입니다.이 두 가지 방법을 통해 사전에있는 항목을 반복 할 수 있습니다. 이 예의 iteritems() 메서드는 관리자 개체를 만드는 프로세스와 관리자 개체가 만드는 자식 프로세스에서만 작동합니다. 이는 관리자 객체가 새 프록시를 만드는 데 필요하고 다른 프로세스는 프록시에 액세스 할 수 없기 때문입니다. iteritems2() 메서드는 해당 프로세스에서 새 프록시를 만드는 데 의존하지 않으므로 다른 프로세스에서 작동합니다.

import multiprocessing as mp 
import multiprocessing.managers 

class mydict(dict): 
    def __init__(self, *args, **kwargs): 
     dict.__init__(self, *args, **kwargs) 
     self.iters = {} 

    def iteritems(self): 
     print "iteritems", mp.current_process() 
     return dict.iteritems(self) 

    def _iteritems_start(self): 
     print "_iteritems_start", mp.current_process() 
     i = dict.iteritems(self) 
     self.iters[id(i)] = i 
     return id(i) 

    def _iteritems_next(self, iter_id): 
     try: 
      return self.iters[iter_id].next() 
     except StopIteration: 
      del self.iters[iter_id] 
      return None 

class mydict_proxy(mp.managers.DictProxy): 
    def iteritems(self): 
     print "iteritems proxy", mp.current_process() 
     return self._callmethod("iteritems") 

    def iteritems2(self): 
     print "iteritems2 proxy", mp.current_process() 
     iter_id = self._callmethod("_iteritems_start") 
     def generator(): 
      while True: 
       a = self._callmethod("_iteritems_next", 
          (iter_id,)) 
       if a == None: 
        return 
       yield a 
     return generator() 

    _method_to_typeid_ = { "iteritems": "Iterator" } 
    _exposed_ = mp.managers.DictProxy._exposed_ 
    _exposed_ += ("iteritems", "_iteritems_start", "_iteritems_next") 

class mymanager(mp.managers.BaseManager): 
    pass 
mymanager.register("mydict", mydict, mydict_proxy) 
mymanager.register("Iterator", proxytype = mp.managers.IteratorProxy, 
      create_method = False) 

def other(d): 
    for k, v in d.iteritems2(): 
     d[k] = v.lower() 
    for k, v in d.iteritems(): 
     d[k] = ord(v) 

def main(): 
    manager = mymanager() 
    manager.start() 
    d = manager.mydict(list(enumerate("ABCDEFGHIJKLMNOP"))) 
    for (k, v) in d.iteritems(): 
     print k, v 
    proc = mp.Process(target = other, args = (d,)) 
    proc.start() 
    proc.join() 
    for (k, v) in d.iteritems(): 
     print k, v 

if __name__ == "__main__": 
    main() 

이 코드는 메모리 효율이 높지만 아마도 느린 속도 일 수 있습니다.

-2

iteritems()이고, 목록은입니다. for 루프를 사용할 수 있습니다. 또는 sorted()이라고 말하면 정렬 된 목록의 키를 반환 한 다음 해당 목록을 반복하고 dict[key]을 수행 할 수 있습니다. 희망이 도움이됩니다. 더 좋은 방법이 있다면. 나와 공유하십시오. 나는 알고 싶어 죽겠다.

0

SyncManager 클래스를 사용하여 자신의 유형을 등록 할 수 있습니다. 그런 다음 해당 유형의 메소드를 구현할 수 있습니다. dict에서 제한된 수의 항목 만 가져 오는 것입니다.

따라서
import multiprocessing 
from multiprocessing import managers 


class TakerDict(dict): 
    """Like a dict, but allows taking a limited number of items.""" 

    def take(self, items=1): 
     """Take the first `items` items.""" 
     return [item for _, item in zip(range(items), self.items())] 


# NOTE: add other dict methods to the tuple if you need them. 
TakerProxy = managers.MakeProxyType('TakerProxy', ('take',)) 

managers.SyncManager.register('taker', TakerDict, TakerProxy) 


if __name__ == '__main__': 
    manager = multiprocessing.Manager() 
    taker = manager.taker() 
    # in other processes, use e.g. taker.take(5) 

, 메모리 사용을 제한하기 위해, 당신은 요소의 다음 배치를 얻기 위해 반복적으로 관리자 프로세스를 호출 할 것이다 :

다음은 시작하는 예입니다.

그러나이를 수행하려면 사용자의 사전이 색인 생성을 지원해야합니다 (특정 오프셋에서 재개 할 수 있도록). 사전에 요소의 기본 순서에 액세스 할 수 없기 때문에 목록 대신 (예 : manager.list()) 목록을 사용하는 것이 좋습니다. 그런 다음 하위 프로세스에서 목록의 len()을 요청하고 슬라이스별로 색인을 만들어 적절한 크기의 일괄 처리를 가져옵니다. 프록시 유형을 등록 할 필요가 없습니다.

+2

기본적으로 질문에 언급 된 "목록으로 변환"해결 방법을 구현하지 않습니까? 다소 복잡합니다.이것은 (리스트를 필요로하는 메모리 사용의) 문제를 전혀 해결하지 못합니다. – otus

+0

글쎄, 이것은 결국 데이터를 목록으로 변환하므로 메모리 오버 헤드가 발생합니다. 단지 청크에서 그렇게하므로 오버 헤드가 많지 않습니다. 필자는 * IteratorProxy 접근법보다 성능이 좋지 않다고 생각하지만, 아무것도 측정하지 않았습니다. –

+0

실제로는 청크를 수행하지 않는다는 점을 제외하면 : * "이렇게하려면 인덱싱을 지원해야합니다. 따라서 특정 오프셋에서 다시 시작할 수 있습니다."* – otus

관련 문제