저는 온라인 리소스와이 사이트의 사람들의 도움을 받아 Python을 배우고 있습니다. Twitter RSS 피드 항목을 구문 분석하고 결과를 데이터베이스에 삽입하는이 첫 번째 스크립트에서는 해결할 수없는 문제가 하나 남아 있습니다. 즉, 중복 항목이 테이블 중 하나에 삽입되고 있습니다.sqlite, sqlalchemy, python을 사용하여 데이터베이스에 중복 삽입
약간의 배경 지식으로, 원래 HaloTis.com에서 RSS 피드를 다운로드하기위한 기본 스크립트를 발견하고 여러 가지 방법으로 수정했습니다 : 1) Twitter RSS 피드의 특이성을 설명하기 위해 수정되었습니다 (콘텐츠, 제목, URL 등); 2) "hashtags"및 다 대다 관계 (entry_tag 테이블)에 대한 테이블을 추가했습니다. 3) 테이블 설정을 sqlalchemy로 변경했습니다. 4)는 이상한 유니 코드 문제가 발생했음을 설명하기 위해 임시 변경했습니다. 결과적으로, 코드는 엉망이되지만 좋은 학습 경험이었고 지금은 작동합니다. 단, "entries"테이블에 중복을 계속 삽입한다는 점만 다릅니다.
내가 사람들에게 가장 도움이 될지 모르겠으므로 아래에서 전체 코드를 붙여 넣었으며 일부 의견은 내가 가장 중요하다고 생각하는 것을 지적했습니다.
정말 고맙겠습니다. 감사!
편집 : 누군가가 데이터베이스 스키마를 제안했습니다. 나는 전에 이것을 한 적이 없기 때문에 내가 제대로하지 않는다면 나와 함께 감내해라.
- RSSFeeds, 콘텐츠에 대한 열 피드의 각에서 (구문 분석 후) 다운로드 개별 항목의 목록을 (포함 트위터 RSS의 목록
- RSSEntries 피드를 포함 : 나는 네 개의 테이블을 설정하고 , 해시 태그, 날짜, URL)
- 개별 항목 (트윗)에서 발견되는 모든 해시 태그 목록을 포함하는 태그
- entry_tag : 항목에 태그를 매핑 할 수있는 열이 포함되어 있습니다. 한마디로
이 스크립트는 아래의 RSS 테이블 피드에서 다섯 개 테스트 RSS 피드를 잡고 각 피드에서 20 최신 항목/트윗을 다운로드, 항목을 구문 분석하고 RSS 항목, 태그에 정보를 넣습니다 및 entry_tag 테이블.
#!/usr/local/bin/python
import sqlite3
import threading
import time
import Queue
from time import strftime
import re
from string import split
import feedparser
from django.utils.encoding import smart_str, smart_unicode
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine
engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)
metadata.bind = engine
def now():
return datetime.datetime.now()
#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
schema.Column('id', types.Integer,
schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
schema.Column('url', types.VARCHAR(1000), default=u''),
)
RSSEntries = schema.Table('entries', metadata,
schema.Column('id', types.Integer,
schema.Sequence('entries_seq_id', optional=True), primary_key=True),
schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')),
schema.Column('short_url', types.VARCHAR(1000), default=u''),
schema.Column('content', types.Text(), nullable=False),
schema.Column('hashtags', types.Unicode(255)),
schema.Column('date', types.String()),
)
tag_table = schema.Table('tag', metadata,
schema.Column('id', types.Integer,
schema.Sequence('tag_seq_id', optional=True), primary_key=True),
schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)
entrytag_table = schema.Table('entrytag', metadata,
schema.Column('id', types.Integer,
schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')),
schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')),
)
metadata.create_all(bind=engine, checkfirst=True)
# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
{'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'},
)
#These 3 lines for threading process (see HalOtis.com for example)
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)
#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()
#This block contains all the parsing and DB insertion
def store_feed_items(id, items):
""" Takes a feed_id and a list of items and stores them in the DB """
for entry in items:
conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,))
#note: entry.summary contains entire feed entry for Twitter,
#i.e., not separated into content, etc.
s = unicode(entry.summary)
test = s.split()
tinyurl2 = [i for i in test if i.startswith('http://')]
hashtags2 = [i for i in s.split() if i.startswith('#')]
content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
content = unicode(content2)
tinyurl = unicode(tinyurl2)
hashtags = unicode (hashtags2)
print hashtags
date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)
#Insert parsed feed data into entries table
#THIS IS WHERE DUPLICATES OCCUR
result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
'content': content, 'hashtags': hashtags, 'date': date})
entry_id = result.last_inserted_ids()[0]
#Look up tag identifiers and create any that don't exist:
tags = tag_table
tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2))
tag_ids = dict(conn.execute(tag_id_query).fetchall())
for tag in hashtags2:
if tag not in tag_ids:
result = conn.execute(tags.insert(), {'tagname': tag})
tag_ids[tag] = result.last_inserted_ids()[0]
#insert data into entrytag table
if hashtags2: conn.execute(entrytag_table.insert(),
[{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])
#Rest of file completes the threading process
def thread():
while True:
try:
id, feed_url = jobs.get(False) # False = Don't wait
except Queue.Empty:
return
entries = feedparser.parse(feed_url).entries
rss_to_process.put((id, entries), True) # This will block if full
for info in feeds: # Queue them up
jobs.put([info['id'], info['url']])
for n in xrange(THREAD_LIMIT):
t = threading.Thread(target=thread)
t.start()
while threading.activeCount() > 1 or not rss_to_process.empty():
# That condition means we want to do this loop if there are threads
# running OR there's stuff to process
try:
id, entries = rss_to_process.get(False, 1) # Wait for up to a second
except Queue.Empty:
continue
store_feed_items(id, entries)
스키마를 제공하면 소스 코드에서 추론 할 필요가 없으므로 도움이됩니다. – Fragsworth
감사합니다. 위의 내용을 추가하겠습니다. –