2017-11-22 4 views
2

여기서부터 this question으로 링크 될 새 주제가 시작됩니다.매우 긴 I/O 프로세스가 스레드로 처리 될 수 있습니까?

글로벌 아이디어를 얻으려면 배경을 읽는 것이 좋습니다.

그래서 파이썬 3.2 API (민간 기업에서 개발)에 의존하는 다운로드 기능이 있습니다. 이 프로세스는 파일 당 최대 400 초까지 걸릴 수 있습니다.

물론 다운로드 할 파일이 하나뿐이므로 모든 다운로드 프로세스를 스레드 풀에 넣으려고 몇 일 동안 노력했습니다. 풀의 각 스레드는 GUI 주 스레드에서 완전히 자율적이어야합니다. 그 중 하나가 끝나면 GUI에 신호를 보냅니다.

나는 몇 가지 테스트를했지만이 기술을 사용 어떤하지만

  1. GUI는 동결되고;
  2. 결과는 모든 스레드의 처리가 끝날 때만 제공되며 원하는 경우가 아니라 하나씩 제공됩니다.

API에서 제공하는 다운로드 방법은 스레드 할 수없는 차단 기능이라고 생각합니다.

제 질문은 간단합니다. 어떻게하면 스레드를 통해 I/O 메소드를 처리 할 수 ​​있는지 알 수 있습니다.


11월 24,2017 업데이트 당신은 부분적으로 내 기대를 충족 (탠덤 multiprocessing.pool/map_async 포함) 초안를 확인할 수

. 보시다시피, 불행히도 QPlainTextEdit에서 어떤 일이 벌어 졌는지에 대한 정보를 얻으려면 "Busy Waiting Loop"를 삽입해야했습니다.

작업 결과는 전역 처리 (동작 map_async) 끝에서만 제공됩니다. 그게 내가 원하는 건 아니야. 조금 더 실시간을 삽입하고 완료된 각 작업에 대한 메시지를 콘솔에서 즉시 확인하고 싶습니다. 언뜻

import time 
import multiprocessing 
import private.library as bathy 
from PyQt4 import QtCore, QtGui 
import os 
import sys 

user = 'user' 
password = 'password' 
server = 'server' 
basename = 'basename' 

workers = multiprocessing.cpu_count() 

node = bathy.NodeManager(user, password, server) 
database = node.get_database(basename) 

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954, 
     961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631, 
     4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300, 
     1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324, 
     1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329, 
     9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025, 
     5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045, 
     5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069, 
     5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083, 
     5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093) 


# --------------------------------------------------------------------------------- 
def download(surface_id, index): 
    global node 
    global database 

    t = time.time() 
    message = 'Surface #%d - Process started\n' % index 

    surface = database.get_surface(surface_id) 
    metadata = surface.get_metadata() 
    file_path = os.path.join("C:\\Users\\philippe\\Test_Download", 
          metadata["OBJNAM"] + ".surf") 

    try: 
     surface.download_bathymetry(file_path) 
    except RuntimeError as error: 
     message += "Error : " + str(error).split('\n')[0] + '\n' 
    finally: 
     message += ('Process ended : %.2f s\n' % (time.time() - t)) 

    return message 


# --------------------------------------------------------------------------------- 
def pass_args(args): 
    # Method to pass multiple arguments to download (multiprocessing.Pool) 
    return download(*args) 


