2017-03-12 2 views
0

저는 파이썬에서 Twisted를 사용해 트위터의 스트리밍 API로부터 데이터를 받아오고 있습니다. 과정은 간단히 두 단계입니다. 1) access_token을 얻는다. 2) access_token을 사용해 데이터를 요청한다. 1단계는 아무 문제 없이 동작하지만, 2단계에서 상태 코드 400(Bad Request) 오류가 발생합니다. 왜 그럴까요? 트위터는 HTTP/1.1을 사용하는데 Twisted는 기본적으로 HTTP/1.0을 사용하기 때문이라고 생각합니다. 그렇다면 연결을 HTTP/1.1로 업그레이드하려면 어떻게 해야 하나요? (제목: Twisted 트위터 API Bad Request 오류)

편집: 다음은 제가 받은 오류 메시지입니다.

HTTP/1.0 400 Bad Request 
content-length: 0 
date: Sun, 12 Mar 2017 14:57:13 GMT 
server: tsa 
x-connection-hash: dca361a2b4214ad66203e9912b05cf7f 

[Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly. 

다음은 제 코드입니다.

#!/usr/bin/python 
import oauth2 as oauth 
import urlparse 
import time 
import webbrowser 
from twisted.internet import reactor, protocol, ssl 
from twisted.web import http 


# OAuth consumer (application) credentials.
# NOTE(review): the 'xxxx' values look like redacted placeholders - confirm.
CONSUMER_KEY = 'xxxx'
CONSUMER_SECRET = 'xxxx'
# Consumer object used when signing the streaming request.
CONSUMER = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)

# File in which the access token key/secret pair is stored between runs.
ACCESS_TOKEN_FILE = 'OAUTH_ACCESS_TOKEN'

# OAuth endpoint URLs; none of the code visible in this script references them.
TWITTER_REQUEST_TOKEN_URL = 'https://twitter.com/oauth/request_token'
TWITTER_ACCESS_TOKEN_URL = 'https://twitter.com/oauth/access_token'
TWITTER_AUTHORIZE_URL = 'https://twitter.com/oauth/authorize'
# Host and path of the streaming endpoint that is signed and requested.
TWITTER_STREAM_API_HOST = 'stream.twitter.com'
TWITTER_STREAM_API_PATH = '/1.1/statuses/sample.json'


class TwitterStreamer(http.HTTPClient):
    """HTTP client protocol that requests the stream and forwards body lines.

    The factory supplies ``url``, ``host``, ``agent`` and ``oauth_header`` and
    receives results through ``tweetReceived`` / ``tweetError``.

    The response body is consumed through ``handleResponsePart`` instead of an
    overridden ``lineReceived``.  ``HTTPClient`` itself relies on
    ``lineReceived`` to parse the status line and the headers, so overriding
    it hands the raw status line and headers to ``tweetReceived`` and prevents
    ``handleStatus`` from ever being invoked.

    NOTE(review): ``HTTPClient.sendCommand`` writes an HTTP/1.0 request line.
    If the server answers that with ``400 Bad Request``, use
    ``twisted.web.client.Agent`` (HTTP/1.1) instead of this class.
    """

    # Tail of the body whose terminating CRLF has not been received yet.
    _partial = ''

    def connectionMade(self):
        """Send the GET request line and the request headers."""
        self.sendCommand('GET', self.factory.url)
        self.sendHeader('Host', self.factory.host)
        self.sendHeader('User-Agent', self.factory.agent)
        self.sendHeader('Authorization', self.factory.oauth_header)
        self.endHeaders()

    def handleStatus(self, version, status, message):
        """Report any response status other than 200 to the factory."""
        if status != '200':
            self.factory.tweetError(ValueError("bad status"))

    def handleResponsePart(self, data):
        """Split raw body data on CRLF and forward every complete line."""
        pieces = (self._partial + data).split('\r\n')
        # The last piece is either '' (data ended on a CRLF) or an unfinished
        # line; keep it until the rest of that line arrives.
        self._partial = pieces.pop()
        for piece in pieces:
            self.factory.tweetReceived(piece)

    def connectionLost(self, reason):
        """Pass the reason the connection ended on to the factory."""
        self.factory.tweetError(reason)


