1

Google App Engine Datastore에서 데이터를 가져 와서 Google Cloud Storage 및 BigQuery로 이동하는 방법을 보여주는 this Codelab을 따라 가려고합니다. MapReduce를 설정하여 관로. Google App Engine Datastore 엔티티를 설정하고 테스트와 마찬가지로 데이터를 수집하려는 특정 주식에 대한 트윗을 수집하는 프로세스가 있습니다. 이 예제에서 설명한대로 모든 것을 수행했다고 생각하지만 데이터를 분할하고 Cloud Storage로로드하는 모든 작업을 수행하는 파편이 UnicodeEncodeErrors를 발생시킵니다. 여기에 내가 dev에 응용 프로그램 서버에서 응용 프로그램을 테스트 한 곳에서 로그는 다음과 같습니다BigQuery에서 Google App Engine Datastore 프로세스의 UnicodeEncodeError

INFO  2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 - 
WARNING 2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds 
ERROR 2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128) 
Traceback (most recent call last): 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__ 
rv = self.handle_exception(request, response, e) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__ 
rv = self.router.dispatch(request, response) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher 
return route.handler_adapter(request, response) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__ 
return handler.dispatch() 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch 
return self.handle_exception(e, self.app.debug) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch 
return method(*args, **kwargs) 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post 
self.handle() 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle 
entity, input_reader, ctx, tstate) 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data 
output_writer.write(output, ctx) 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write 
ctx.get_pool("file_pool").append(self._filename, str(data)) 
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128) 

여기에 코드입니다 :있을 수 있습니다

import json 
import webapp2 
import urllib2 
import time 
import calendar 
import datetime 
import httplib2 

from google.appengine.ext import db 
from google.appengine.api import taskqueue 
from google.appengine.ext import blobstore 
from google.appengine.ext.webapp.util import run_wsgi_app 
from google.appengine.ext.webapp import blobstore_handlers 
from google.appengine.ext.webapp import util 
from google.appengine.ext.webapp import template 
from google.appengine.api import urlfetch 

from mapreduce.lib import files 
from mapreduce import base_handler 
from mapreduce import mapreduce_pipeline 
from apiclient.discovery import build 
from oauth2client.appengine import AppAssertionCredentials 

SCOPE = 'https://www.googleapis.com/auth/bigquery' 
PROJECT_ID = 'project_id' # Your Project ID here 
BQ_DATASET_ID = 'datastore_data' 
GS_BUCKET = 'bucketname' 
ENTITY_KIND = 'main.streamdata' 

class streamdata(db.Model): 
    querydate = db.DateTimeProperty(auto_now_add = True) 
    ticker = db.StringProperty() 
    created_at = db.StringProperty() 
    tweet_id = db.StringProperty() 
    text = db.TextProperty() 
    source = db.StringProperty() 

class DatastoreMapperPipeline(base_handler.PipelineBase): 

    def run(self, entity_type): 

     output = yield mapreduce_pipeline.MapperPipeline(
      "Datastore Mapper %s" % entity_type, 
      "main.datastore_map", 
      "mapreduce.input_readers.DatastoreInputReader", 
      output_writer_spec="mapreduce.output_writers.FileOutputWriter", 
      params={ 
       "input_reader":{ 
        "entity_kind": entity_type, 
        }, 
       "output_writer":{ 
        "filesystem": "gs", 
        "gs_bucket_name": GS_BUCKET, 
        "output_sharding":"none", 
        } 
       }, 
       shards=10) 

     yield CloudStorageToBigQuery(output) 

class CloudStorageToBigQuery(base_handler.PipelineBase): 

    def run(self, csv_output): 

     credentials = AppAssertionCredentials(scope=SCOPE) 
     http = credentials.authorize(httplib2.Http()) 
     bigquery_service = build("bigquery", "v2", http=http) 

     jobs = bigquery_service.jobs() 
     table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
      '%m%d%Y_%H%M%S') 
     files = [str(f.replace('/gs/', 'gs://')) for f in csv_output] 
     result = jobs.insert(projectId=PROJECT_ID, 
          body=build_job_data(table_name,files)) 

     result.execute() 