# --------------------------------------------------------------------------------- 
class Console(QtGui.QDialog): 
    def __init__(self): 
     super(self.__class__, self).__init__() 

     self.resize(600, 300) 
     self.setMinimumSize(QtCore.QSize(600, 300)) 
     self.setWindowTitle("Console") 
     self.setModal(True) 

     self.verticalLayout = QtGui.QVBoxLayout(self) 

     # Text edit 
     # ------------------------------------------------------------------------- 

     self.text_edit = QtGui.QPlainTextEdit(self) 
     self.text_edit.setReadOnly(True) 
     self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document()) 
     self.verticalLayout.addWidget(self.text_edit) 

     # Ok/Close 
     # ------------------------------------------------------------------------- 
     self.button_box = QtGui.QDialogButtonBox(self) 
     self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close | 
              QtGui.QDialogButtonBox.Ok) 
     self.button_box.setObjectName("button_box") 
     self.verticalLayout.addWidget(self.button_box) 

     # Connect definition 
     # ------------------------------------------------------------------------- 

     self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close), 
        QtCore.SIGNAL('clicked()'), 
        self.button_cancel_clicked) 
     self.connect(self.button_box.button(QtGui.QDialogButtonBox.Ok), 
        QtCore.SIGNAL('clicked()'), 
        self.button_ok_clicked) 

     # Post initialization 
     # ------------------------------------------------------------------------- 
     self.pool = multiprocessing.Pool(processes=workers) 

    # Connect functions 
    # ----------------------------------------------------------------------------- 
    def button_cancel_clicked(self): 
     self.close() 

    def button_ok_clicked(self): 
     jobs_args = [(surface_id, index) for index, surface_id in enumerate(ids)] 
     async = pool.map_async(pass_args, jobs_args) 
     pool.close() 

     # Busy waiting loop 
     while True: 
      # pool.map_async has a _number_left attribute, and a ready() method 
      if async.ready(): 
       self.write_stream("All tasks completed\n") 
       pool.join() 
       for line in async.get(): 
        self.write_stream(line) 
       break 

      remaining = async._number_left 
      self.write_stream("Waiting for %d task(s) to complete...\n" % remaining) 
      time.sleep(0.5) 


    # Other functions 
    # ----------------------------------------------------------------------------- 
    def write_stream(self, text): 
     self.text_edit.insertPlainText(text) 
     cursor = self.text_edit.textCursor() 
     self.text_edit.setTextCursor(cursor) 
     app.processEvents() 


# --------------------------------------------------------------------------------- 
if __name__ == '__main__': 
    app = QtGui.QApplication(sys.argv) 
    window = Console() 
    window.show() 
    app.exec_() 

질문

  1. 당신을위한 위의 코드는 개념 오류를 제시 하는가?
  2. 이 특별한 경우에 apply_async 메소드를 사용하여 대화식을 더 많이 사용해야합니까?
  3. 콜백 함수를 사용하여 콘솔을 업데이트하기 위해 맞춤 이벤트를 게시하는 방법 (@ekhumoro가 제안한 방법론)을 안내해 주시겠습니까?이 식으로 작업을 수행하여, 불행하게도

    def write_stream(self, text): 
        # This is called whenever pool.apply_async(i) returns a result 
        self.text_edit.insertPlainText(text) 
        cursor = self.text_edit.textCursor() 
        self.text_edit.setTextCursor(cursor) 
        # Update the text edit 
        app.processEvents() 
    

    :

    def button_ok_clicked(self): 
        # Pool.apply_async - the call returns immediately instead of 
        # waiting for the result 
        for index, surface_id in enumerate(ids): 
         async = pool.apply_async(download, 
               args=(surface_id, index), 
               callback=self.write_stream) 
        pool.close() 
    

    콜백과 :


11월 25,2017 업데이트

나는 apply_async와 시도했다 응용 프로그램이 충돌합니다. 모든 작업이 동시에 텍스트 편집에 쓰지 못하도록 잠금 장치를 두어야한다고 생각합니다.

+0

