2016-07-14 5 views
1

저는 Jira changelog 히스토리 데이터를 처리하고 있습니다. 많은 양의 데이터와 처리 시간의 대부분이 I/O 기반이라는 사실 때문에 비동기식 접근 방식이 잘 작동 할 수 있다고 생각했습니다.concurrent.futures.ThreadPoolExecutor에서 교착 상태가 발생하지 않고 잠금을 사용하는 방법?

나는 내가 jira-python API를 통해 요청을하는 기능으로 공급하고있어 모든 issue_id 년대의 목록이하는 dict에 정보를 추출, 다음 DictWriter 전달을 통해 그것을 써 있습니다. 스레드 세이프 (threadafe safe)로 만들기 위해 나는 threading 모듈에서 Lock()을 가져 왔습니다. 테스트 중에는 특정 지점에서 교착 상태가되어 멈춘 것 같습니다. 문서에서 작업이 서로 의존적이라면 행할 수 있으며, 구현중인 잠금으로 인해 발생한다고 가정합니다. 어떻게 이런 일이 일어나지 않도록 할 수 있습니까?

def write_issue_history(
     jira_instance: JIRA, 
     issue_id: str, 
     writer: DictWriter, 
     lock: Lock): 
    logging.debug('Now processing data for issue {}'.format(issue_id)) 
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog 

    for history in changelog.histories: 
     created = history.created 
     for item in history.items: 
      to_write = dict(issue_id=issue_id) 
      to_write['date'] = created 
      to_write['field'] = item.field 
      to_write['changed_from'] = item.fromString 
      to_write['changed_to'] = item.toString 
      clean_data(to_write) 
      add_etl_fields(to_write) 
      print(to_write) 
      with lock: 
       print('Lock obtained') 
       writer.writerow(to_write) 

if __name__ == '__main__': 
    with open('outfile.txt', 'w') as outf: 
       writer = DictWriter(
        f=outf, 
        fieldnames=fieldnames, 
        delimiter='|', 
        extrasaction='ignore' 
       ) 
       writer_lock = Lock() 
       with ThreadPoolExecutor(max_workers=5) as exec: 
        for key in keys[:5]: 
         exec.submit(
          write_issue_history, 
          j, 
          key, 
          writer, 
          writer_lock 
         ) 

EDIT (코드에서이 시점에서 모든 ISSUE_ID의와 keys라는 목록이) : 여기

참조 내 코드입니다 또한 내가되고있어 매우 가능 Jira API에 의해 조절되었습니다.

답변

0

futs이라는 이름의 목록에 exec의 결과를 저장 한 다음 해당 목록을 반복하여 결과를 얻으려면 발생한 오류를 처리해야합니다.

(즉 더 기존의 나는 또한 executorexec를 기회 것 그리고 내장 된 재정의 방지)

from traceback import print_exc 

... 

with ThreadPoolExecutor(max_workers=5) as executor: 
    futs = [] 
    for key in keys[:5]: 
     futs.append(executor.submit(
      write_issue_history, 
      j, 
      key, 
      writer, 
      writer_lock) 
     ) 

for fut in futs: 
    try: 
     fut.result() 
    except Exception as e: 
     print_exc() 
관련 문제