class TwitterStreamerFactory(protocol.ClientFactory):
    """Client factory holding the request settings read by TwitterStreamer."""

    protocol = TwitterStreamer

    def __init__(self, oauth_header):
        """Store the Authorization header next to the fixed request settings.

        Args:
            oauth_header: value sent in the ``Authorization`` request header.
        """
        self.oauth_header = oauth_header
        self.host = TWITTER_STREAM_API_HOST
        self.url = TWITTER_STREAM_API_PATH
        self.agent = 'Twisted/TwitterStreamer'

    def tweetReceived(self, tweet):
        """Print one line handed over by the protocol."""
        print(tweet)

    def tweetError(self, error):
        """Print an error reported by the protocol or by the connector."""
        print(error)

    def clientConnectionFailed(self, _, reason):
        """Route a failed connection attempt through tweetError."""
        self.tweetError(reason)


def save_access_token(key, secret):
    """Write the token pair to ACCESS_TOKEN_FILE, replacing any old content.

    The file holds two lines: ``ACCESS_KEY=<key>`` and
    ``ACCESS_SECRET=<secret>``.
    """
    with open(ACCESS_TOKEN_FILE, 'w') as token_file:
        token_file.writelines([
            "ACCESS_KEY=%s\n" % key,
            "ACCESS_SECRET=%s\n" % secret,
        ])


def load_access_token():
    """Read the stored access token from ACCESS_TOKEN_FILE.

    The first line is expected to be ``ACCESS_KEY=<key>`` and the second
    ``ACCESS_SECRET=<secret>``.

    Returns:
        An ``oauth.Token`` built from the stored key and secret.

    Raises:
        IOError: if the file cannot be opened.
        IndexError: if the file has fewer than two lines or a line has no '='.
    """
    with open(ACCESS_TOKEN_FILE) as token_file:
        lines = token_file.readlines()

    # Split on the first '=' only, so a value that itself contains '=' is
    # returned whole instead of being cut off at its first '='.
    str_key = lines[0].strip().split('=', 1)[1]
    str_secret = lines[1].strip().split('=', 1)[1]
    return oauth.Token(key=str_key, secret=str_secret)


def fetch_access_token():
    """Return the access token as a ``(key, secret)`` tuple.

    Nothing is requested over the network: the key and secret are the
    literals below.
    NOTE(review): the 'xxx' values look like redacted placeholders - confirm.
    """
    ACCESS_KEY = "xxxxxxx"
    ACCESS_SECRET = "xxxxxxxxx"
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)

    return (access_token.key, access_token.secret)


def build_authorization_header(access_token):
    """Build and print the signed OAuth ``Authorization`` header value.

    Args:
        access_token: token whose ``key`` goes into the OAuth parameters and
            which is used, together with CONSUMER, to sign the request.

    Returns:
        The UTF-8 encoded ``Authorization`` header value for a GET of the
        streaming URL.
    """
    stream_url = "https://%s%s" % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    oauth_params = {
        'oauth_version': "1.0",
        'oauth_nonce': oauth.generate_nonce(),
        'oauth_timestamp': str(int(time.time())),
        'oauth_token': access_token.key,
        'oauth_consumer_key': CONSUMER.key,
    }

    # Per the original author's note: is_form_encoded=True stops the oauth2
    # library from adding oauth_body_hash, which Twitter does not accept.
    signed_request = oauth.Request(
        method="GET",
        url=stream_url,
        parameters=oauth_params,
        is_form_encoded=True,
    )
    signed_request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), CONSUMER, access_token)

    header = signed_request.to_header()['Authorization'].encode('utf-8')
    print("Authorization header:")
    print("  header = %s" % header)
    return header

