2011-03-01 5 views
2

Twisted Python을 사용하여 Redis 위에 키/값 데이터를 저장하는 서버를 구축하고 있습니다. 서버는 HTTP를 통해 JSON 사전을받습니다.이 사전은 파이썬 사전으로 변환되어 버퍼에 저장됩니다. 새로운 데이터가 저장 될 때마다 서버는 버퍼에서 한 사전을 팝하는 작업을 예약하고 txredis 클라이언트를 사용하여 모든 튜플을 Redis 인스턴스에 씁니다. Twisted Python의 또 다른 생산자/소비자 문제

class Datastore(Resource): 

isLeaf = True 

def __init__(self): 
    self.clientCreator = protocol.ClientCreator(reactor, Redis) 
    d = self.clientCreator.connectTCP(...) 
    d.addCallback(self.setRedis) 
    self.redis = None 
    self.buffer = deque() 


def render_POST(self, request): 
    try: 
     task_id = request.requestHeaders.getRawHeaders('x-task-id')[0] 
    except IndexError: 
     request.setResponseCode(503) 
     return '<html><body>Error reading task_id</body></html>' 

    data = json.loads(request.content.read()) 
    self.buffer.append((task_id, data)) 
    reactor.callLater(0, self.write_on_redis) 
    return ' ' 

@defer.inlineCallbacks 
def write_on_redis(self): 
    try: 
     task_id, dic = self.buffer.pop() 
     log.msg('Buffer: %s' % len(self.buffer)) 
    except IndexError: 
     log.msg('buffer empty') 
     defer.returnValue(1) 

    m = yield self.redis.sismember('DONE', task_id) 
    # Simple check 
    if m == '1': 
     log.msg('%s already stored' % task_id) 
    else: 
     log.msg('%s unpacking' % task_id) 
     s = yield self.redis.sadd('DONE', task_id) 

     d = defer.Deferred() 
     for k, v in dic.iteritems(): 
      k = k.encode() 
      d.addCallback(self.redis.push, k, v) 

     d.callback(None) 

는 기본적으로, 두 개의 서로 다른 연결 사이의 생산자/소비자 문제에 직면하고있다,하지만 난 현재 구현이 트위스트 paradygm에서 잘 작동하는지 모르겠습니다. Twisted에서 제작자/소비자 인터페이스에 대한 작은 설명서를 읽었지만 제 경우에는 사용할 수 있을지 잘 모르겠습니다. 비평가는 환영합니다. 나는 너무 오랜 세월의 스레드 동시성 (thread concurrency) 후에 이벤트 중심 프로그래밍에 대한 이해를 얻으려고합니다.

답변

2

Twisted의 생산자 및 소비자 API IProducer 및 은 흐름 제어와 관련이 있습니다. 여기에 어떤 흐름 제어도없는 것 같습니다. 한 프로토콜에서 다른 프로토콜로 메시지를 중계하는 것뿐입니다.

흐름 제어가 없기 때문에 버퍼는 단지 복잡합니다. 데이터를 직접 write_on_redis 메소드에 전달하면 제거 할 수 있습니다. 이렇게하면 write_on_redis은 빈 버퍼 케이스를 처리 할 필요가 없으며 리소스에 별도의 속성이 필요하지 않으며 버퍼를 유지하더라도이 작업을 수행 할 수 있지만 callLater을 제거 할 수도 있습니다.

그래도이 질문에 대한 답변이 있는지 알 수 없습니다. 지금까지이 방법이 "잘 작동"여부와 같은, 여기에 단지 코드를 읽어 내가 알 수있는 것들이다 :

  • 데이터 레디 스 그것을 받아들보다 더 빨리 도착하는 경우는, 뛰어난 작업 목록은 원인이 임의로 커질 수 있습니다 메모리가 부족합니다. 이것은 흐름 제어가 도움이 될 것입니다.
  • sismember 호출 또는 sadd 호출과 관련하여 오류를 처리하지 않으면 작업 버퍼에서 이미 호출 했으므로 실패 할 경우 작업이 손실 될 수 있습니다.
  • Deferredd의 콜백으로 밀어 넣기를 수행하면 실패한 푸시로 인해 나머지 데이터가 푸시되지 않습니다. 또한 push에 의해 반환 된 Deferred의 결과를 전달합니다 (Deferred을 반환한다고 가정 함) push이 첫 번째 인수를 무시하지 않으면 올바른 데이터를 푸시하지 않습니다. 다시 시작합니다. 하지self.buffer에 추가하고 클라이언트에 일부 오류 코드를 반환 - 당신이 흐름 제어를 구현하려면

는, 당신은 당신의 HTTP 서버가 self.buffer의 길이를 확인하고 가능하면 새 작업을 거부 있어야합니다. 당신은 여전히 ​​IConsumerIProducer을 사용하지 않을 것이지만 그것은 비슷합니다.