정확히이 작업을 수행하는 방법을 검색하면서이 포스트를 발견했습니다. Ajax를 사용하여 서버에 요청을 프록시 처리하고 실행중인 스레드에 대한 모든 출력을 반환하는 대화 형 Python 콘솔을 만들고 싶습니다. 나는 그것을 알아 내고 끝내고 나의 해결책을 공유하고 싶었다.
local.LocalProxy
이라는 werkzeug
파이썬 라이브러리와 함께 제공되는 클래스가 있는데, 모듈 수준의 함수가 속성처럼 작동 할 수 있습니다. 예를 들어, 이렇게하면 sys.stdout
이 정상적으로 작동하지만 LocalProxy
클래스를 통해 프록시가 수행됩니다.
import threading
import sys
import cStringIO
import werkzeug
thread_proxies = {}
def redirect():
ident = threading.currentThread().ident
thread_proxies[ident] = cStringIO.StringIO()
return thread_proxies[ident]
def proxy():
ident = threading.currentThread().ident
return thread_proxies.get(ident, sys.stdout)
sys.stdout = werkzeug.local.LocalProxy(proxy)
그리고 어떤 스레드에서 내가 리디렉션하려는, I : 그것은 다른 스레드의 경우
import sys
import werkzeug
sys.stdout = werkzeug.local.LocalProxy(lambda: sys.stdout)
이에 확장, 나는 다음 StringIO
개체를 반환 위의 lambda
대신 함수를 썼다 바로 호출 할 수 있습니다
string_io = redirect()
그리고 대신에 지금 StringIO
객체에 기록됩니다 sys.stdout
에 갈 것 출력을 모두.
기다려라! 나는 sys.stdout
, sys.__stdout__
, sys.stderr
및 sys.__stderr__
을 캡처해야합니다, 그래서 난 내 코드베이스에 stdout_helpers
라는이 라이브러리 썼다 :
stdout_helpers.enable_proxy()
: 내가 전화를 내 응용 프로그램의 시작 부분에서 지금
import threading
import sys
import cStringIO
from werkzeug import local
# Save all of the objects for use later.
orig___stdout__ = sys.__stdout__
orig___stderr__ = sys.__stderr__
orig_stdout = sys.stdout
orig_stderr = sys.stderr
thread_proxies = {}
def redirect():
"""
Enables the redirect for the current thread's output to a single cStringIO
object and returns the object.
:return: The StringIO object.
:rtype: ``cStringIO.StringIO``
"""
# Get the current thread's identity.
ident = threading.currentThread().ident
# Enable the redirect and return the cStringIO object.
thread_proxies[ident] = cStringIO.StringIO()
return thread_proxies[ident]
def stop_redirect():
"""
Enables the redirect for the current thread's output to a single cStringIO
object and returns the object.
:return: The final string value.
:rtype: ``str``
"""
# Get the current thread's identity.
ident = threading.currentThread().ident
# Only act on proxied threads.
if ident not in thread_proxies:
return
# Read the value, close/remove the buffer, and return the value.
retval = thread_proxies[ident].getvalue()
thread_proxies[ident].close()
del thread_proxies[ident]
return retval
def _get_stream(original):
"""
Returns the inner function for use in the LocalProxy object.
:param original: The stream to be returned if thread is not proxied.
:type original: ``file``
:return: The inner function for use in the LocalProxy object.
:rtype: ``function``
"""
def proxy():
"""
Returns the original stream if the current thread is not proxied,
otherwise we return the proxied item.
:return: The stream object for the current thread.
:rtype: ``file``
"""
# Get the current thread's identity.
ident = threading.currentThread().ident
# Return the proxy, otherwise return the original.
return thread_proxies.get(ident, original)
# Return the inner function.
return proxy
def enable_proxy():
"""
Overwrites __stdout__, __stderr__, stdout, and stderr with the proxied
objects.
"""
sys.__stdout__ = local.LocalProxy(_get_stream(sys.__stdout__))
sys.__stderr__ = local.LocalProxy(_get_stream(sys.__stderr__))
sys.stdout = local.LocalProxy(_get_stream(sys.stdout))
sys.stderr = local.LocalProxy(_get_stream(sys.stderr))
def disable_proxy():
"""
Overwrites __stdout__, __stderr__, stdout, and stderr with the original
objects.
"""
sys.__stdout__ = orig___stdout__
sys.__stderr__ = orig___stderr__
sys.stdout = orig_stdout
sys.stderr = orig_stderr
을 그리고
그리고 어떤 스레드에서 지금 전화는 :
string_io = stdout_helpers.redirect()
표준 출력을 복제 할 수 있습니까?이 점이 어떤 차이가 있습니까? – ATOzTOA
더 나은 그림을 얻으려면 [this] (http://codewiki.wikidot.com/c:system-calls:dup2)를 읽으십시오. – owobeid
내 업데이트 된 질문을 참조하십시오 ... – ATOzTOA