if __name__ == '__main__':
    # Check if we have saved an access token before.  The file is opened only
    # to probe for its existence, so it is closed again right away.
    try:
        with open(ACCESS_TOKEN_FILE):
            pass
    except IOError:
        # No saved access token: take the one fetch_access_token() provides
        # and save it for next time.
        (access_token_key, access_token_secret) = fetch_access_token()
        save_access_token(access_token_key, access_token_secret)

    # Load access token from disk.
    access_token = load_access_token()

    # Build Authorization header from the access_token.
    auth_header = build_authorization_header(access_token)

    # Twitter stream using the Authorization header.
    twsf = TwitterStreamerFactory(auth_header)
    # NOTE(review): ssl.ClientContextFactory() does not appear to verify the
    # server certificate; consider ssl.optionsForClientTLS - confirm against
    # the installed Twisted version.
    reactor.connectSSL(TWITTER_STREAM_API_HOST, 443, twsf, ssl.ClientContextFactory())
    reactor.run()

업데이트: 동작하는 코드:

import base64, urllib 
from twisted.internet import reactor 
from twisted.internet.defer import Deferred 
from twisted.protocols import basic 
from twisted.python.failure import DefaultException 
from twisted.web.client import Agent 
from twisted.web.http_headers import Headers 
import json 
import oauth2 as oauth 
import time 
from twisted.web import server,resource 
from twisted.internet import endpoints 
from twisted.web.server import Site 
# OAuth consumer (application) credentials.
# NOTE(review): the 'xxx' values look like redacted placeholders - confirm.
CONSUMER_KEY = 'xxxxxxxxxxxx'
CONSUMER_SECRET = 'xxxxxxxxxxxxxx'
# Host and path of the streaming endpoint whose URL is signed in make_header.
TWITTER_STREAM_API_HOST = 'stream.twitter.com'
TWITTER_STREAM_API_PATH = '/1.1/statuses/sample.json'
# File in which the access token key/secret pair is stored between runs.
ACCESS_TOKEN_FILE = 'OAUTH_ACCESS_TOKEN'

# Consumer object used when signing the streaming request.
CONSUMER = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)


def callback(result):
    """Print a result; start_streaming passes this as the per-line callback."""
    print(result)
def errback(error):
    """Print an error; start_streaming passes this as the error handler."""
    print(error)

class StreamingParser(basic.LineReceiver):
    """Line protocol that JSON-decodes each CRLF-terminated line it receives.

    Every decoded line is passed to ``user_callback``; decoding errors and the
    loss of the connection are passed to ``user_errback`` when one is set.
    """

    delimiter = '\r\n'

    def __init__(self, user_callback, user_errback):
        """Remember the handlers.

        Args:
            user_callback: called with the object decoded from each line.
            user_errback: called with a failure for an undecodable line or a
                lost connection; may be falsy to ignore those.
        """
        self.user_callback = user_callback
        self.user_errback = user_errback

    def lineReceived(self, line):
        """Decode one line and dispatch it to the user handlers."""
        line = line.strip()
        if not line:
            # Blank lines carry no message (streaming servers commonly send
            # them as keep-alives); feeding '' to json.loads would only
            # produce a spurious ValueError for every one of them.
            return
        print("%s ........" % (line,))

        d = Deferred()
        d.addCallback(self.user_callback)
        d.addErrback(self.user_errback)
        try:
            message = json.loads(line)
        except ValueError as e:
            if self.user_errback:
                d.errback(e)
        else:
            d.callback(message)

    def connectionLost(self, reason):
        """Report the end of the connection through ``user_errback``."""
        if self.user_errback:
            d = Deferred()
            d.addErrback(self.user_errback)
            d.errback(DefaultException(reason.getErrorMessage()))

def _get_response(response, callback, errback):
    """Attach a StreamingParser to the response body.

    Args:
        response: response object whose body is delivered to the parser.
        callback: handler given to the parser for decoded lines.
        errback: handler given to the parser for errors.

    Returns:
        A new Deferred that this function never fires.
    """
    print('got response......')
    body_parser = StreamingParser(callback, errback)
    response.deliverBody(body_parser)
    pending = Deferred()
    return pending

