저는 Jira changelog 히스토리 데이터를 처리하고 있습니다. 많은 양의 데이터와 처리 시간의 대부분이 I/O 기반이라는 사실 때문에 비동기식 접근 방식이 잘 작동 할 수 있다고 생각했습니다.concurrent.futures.ThreadPoolExecutor에서 교착 상태가 발생하지 않고 잠금을 사용하는 방법?
나는 내가 jira-python
API를 통해 요청을하는 기능으로 공급하고있어 모든 issue_id
년대의 목록이하는 dict
에 정보를 추출, 다음 DictWriter
전달을 통해 그것을 써 있습니다. 스레드 세이프 (threadafe safe)로 만들기 위해 나는 threading
모듈에서 Lock()
을 가져 왔습니다. 테스트 중에는 특정 지점에서 교착 상태가되어 멈춘 것 같습니다. 문서에서 작업이 서로 의존적이라면 행할 수 있으며, 구현중인 잠금으로 인해 발생한다고 가정합니다. 어떻게 이런 일이 일어나지 않도록 할 수 있습니까?
def write_issue_history(
jira_instance: JIRA,
issue_id: str,
writer: DictWriter,
lock: Lock):
logging.debug('Now processing data for issue {}'.format(issue_id))
changelog = jira_instance.issue(issue_id, expand='changelog').changelog
for history in changelog.histories:
created = history.created
for item in history.items:
to_write = dict(issue_id=issue_id)
to_write['date'] = created
to_write['field'] = item.field
to_write['changed_from'] = item.fromString
to_write['changed_to'] = item.toString
clean_data(to_write)
add_etl_fields(to_write)
print(to_write)
with lock:
print('Lock obtained')
writer.writerow(to_write)
if __name__ == '__main__':
with open('outfile.txt', 'w') as outf:
writer = DictWriter(
f=outf,
fieldnames=fieldnames,
delimiter='|',
extrasaction='ignore'
)
writer_lock = Lock()
with ThreadPoolExecutor(max_workers=5) as exec:
for key in keys[:5]:
exec.submit(
write_issue_history,
j,
key,
writer,
writer_lock
)
EDIT (코드에서이 시점에서 모든 ISSUE_ID의와 keys
라는 목록이) : 여기