2017-09-05 2 views
1

Twisted web.Resource를 사용하여 MJPEG 서버를 구현하려고합니다. 자체는 업스트림 gstreamer 프로세스에서 데이터를 가져옵니다. TCP에 MJPEG 데이터 작성 포트 localhost : 9999.파이썬 트위스트 : TCP 리더와 웹 리소스 간의 버퍼링 방지하기

from twisted.internet import reactor, protocol, defer 
from twisted.web import server, resource 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    @defer.inlineCallbacks 
    def deferredRenderer(self, request): 
     q = defer.DeferredQueue() 
     self.queues.append([q, request]) 
     while True: 
      yield q.get() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.deferredRenderer(request) 
     return server.NOT_DONE_YET 

class JpegStreamReader(protocol.Protocol): 
    def dataReceived(self, data): 
     for (q, req) in self.factory.queues: 
      req.write(data) 
      q.put('') 

root = File('web') 
root.putChild('stream.mjpeg', MJpegResource(queues)) 

factory = protocol.Factory() 
factory.protocol = JpegStreamReader 
factory.queues = queues 
reactor.listenTCP(9999, factory) 

site = server.Site(root) 
reactor.listenTCP(80, site) 

# spawn gstreamer process which writes to port 9999. 
# The gstream process is launched using: 
# gst-launch-1.0 -v \ 
#  v4l2src device=/dev/video0 \ 
#   ! video/x-raw,framerate=15/1, width=640, height=480 \ 
#   ! jpegenc \ 
#   ! multipartmux boundary=spionisto \ 
#   ! tcpclientsink host=127.0.0.1 port=9999 \ 

reactor.run() 

그래서 같은 것이 : 나는 지금이 같은이

gstreamer --> JpegStreamReader --> MJpegResource 

이 확인을 작동하지만, 나는 이따금 것으로 나타났습니다은 브라우저에서 비디오 폭포까지 무엇 뒤에를 " 라이브 "(30-40 초만큼 가끔씩). 브라우저를 새로 고치 자마자 MJPEG 스트림은 을 "라이브"로 되돌립니다. 그래서 내 의심은 JpegStreamReader가 이 web.http.Request에 해당하는 TCP 소켓에 쓸 수 없다는 것입니다. gstreamer가 TCP 소켓 9999를 채우고 있고, JpegStreamReader의 입력 대기열에 이 버퍼링되고 있습니다.

스트림이 "라이브"로되어 있기 때문에 프레임을 삭제하면 비디오를 다시 가져올 수 있습니다. 그러나, 난 심지어 JpegStreamReader 뒤에 등 떨어지는 감지하는 방법을 잘 모르겠습니다? 이 파이프 라인을 라이브 스트림처럼 작동시키는 방법에 대한 제안 사항은 무엇입니까?

이 작업을 수행하기위한 근본적으로 다른 아키텍처가 있다면 제안을 크게 환영 할 것입니다.

답변

1

Request 개체에 제작자를 등록 할 수 있습니다. 그 Request의 쓰기 버퍼가 꽉 찼을 때 호출되는 메서드는 pauseProducing입니다. 방을 사용할 수있게되면 resumeProducing 메서드 호출이 발생합니다.

이 정보를 사용하여시기 적절하게 전달되지 않은 프레임을 삭제할 수 있습니다. 그러나 실제로 서버에서 프레임을 식별해야합니다 (현재 프레임이 시작되거나 끝나는 곳을 모르는 상태에서 스트림으로 데이터를 전달하는 dataReceived 메서드 만 있습니다). 이것은 또한 버퍼 충만도가 아마도 스트림의 지연에 대해 매우 지연되는 지표라는 문제가 있습니다. 그리고 시스템의 병목 현상이 gstreamer의 데이터를 읽고 요청에 쓰는 것 사이에 있지 않으면 프로그램의이 부분에 배압 감도를 추가하는 것이 도움이되지 않습니다.

+0

대단히 감사합니다. 방금 제안을 구현했으며 문제가 실제로 해결되어야한다고 생각합니다. 나는 잠시 동안 비디오 스트림을 가지고 놀았으며, 내부 네트워크에 여분의 네트워크 활동이있을 때 예상되는 프레임이 보이게됩니다. 마지막 해결책은 1 초 지연으로 resumeProducing에만 응답하는 것입니다. 후손을 위해 마침내 생각 나는 코드를 포함하고 싶습니다. 대회를 혼자서 별도의 답변에 넣거나 논평에 넣는 것이 관례입니다. 별도의 답변을 받아야합니까? –

2

이것은 Jean-Paul Calerone의 제안을 구현하는 최종 해결책입니다. 이제는 PushProducer 인터페이스 인 을 구현하는 JpegProducer 클래스가 있습니다. 일시 중지를 요청하면 플래그를 설정합니다. 이 은 TCP 스트림 판독기 (JpegStreamReader)가 특정 제작자가 막히면 으로 프레임을 푸시하지 못하게합니다. Jean-Paul의 제안에 따르면 도 멀티 파트 MJPEG 스트림을 청크로 분해해야하므로 은 항상 MJPEG 출력 형식을 손상시키지 않고 프레임을 드롭합니다.

from twisted.internet import reactor, protocol, defer, interfaces 
from twisted.web import server, resource 
from zope.interface import implementer 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    def setupProducer(self, request): 
     producer = JpegProducer(request) 
     request.notifyFinish().addErrback(self._responseFailed, producer) 
     request.registerProducer(producer, True) 

     self.queues.append(producer) 

    def _responseFailed(self, err, producer): 
     producer.stopProducing() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.setupProducer(request) 
     return server.NOT_DONE_YET 

@implementer(interfaces.IPushProducer) 
class JpegProducer(object): 
    def __init__(self, request): 
     self.request = request 
     self.isPaused = False 
     self.isStopped = False 
     self.delayedCall = None 

    def cancelCall(self): 
     if self.delayedCall: 
      self.delayedCall.cancel() 
      self.delayedCall = None 

    def pauseProducing(self): 
     self.isPaused = True 
     self.cancelCall() 

    def resetPausedFlag(self): 
     self.isPaused = False 
     self.delayedCall = None 

    def resumeProducing(self): 
     # calling self.cancelCall is defensive. We should not really get 
     # called with multiple resumeProducing calls without any 
     # pauseProducing in the middle. 
     self.cancelCall() 
     self.delayedCall = reactor.callLater(1, self.resetPausedFlag) 
     log('producer is requesting to be resumed') 

    def stopProducing(self): 
     self.isPaused = True 
     self.isStopped = True 
     log('producer is requesting to be stopped') 

MJPEG_SEP = '--spionisto\r\n' 

class JpegStreamReader(protocol.Protocol): 
    def __init__(self): 
     self.tnow = None 

    def connectionMade(self): 
     self.data = '' 
     self.tnow = datetime.now() 

    def dataReceived(self, data): 
     self.data += data 

     chunks = self.data.rsplit(MJPEG_SEP, 1) 

     dataToSend = '' 
     if len(chunks) == 2: 
      dataToSend = chunks[0] + MJPEG_SEP 

     self.data = chunks[-1] 

     for producer in self.factory.queues: 
      if (not producer.isPaused): 
       producer.request.write(dataToSend) 
관련 문제