def _shutdown(reason, errback):
    """Report ``reason`` through ``errback``, then stop a running reactor."""
    reporter = Deferred()
    reporter.addErrback(errback)
    reporter.errback(reason)
    if not reactor.running:
        return
    reactor.stop()

def save_access_token(key, secret):
    """Write the token pair to ACCESS_TOKEN_FILE, replacing any old content.

    The file holds two lines: ``ACCESS_KEY=<key>`` and
    ``ACCESS_SECRET=<secret>``.

    Args:
        key: access token key.
        secret: access token secret.
    """
    # The writes must be indented under the `with`; at the same level as the
    # `with` line the function is an IndentationError.
    with open(ACCESS_TOKEN_FILE, 'w') as f:
        f.write("ACCESS_KEY=%s\n" % key)
        f.write("ACCESS_SECRET=%s\n" % secret)


def load_access_token():
    """Read the stored access token from ACCESS_TOKEN_FILE.

    The first line is expected to be ``ACCESS_KEY=<key>`` and the second
    ``ACCESS_SECRET=<secret>``.

    Returns:
        An ``oauth.Token`` built from the stored key and secret.

    Raises:
        IOError: if the file cannot be opened.
        IndexError: if the file has fewer than two lines or a line has no '='.
    """
    # readlines() must be indented under the `with`; at the same level as the
    # `with` line the function is an IndentationError.
    with open(ACCESS_TOKEN_FILE) as f:
        lines = f.readlines()

    # Split on the first '=' only, so a value that itself contains '=' is
    # returned whole instead of being cut off at its first '='.
    str_key = lines[0].strip().split('=', 1)[1]
    str_secret = lines[1].strip().split('=', 1)[1]
    return oauth.Token(key=str_key, secret=str_secret)


def fetch_access_token():
    """Return the access token as a ``(key, secret)`` tuple.

    The pair comes from the literals below, wrapped in an ``oauth.Token``.
    """
    token = oauth.Token(key="xxxxx-xxxx", secret="xxxxxxxxxxxx")
    return (token.key, token.secret)


def make_header(access_token):
    """Build and print the signed OAuth ``Authorization`` header value.

    Args:
        access_token: token whose ``key`` goes into the OAuth parameters and
            which is used, together with CONSUMER, to sign the request.

    Returns:
        The UTF-8 encoded ``Authorization`` header value for a GET of the
        streaming URL.
    """
    stream_url = "https://%s%s" % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    oauth_params = {
        "oauth_version": "1.0",
        "oauth_nonce": oauth.generate_nonce(),
        "oauth_timestamp": str(int(time.time())),
        "oauth_token": access_token.key,
        "oauth_consumer_key": CONSUMER.key,
    }

    signed_request = oauth.Request(
        method="GET",
        url=stream_url,
        parameters=oauth_params,
        is_form_encoded=True,
    )
    signed_request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), CONSUMER, access_token)

    header = signed_request.to_header()['Authorization'].encode('utf-8')
    print("Authorization header:")
    print("  header = %s" % header)
    return header

def start_streaming():
    """Issue the signed GET request for the sample stream.

    Ensures ACCESS_TOKEN_FILE exists (creating it from fetch_access_token()
    when it cannot be opened), loads the token, signs the request and hands
    the response to ``_get_response``.  A failure is passed to ``_shutdown``.
    The reactor is not started here; the caller is expected to run it.
    """
    print('streaming started...........')
    # The file is opened only to probe for its existence, so it is closed
    # again right away instead of leaking the handle.
    try:
        with open(ACCESS_TOKEN_FILE):
            pass
    except IOError:
        access_token_key, access_token_secret = fetch_access_token()
        save_access_token(access_token_key, access_token_secret)

    access_token = load_access_token()
    auth_header = make_header(access_token)
    url = 'https://stream.twitter.com/1.1/statuses/sample.json'
    headers = Headers({
        'User-Agent': ['TwistedSTreamReciever'],
        'Authorization': [auth_header]})
    agent = Agent(reactor)
    d = agent.request('GET', url, headers, None)
    d.addCallback(_get_response, callback, errback)
    # NOTE(review): _shutdown stops the reactor, which would also stop any
    # server running on the same reactor - confirm this is intended.
    d.addBoth(_shutdown, errback)