def build_job_data(table_name, files): 
    return {"projectId": PROJECT_ID, 
      "configuration":{ 
       "load": { 
        "sourceUris": files, 
        "schema":{ 
         "fields":[ 
          { 
           "name":"querydate", 
           "type":"INTEGER", 
          }, 
          { 
           "name":"ticker", 
           "type":"STRING", 
          }, 
          { 
           "name":"created_at", 
           "type":"STRING", 
          }, 
          { 
           "name":"tweet_id", 
           "type":"STRING", 
          }, 
          { "name":"text", 
           "type":"TEXT", 
          }, 
          {  
           "name":"source", 
           "type":"STRING", 
          } 
          ] 
         }, 
        "destinationTable":{ 
         "projectId": PROJECT_ID, 
         "datasetId": BQ_DATASET_ID, 
         "tableId": table_name, 
         }, 
        "maxBadRecords": 0, 
        } 
       } 
      } 

def datastore_map(entity_type): 
    data = db.to_dict(entity_type) 
    resultlist = [timestamp_to_posix(data.get('querydate')), 
        data.get('ticker'), 
        data.get('created_at'), 
        data.get('tweet_id'), 
        data.get('text'), 
        data.get('source')] 
    result = ','.join(['"%s"' % field for field in resultlist]) 
    yield("%s\n" % result) 

def timestamp_to_posix(timestamp): 
    return int(time.mktime(timestamp.timetuple())) 

class DatastoretoBigQueryStart(webapp2.RequestHandler): 
    def get(self): 
     pipeline = DatastoreMapperPipeline(ENTITY_KIND) 
     pipeline.start() 
     path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id 
     self.redirect(path) 

class StreamHandler(webapp2.RequestHandler): 

    def get(self): 

     tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC', 
        'DELL', 'C', 'JPM', 'WFM', 'WMT', 
        'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI', 
        'DUK', 'CEG', 'XOM', 'F', 'WFC', 
        'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM', 
        'TLT', 'HYG', 'JNK', 'LQD', 'MSFT', 
        'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA', 
        'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP', 
        'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR', 
        'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR', 
        'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL', 
        'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP', 
        'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT'] 

     for i in set(tickers): 

      url = 'http://search.twitter.com/search.json?q=' 
      resultcount = '&rpp=100' 
      language = '&lang=en' 
      encoding = '%40%24' 
      tickerstring = url + encoding + i + resultcount + language 
      tickurl = urllib2.Request(tickerstring) 
      tweets = urllib2.urlopen(tickurl) 
      code = tweets.getcode() 

      if code == 200: 
       results = json.load(tweets, 'utf-8') 
       if "results" in results: 
        entries = results["results"] 
        for entry in entries: 
         tweet = streamdata() 
         created = entry['created_at'] 
         tweetid = entry['id_str'] 
         tweettxt = entry['text'] 
         tweet.ticker = i 
         tweet.created_at = created 
         tweet.tweet_id = tweetid 
         tweet.text = tweettxt 
         tweet.source = "Twitter" 
         tweet.put() 

class MainHandler(webapp2.RequestHandler): 

    def get(self): 
     self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ') 
     self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ') 


app = webapp2.WSGIApplication([ 
           ('/', MainHandler), 
           ('/start', DatastoretoBigQueryStart), 
           ('/add_data', StreamHandler)], 
           debug=True) 

모든 통찰력의 사람이 큰 도움이 될 것입니다.

많은 감사.

답변

3

보십시오 :

ctx.get_pool("file_pool").append(self._filename, str(data)) 

당신은 인코딩을 지정하지 않고, 파이썬은 다시 ASCII 인 디폴트로 떨어지면 그렇게

. 대신 데이터에 포함 된 모든 유니 코드 코드 포인트를 처리 할 수있는 다른 인코딩을 사용해야합니다.

대부분의 텍스트에서 UTF-8을 사용하는 것이 좋습니다. 비 서구 텍스트 (아랍어, 아시아 인 등)가 많은 경우 UTF-16이 더 효율적일 수 있습니다. 각각의 경우에, 당신은 명시 적으로해야 인코딩 할 수 있습니다 : 해당 파일의 데이터를 다시 읽을 때, 다시 유니 코드로 디코딩하는 filedata.decode('utf8')를 사용

ctx.get_pool("file_pool").append(self._filename, data.encode('utf8')) 

.

파이썬 및 유니 코드에 대한 자세한 내용은 Python Unicode HOWTO을 참조하십시오.

0
ctx.get_pool("file_pool").append(self._filename, str(data)) 

데이터에 유니 코드 문자가 포함되어 있으면 오류가 발생합니다. 당신은 bytestring에 유니 코드 데이터를 변환하는

ctx.get_pool("file_pool").append(self._filename, unicode(data)) 
관련 문제