2012-12-12 5 views
1

Cassandra 인서트를 더 빠르게 삽입하려면 멀티 스레딩을 사용하고 있습니다. 작동하는 것이 좋지만, 스레드를 더 추가하면 더 많은 연결이 생성되지 않는다고 생각합니다. 사용하고 있어야한다고 생각합니다. pool.execute (f, * args, ** kwargs) 그러나 그것을 사용하는 방법을 모릅니다. 문서가 꽤 부족합니다. Heres는 내 코드 지금까지 ..Cassandra Pycassa 연결 풀, 제대로 사용하는 방법?

이 또 다른 파일
import connect_to_ks_bp 
from connect_to_ks_bp import ks_refs 
import time 
import pycassa 
from datetime import datetime 
import json 
import threadpool 
pool = threadpool.ThreadPool(20) 
count = 1 
bench = open("benchCassp20_100000.txt", "w") 

def process_tasks(lines): 

    #let threadpool format your requests into a list 
    requests = threadpool.makeRequests(insert_into_cfs, lines) 

    #insert the requests into the threadpool 
    for req in requests: 
     pool.putRequest(req) 

    pool.wait() 

def read(file): 
    """read data from json and insert into keyspace""" 
    json_data=open(file) 
    lines = [] 
    for line in json_data: 
     lines.append(line) 
    print len(lines) 
    process_tasks(lines) 


def insert_into_cfs(line): 
    global count 
    count +=1 
    if count > 5000: 
      bench.write(str(datetime.now())+"\n") 
      count = 1 
    #print count 
    #print kspool.checkedout() 
    """ 
    user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet') 
    user_name_cf = pycassa.ColumnFamily(kspool, 'UserName') 
    tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet') 
    user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower') 
    """ 
    tweet_data = json.loads(line) 
    """Format the tweet time as an epoch seconds int value""" 
    tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000") 
    tweet_time = int(time.mktime(tweet_time)) 

    new_user_tweet(tweet_data['from_user_id'],tweet_time,tweet_data['id']) 
    new_user_name(tweet_data['from_user_id'],tweet_data['from_user_name']) 
    new_tweet(tweet_data['id'],tweet_data['text'],tweet_data['to_user_id']) 

    if tweet_data['to_user_id'] != 0: 
     new_user_follower(tweet_data['from_user_id'],tweet_data['to_user_id']) 


""""4 functions below carry out the inserts into specific column families"""   
def new_user_tweet(from_user_id,tweet_time,id): 
    ks_refs.user_tweet_cf.insert(from_user_id,{(tweet_time): id}) 

def new_user_name(from_user_id,user_name): 
    ks_refs.user_name_cf.insert(from_user_id,{'username': user_name}) 

def new_tweet(id,text,to_user_id): 
    ks_refs.tweet_cf.insert(id,{ 
    'text': text 
    ,'to_user_id': to_user_id 
    }) 

def new_user_follower(from_user_id,to_user_id): 
    ks_refs.user_follower_cf.insert(from_user_id,{to_user_id: 0}) 

    read('tweets.json') 
if __name__ == '__main__': 

..

import pycassa 
from pycassa.pool import ConnectionPool 
from pycassa.columnfamily import ColumnFamily 

"""This is a static class I set up to hold the global database connection stuff, 
I only want to connect once and then the various insert functions will use these fields a lot""" 
class ks_refs(): 
    pool = ConnectionPool('TweetsKS',use_threadlocal = True,max_overflow = -1) 

    @classmethod 
    def cf_connect(cls, column_family): 
     cf = pycassa.ColumnFamily(cls.pool, column_family) 
     return cf 

ks_refs.user_name_cfo = ks_refs.cf_connect('UserName') 
ks_refs.user_tweet_cfo = ks_refs.cf_connect('UserTweet') 
ks_refs.tweet_cfo = ks_refs.cf_connect('Tweet') 
ks_refs.user_follower_cfo = ks_refs.cf_connect('UserFollower') 

#trying out a batch mutator whihc is supposed to increase performance 
ks_refs.user_name_cf = ks_refs.user_name_cfo.batch(queue_size=10000) 
ks_refs.user_tweet_cf = ks_refs.user_tweet_cfo.batch(queue_size=10000) 
ks_refs.tweet_cf = ks_refs.tweet_cfo.batch(queue_size=10000) 
ks_refs.user_follower_cf = ks_refs.user_follower_cfo.batch(queue_size=10000) 

답변

0

몇 가지 생각 : 10,000

  • 배치 크기가 너무 큽니다. 100을 시도하십시오.
  • pool_size 매개 변수를 사용하여 적어도 ConnectionPool 크기를 스레드 수만큼 늘리십시오. 기본값은 5입니다. 풀 오버플로는 활성 스레드 수가 고정 된 수의 스레드가 아닌 시간에 따라 다를 수있는 경우에만 사용해야합니다. 그 이유는 새로운 연결을 불필요하게 열거 나 닫는 결과를 낳을 것이기 때문에 상당히 비싼 프로세스입니다. 당신이 그 문제를 해결 한 후에는

, 이러한 조사 :

  • 난 당신이 사용하고있는 스레드 라이브러리에 익숙하지 않다. 그림에서 Cassandra 삽입을 수행하면 스레드 수를 늘릴 때 성능이 향상되는 것을 확인하십시오.
  • 파이썬에는 GIL로 인해 얼마나 많은 스레드가 유용 할 수 있는지에 대한 제한이 있습니다. 일반적으로 20을 넘지 않아야합니다. 그러나 CPU 집중적 인 작업이나 많은 Python 해석이 필요한 작업을 수행하는 경우 일 수 있습니다. 필자가 이전에 설명한 테스트에서이 점에 대해서도 다룰 것입니다. multiprocessing 모듈 사용을 고려해야 할 수도 있지만,이를 처리하려면 코드 변경이 필요합니다 (즉, ConnectionPools, CF 또는 프로세스간에 거의 공유되지 않음).