class _Stream(resource.Resource):
    """Leaf resource whose GET starts the stream and answers 8 seconds later."""

    isLeaf = True

    def render_GET(self, request):
        """Start streaming and send the HTML reply after a non-blocking delay.

        ``time.sleep`` must not be used here: it blocks the reactor thread, so
        nothing (including the stream that was just started) is processed
        while it sleeps.  The reply is scheduled with ``reactor.callLater``
        and the request is kept open by returning ``NOT_DONE_YET``.
        """
        start_streaming()  # Streaming started here.

        def _respond():
            # NOTE: to stop the stream at this point, keep a reference to the
            # protocol given to response.deliverBody() and call its
            # transport.loseConnection().
            request.write(
                "<html>streaming started...........%s</html>" % (time.ctime(),))
            request.finish()

        delayed = reactor.callLater(8, _respond)

        def _client_gone(_failure):
            # The client disconnected before the reply was due; writing to or
            # finishing the request now would raise, so drop the timer.
            if delayed.active():
                delayed.cancel()

        request.notifyFinish().addErrback(_client_gone)
        return server.NOT_DONE_YET


if __name__ == "__main__":
    # Named `root` rather than `resource` so the imported
    # twisted.web.resource module is not rebound at module level.
    root = _Stream()
    factory = Site(root)
    endpoint = endpoints.TCP4ServerEndpoint(reactor, 8880)
    # listen() returns a Deferred; report a failure to bind the port instead
    # of dropping it silently.
    endpoint.listen(factory).addErrback(errback)
    reactor.run()
+2

왜 `twisted.web.http.HTTPClient`를 사용하고 있습니까?

def _get_response(response, callback, errback):
    """Attach a StreamingParser to the response body and register it by name.

    NOTE(review): ``save_stream_by_name`` and ``stream_name`` are not defined
    in this snippet; presumably they store the protocol so it can be looked
    up later to close its transport - confirm.

    Returns:
        A new Deferred that this function never fires.
    """
    print('got response......')
    proto = StreamingParser(callback, errback)
    save_stream_by_name(stream_name, proto)
    response.deliverBody(proto)
    return Deferred()

그 스트림을 다 사용했다면? 대신 `Agent`를 사용하십시오. –

+0

@Jean-PaulCalderone 답변해 주셔서 감사합니다. `Agent`를 사용해 보겠습니다. – anekix

+0

`Agent`로 동작하게 만들었습니다. 스트리밍을 시작하는 HTTP 엔드포인트도 정의했습니다. 이제 스트리밍을 중단하는 방법(아마도 `Agent`가 시작한 연결을 끊는 방법)에 대한 힌트가 조금 필요합니다. 현재 제 의문점은 참고용으로 올린 내용을 확인해 주세요. – anekix

답변

0

(트위터 스트림은 스스로 끝나지 않을 것이라고 추측합니다만) 특정 스트리밍 응답을 읽는 것을 포기하고, 해당 요청/응답에 연결된 커넥션을 닫아야 할 것 같습니다(HTTP에는 응답을 포기하는 다른 방법이 없기 때문입니다). 본문 전달 프로토콜의 `transport.loseConnection` 메서드를 사용하십시오. 예를 들면 다음과 같습니다:

# Close the connection of the object returned by pop_stream_by_name().
# NOTE(review): pop_stream_by_name and stream_name are not defined in this
# snippet; presumably the call returns a previously stored body protocol.
pop_stream_by_name(stream_name).transport.loseConnection()