사용 [멀티 프로세싱 (https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing) 대신 멀티 스레딩. – ekhumoro

+0

@ekhumoro 100 개의 객체를 다운로드하기 위해 tandem multiprocessing.pool/map_async를 사용해 보았습니다. 성공했습니다. 그러나 슬롯/신호 메커니즘을 잃어 버리고 있기 때문에 QPlainTextEdit에서 진행중인 작업에 대한 정보를 얻기 위해 "바쁜 대기 루프"를 추가해야했습니다. 이 스크립트에 좀 더 실시간을 도입하는 우아한 방법이 있습니까 (즉, 작업자가 작업을 마칠 때마다 콘솔에 최종 메시지를 보냅니다). –

+0

콜백 함수를 사용하여 [post] (https://doc.qt.io/qt-5/qcoreapplication.html#postEvent) a [custom event] (https://doc.qt.io/qt- 5/qevent.html # registerEventType). – ekhumoro

답변

1

다음은 콜백을 사용하여 사용자 지정 이벤트를 게시하는 방법을 보여주는 예제 스크립트의 단순화 된 버전입니다. 각 작업은 apply_async을 통해 개별적으로 처리되므로 모든 작업이 완료된 시점을 나타내는 간단한 카운터가 업데이트됩니다.

import sys, time, random, multiprocessing 
from PyQt4 import QtCore, QtGui 

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954, 
     961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631, 
     4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300, 
     1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324, 
     1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329, 
     9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025, 
     5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045, 
     5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069, 
     5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083, 
     5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093) 

def download(surface_id, index): 
    t = time.time() 
    message = 'Surface #%s (%s) - Process started\n' % (index, surface_id) 
    time.sleep(random.random()) 
    message += 'Process ended : %.2f s\n' % (time.time() - t) 
    return message 

def pass_args(args): 
    return download(*args) 

class CustomEvent(QtCore.QEvent): 
    DownloadComplete = QtCore.QEvent.registerEventType() 

    def __init__(self, typeid, *args): 
     super().__init__(typeid) 
     self.data = args 

class Console(QtGui.QDialog): 
    def __init__(self): 
     super().__init__() 
     self.resize(600, 300) 
     self.setMinimumSize(QtCore.QSize(600, 300)) 
     self.setWindowTitle("Console") 
     self.verticalLayout = QtGui.QVBoxLayout(self) 
     self.text_edit = QtGui.QPlainTextEdit(self) 
     self.text_edit.setReadOnly(True) 
     self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document()) 
     self.verticalLayout.addWidget(self.text_edit) 
     self.button_box = QtGui.QDialogButtonBox(self) 
     self.button_box.setStandardButtons(
      QtGui.QDialogButtonBox.Close | QtGui.QDialogButtonBox.Ok) 
     self.button_box.setObjectName("button_box") 
     self.verticalLayout.addWidget(self.button_box) 
     self.button_box.button(QtGui.QDialogButtonBox.Close 
      ).clicked.connect(self.button_cancel_clicked) 
     self.button_box.button(QtGui.QDialogButtonBox.Ok 
      ).clicked.connect(self.button_ok_clicked) 
     self.pool = multiprocessing.Pool(None) 

    def event(self, event): 
     if event.type() == CustomEvent.DownloadComplete: 
      message, complete = event.data 
      self.write_stream(message) 
      if complete: 
       self.write_stream('Downloads complete!') 
     return super().event(event) 

    def button_cancel_clicked(self): 
     self.close() 

    def button_ok_clicked(self): 
     total = len(ids) 
     def callback(message): 
      nonlocal total 
      total -= 1 
      QtGui.qApp.postEvent(self, CustomEvent(
       CustomEvent.DownloadComplete, message, not total)) 
     for index, surface_id in enumerate(ids): 
      self.pool.apply_async(
       pass_args, [(surface_id, index)], callback=callback) 

    def write_stream(self, text): 
     self.text_edit.insertPlainText(text) 
     cursor = self.text_edit.textCursor() 
     self.text_edit.setTextCursor(cursor) 

if __name__ == '__main__': 

    app = QtGui.QApplication(sys.argv) 
    window = Console() 
    window.show() 
    app.exec_() 
+0

카운터를 최신 상태로 유지하는 가장 좋은 장소는 이벤트 기능 (self. task_count + = 1과 같은 것)에있는 다음이 카운터가 len (ids) 일 때 새 이벤트를 게시하는 것입니다. –

+0

이 문제에 대해 시간을내어 감사드립니다. –

관련 문제