2009-10-01 7 views
0

저는 온라인 리소스와이 사이트의 사람들의 도움을 받아 Python을 배우고 있습니다. Twitter RSS 피드 항목을 구문 분석하고 결과를 데이터베이스에 삽입하는이 첫 번째 스크립트에서는 해결할 수없는 문제가 하나 남아 있습니다. 즉, 중복 항목이 테이블 중 하나에 삽입되고 있습니다.sqlite, sqlalchemy, python을 사용하여 데이터베이스에 중복 삽입

약간의 배경 지식으로, 원래 HaloTis.com에서 RSS 피드를 다운로드하기위한 기본 스크립트를 발견하고 여러 가지 방법으로 수정했습니다 : 1) Twitter RSS 피드의 특이성을 설명하기 위해 수정되었습니다 (콘텐츠, 제목, URL 등); 2) "hashtags"및 다 대다 관계 (entry_tag 테이블)에 대한 테이블을 추가했습니다. 3) 테이블 설정을 sqlalchemy로 변경했습니다. 4)는 이상한 유니 코드 문제가 발생했음을 설명하기 위해 임시 변경했습니다. 결과적으로, 코드는 엉망이되지만 좋은 학습 경험이었고 지금은 작동합니다. 단, "entries"테이블에 중복을 계속 삽입한다는 점만 다릅니다.

내가 사람들에게 가장 도움이 될지 모르겠으므로 아래에서 전체 코드를 붙여 넣었으며 일부 의견은 내가 가장 중요하다고 생각하는 것을 지적했습니다.

정말 고맙겠습니다. 감사!

편집 : 누군가가 데이터베이스 스키마를 제안했습니다. 나는 전에 이것을 한 적이 없기 때문에 내가 제대로하지 않는다면 나와 함께 감내해라.

  1. RSSFeeds, 콘텐츠에 대한 열 피드의 각에서 (구문 분석 후) 다운로드 개별 항목의 목록을 (포함 트위터 RSS의 목록
  2. RSSEntries 피드를 포함 : 나는 네 개의 테이블을 설정하고 , 해시 태그, 날짜, URL)
  3. 개별 항목 (트윗)에서 발견되는 모든 해시 태그 목록을 포함하는 태그
  4. entry_tag : 항목에 태그를 매핑 할 수있는 열이 포함되어 있습니다. 한마디로

이 스크립트는 아래의 RSS 테이블 피드에서 다섯 개 테스트 RSS 피드를 잡고 각 피드에서 20 최신 항목/트윗을 다운로드, 항목을 구문 분석하고 RSS 항목, 태그에 정보를 넣습니다 및 entry_tag 테이블.

#!/usr/local/bin/python 

import sqlite3 
import threading 
import time 
import Queue 
from time import strftime 
import re  
from string import split 
import feedparser 
from django.utils.encoding import smart_str, smart_unicode  
from sqlalchemy import schema, types, ForeignKey, select, orm 
from sqlalchemy import create_engine 

engine = create_engine('sqlite:///test98.sqlite', echo=True) 
metadata = schema.MetaData(engine) 
metadata.bind = engine 

def now(): 
    return datetime.datetime.now() 


#set up four tables, with many-to-many relationship 
RSSFeeds = schema.Table('feeds', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('feeds_seq_id', optional=True), primary_key=True), 
    schema.Column('url', types.VARCHAR(1000), default=u''), 
) 


RSSEntries = schema.Table('entries', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('entries_seq_id', optional=True), primary_key=True), 
    schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')), 
    schema.Column('short_url', types.VARCHAR(1000), default=u''), 
    schema.Column('content', types.Text(), nullable=False), 
    schema.Column('hashtags', types.Unicode(255)), 
    schema.Column('date', types.String()), 
) 


tag_table = schema.Table('tag', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('tag_seq_id', optional=True), primary_key=True), 
    schema.Column('tagname', types.Unicode(20), nullable=False, unique=True), 
) 


entrytag_table = schema.Table('entrytag', metadata, 
    schema.Column('id', types.Integer, 
     schema.Sequence('entrytag_seq_id', optional=True), primary_key=True), 
    schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')), 
    schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')), 
) 


metadata.create_all(bind=engine, checkfirst=True) 


# Insert test set of Twitter RSS feeds 
stmt = RSSFeeds.insert() 
stmt.execute(
    {'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'}, 
    {'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'}, 
) 



#These 3 lines for threading process (see HalOtis.com for example) 
THREAD_LIMIT = 20 
jobs = Queue.Queue(0) 
rss_to_process = Queue.Queue(THREAD_LIMIT) 


#connect to sqlite database and grab the 5 test RSS feeds 
conn = engine.connect() 
feeds = conn.execute('SELECT id, url FROM feeds').fetchall() 

#This block contains all the parsing and DB insertion 
def store_feed_items(id, items): 
    """ Takes a feed_id and a list of items and stores them in the DB """ 
    for entry in items: 
     conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,)) 
     #note: entry.summary contains entire feed entry for Twitter, 
        #i.e., not separated into content, etc. 
     s = unicode(entry.summary) 
     test = s.split() 
     tinyurl2 = [i for i in test if i.startswith('http://')] 
     hashtags2 = [i for i in s.split() if i.startswith('#')] 
     content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2) 
     content = unicode(content2) 
     tinyurl = unicode(tinyurl2) 
     hashtags = unicode (hashtags2) 
     print hashtags 
     date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed) 


     #Insert parsed feed data into entries table 
        #THIS IS WHERE DUPLICATES OCCUR 
     result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl, 
      'content': content, 'hashtags': hashtags, 'date': date}) 
     entry_id = result.last_inserted_ids()[0] 


     #Look up tag identifiers and create any that don't exist: 
     tags = tag_table 
     tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2)) 
     tag_ids = dict(conn.execute(tag_id_query).fetchall()) 
     for tag in hashtags2: 
      if tag not in tag_ids: 
       result = conn.execute(tags.insert(), {'tagname': tag}) 
       tag_ids[tag] = result.last_inserted_ids()[0] 

     #insert data into entrytag table 
     if hashtags2: conn.execute(entrytag_table.insert(), 
      [{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2]) 


#Rest of file completes the threading process  
def thread(): 
    while True: 
     try: 
      id, feed_url = jobs.get(False) # False = Don't wait 
     except Queue.Empty: 
      return 

     entries = feedparser.parse(feed_url).entries 
     rss_to_process.put((id, entries), True) # This will block if full 

for info in feeds: # Queue them up 
    jobs.put([info['id'], info['url']]) 

for n in xrange(THREAD_LIMIT): 
    t = threading.Thread(target=thread) 
    t.start() 

while threading.activeCount() > 1 or not rss_to_process.empty(): 
    # That condition means we want to do this loop if there are threads 
    # running OR there's stuff to process 
    try: 
     id, entries = rss_to_process.get(False, 1) # Wait for up to a second 
    except Queue.Empty: 
     continue 

    store_feed_items(id, entries) 
+0

스키마를 제공하면 소스 코드에서 추론 할 필요가 없으므로 도움이됩니다. – Fragsworth

+0

감사합니다. 위의 내용을 추가하겠습니다. –

답변

2

SQLAlchemy를 사용하지 않은 기존의 스크립트에 SQLAlchemy를 포함시킨 것처럼 보입니다. 우리 중 누구도 충분히 잘 이해하지 못하는 부분이 너무 많습니다.

처음부터 시작하는 것이 좋습니다. 스레딩을 사용하지 마십시오. sqlalchemy를 사용하지 마십시오. 아마 SQL 데이터베이스를 사용하지 않아도됩니다. 간단한 루프와 아마도 time.sleep()을 사용하여 단순한 데이터 구조로 원하는 정보를 수집하는 스크립트를 작성하십시오. 그렇다면 SQL 데이터베이스에 스토리지를 추가 할 수 있습니다. SQL 문을 직접 작성하는 것이 ORM을 사용하는 것보다 훨씬 어렵다고 생각하며 IMHO를 디버그하는 것이 더 쉽습니다. 스레딩을 추가 할 필요가 결코없는 좋은 기회입니다.

"멀티 스레드 프로그램을 작성하는 데 충분히 똑똑하다고 생각하면 그렇지 않습니다." - James Ahlstrom.

관련 문제