2016-07-15 3 views
1

저는 gzipped 된 매우 큰 CSV 파일을 읽기 위해 팬더를 사용합니다. 약 30-50GB의 csv 파일에 압축을 풉니 다. 파일을 청크 처리하고 조작합니다. 마지막으로 내가 뭔가 잘 작동하지만 하루에 하나 개의 파일을 처리해야하고30-50Gb 더하기 파일 청크를 피하기 위해 RAM을 구입하십시오.

이 구매 수있는 데이터의 가치가 몇 년 (600TB 압축 CSV)을 갖고 있기 때문에 느린

을 압축 HDF5 파일에 관련 데이터를 추가 더 많은 숫양은 청킹을 방지하고 프로세스의 속도를 높이는 좋은 방법이 될 수 있습니다. 64GB/128GB라고할까요? 하지만 팬더는 느리고 다루기 힘들 수 있습니까? C++로 전환하면 프로세스 속도가 빨라질 수는 있지만 여전히 읽기 프로세스가 어려워서 데이터를 처리해야하는 경우가 있습니다. 마지막으로 누가이 문제를 해결할 수있는 최선의 방법에 대한 생각을 가지고 있습니까?

일단 작업이 완료되면 다시 돌아가서 데이터를 다시 처리해야하므로 합리적인 시간에 작업을 수행하여 병렬 프로세스가 좋지만 제한된 경험을 할 수 있도록 작성해야합니다. 그 지역을 건설하는 데는 시간이 걸릴 것입니다. 그렇게하지 않는 편이 좋습니다.

업데이트. 나는 코드를보기가 더 쉬울 것이라고 생각한다. 나는 코드가 어쨌든 특히 느리다는 것을 믿지 않는다. 나는 기술/방법론이있을 것이라고 생각한다.

def txttohdf(path, contract): 
    #create dataframes for trade and quote 
    dftrade = pd.DataFrame(columns = ["datetime", "Price", "Volume"]) 
    dfquote = pd.DataFrame(columns = ["datetime", "BidPrice", "BidSize","AskPrice", "AskSize"]) 
    #create an hdf5 file with high compression and table so we can append 
    hdf = pd.HDFStore(path + contract + '.h5', complevel=9, complib='blosc') 
    hdf.put('trade', dftrade, format='table', data_columns=True) 
    hdf.put('quote', dfquote, format='table', data_columns=True) 
    #date1 = date(start).strftime('%Y%m%d') 
    #date2 = date(end).strftime('%Y%m%d') 
    #dd = [date1 + timedelta(days=x) for x in range((date2-date1).days + 1)] 
    #walkthrough directories 
    for subdir, dir, files in os.walk(path): 
     for file in files: 
      #check if contract has name 
      #print(file) 
       #create filename from directory and file 

      filename = os.path.join(subdir, file) 
       #read in csv 
      if filename.endswith('.gz'): 

       df = pd.read_csv(gzip.open(filename),header=0,iterator=True,chunksize = 10000, low_memory =False, names = ['RIC','Date','Time','GMTOffset','Type','ExCntrbID','LOC','Price','Volume','MarketVWAP','BuyerID','BidPrice','BidSize','NoBuyers','SellerID','AskPrice','AskSize','NoSellers','Qualifiers','SeqNo','ExchTime','BlockTrd','FloorTrd','PERatio','Yield','NewPrice','NewVol','NewSeqNo','BidYld','AskYld','ISMABidYld','ISMAAskYld','Duration','ModDurtn','BPV','AccInt','Convexity','BenchSpd','SwpSpd','AsstSwpSpd','SwapPoint','BasePrice','UpLimPrice','LoLimPrice','TheoPrice','StockPrice','ConvParity','Premium','BidImpVol','AskImpVol','ImpVol','PrimAct','SecAct','GenVal1','GenVal2','GenVal3','GenVal4','GenVal5','Crack','Top','FreightPr','1MnPft','3MnPft','PrYrPft','1YrPft','3YrPft','5YrPft','10YrPft','Repurch','Offer','Kest','CapGain','Actual','Prior','Revised','Forecast','FrcstHigh','FrcstLow','NoFrcts','TrdQteDate','QuoteTime','BidTic','TickDir','DivCode','AdjClose','PrcTTEFlag','IrgTTEFlag','PrcSubMktId','IrgSubMktId','FinStatus','DivExDate','DivPayDate','DivAmt','Open','High','Low','Last','OpenYld','HighYld','LowYld','ShortPrice','ShortVol','ShortTrdVol','ShortTurnnover','ShortWeighting','ShortLimit','AccVolume','Turnover','ImputedCls','ChangeType','OldValue','NewValue','Volatility','Strike','Premium','AucPrice','Auc Vol','MidPrice','FinEvalPrice','ProvEvalPrice','AdvancingIssues','DecliningIssues','UnchangedIssues','TotalIssues','AdvancingVolume','DecliningVolume','UnchangedVolume','TotalVolume','NewHighs','NewLows','TotalMoves','PercentageChange','AdvancingMoves','DecliningMoves','UnchangedMoves','StrongMarket','WeakMarket','ChangedMarket','MarketVolatility','OriginalDate','LoanAskVolume','LoanAskAmountTradingPrice','PercentageShortVolumeTradedVolume','PercentageShortPriceTradedPrice','ForecastNAV','PreviousDaysNAV','FinalNAV','30DayATMIVCall','60DayATMIVCall','90DayATMIVCall','30DayATMIVPut','60DayATMIVPut','90DayATMIVPut','BackgroundReference','DataSource','BidSpread','AskSpread','ContractPhysicalUnits','Miniumumquantity','NumberPhysicals','ClosingReferencePrice','ImbalanceQuantity','FarClearingPrice','NearClearingPrice','OptionAdjustedSpread','ZSpread','ConvexityPremium','ConvexityRatio','PercentageDailyReturn','InterpolatedCDSBasis','InterpolatedCDSSpread','ClosesttoMaturityCDSBasis','SettlementDate','EquityPrice','Parity','CreditSpread','Delta','InputVolatility','ImpliedVolatility','FairPrice','BondFloor','Edge','YTW','YTB','SimpleMargin','DiscountMargin','12MonthsEPS','UpperTradingLimit','LowerTradingLimit','AmountOutstanding','IssuePrice','GSpread','MiscValue','MiscValueDescription']) 
       #parse date time this is quicker than doing it while we read it in 
       for chunk in df: 
        chunk['datetime'] = chunk.apply(lambda row: datetime.datetime.strptime(row['Date']+ ':' + row['Time'],'%d-%b-%Y:%H:%M:%S.%f'), axis=1) 
        #df = df[~df.comment.str.contains('ALIAS')] 
       #drop uneeded columns inc date and time 
        chunk = chunk.drop(['Date','Time','GMTOffset','ExCntrbID','LOC','MarketVWAP','BuyerID','NoBuyers','SellerID','NoSellers','Qualifiers','SeqNo','ExchTime','BlockTrd','FloorTrd','PERatio','Yield','NewPrice','NewVol','NewSeqNo','BidYld','AskYld','ISMABidYld','ISMAAskYld','Duration','ModDurtn','BPV','AccInt','Convexity','BenchSpd','SwpSpd','AsstSwpSpd','SwapPoint','BasePrice','UpLimPrice','LoLimPrice','TheoPrice','StockPrice','ConvParity','Premium','BidImpVol','AskImpVol','ImpVol','PrimAct','SecAct','GenVal1','GenVal2','GenVal3','GenVal4','GenVal5','Crack','Top','FreightPr','1MnPft','3MnPft','PrYrPft','1YrPft','3YrPft','5YrPft','10YrPft','Repurch','Offer','Kest','CapGain','Actual','Prior','Revised','Forecast','FrcstHigh','FrcstLow','NoFrcts','TrdQteDate','QuoteTime','BidTic','TickDir','DivCode','AdjClose','PrcTTEFlag','IrgTTEFlag','PrcSubMktId','IrgSubMktId','FinStatus','DivExDate','DivPayDate','DivAmt','Open','High','Low','Last','OpenYld','HighYld','LowYld','ShortPrice','ShortVol','ShortTrdVol','ShortTurnnover','ShortWeighting','ShortLimit','AccVolume','Turnover','ImputedCls','ChangeType','OldValue','NewValue','Volatility','Strike','Premium','AucPrice','Auc Vol','MidPrice','FinEvalPrice','ProvEvalPrice','AdvancingIssues','DecliningIssues','UnchangedIssues','TotalIssues','AdvancingVolume','DecliningVolume','UnchangedVolume','TotalVolume','NewHighs','NewLows','TotalMoves','PercentageChange','AdvancingMoves','DecliningMoves','UnchangedMoves','StrongMarket','WeakMarket','ChangedMarket','MarketVolatility','OriginalDate','LoanAskVolume','LoanAskAmountTradingPrice','PercentageShortVolumeTradedVolume','PercentageShortPriceTradedPrice','ForecastNAV','PreviousDaysNAV','FinalNAV','30DayATMIVCall','60DayATMIVCall','90DayATMIVCall','30DayATMIVPut','60DayATMIVPut','90DayATMIVPut','BackgroundReference','DataSource','BidSpread','AskSpread','ContractPhysicalUnits','Miniumumquantity','NumberPhysicals','ClosingReferencePrice','ImbalanceQuantity','FarClearingPrice','NearClearingPrice','OptionAdjustedSpread','ZSpread','ConvexityPremium','ConvexityRatio','PercentageDailyReturn','InterpolatedCDSBasis','InterpolatedCDSSpread','ClosesttoMaturityCDSBasis','SettlementDate','EquityPrice','Parity','CreditSpread','Delta','InputVolatility','ImpliedVolatility','FairPrice','BondFloor','Edge','YTW','YTB','SimpleMargin','DiscountMargin','12MonthsEPS','UpperTradingLimit','LowerTradingLimit','AmountOutstanding','IssuePrice','GSpread','MiscValue','MiscValueDescription'], axis=1) 
       # convert to datetime explicitly and add nanoseconds to same time stamps 
        chunk['datetime'] = pd.to_datetime(chunk.datetime) 
       #nanoseconds = df.groupby(['datetime']).cumcount() 
       #df['datetime'] += np.array(nanoseconds, dtype='m8[ns]') 
       # drop empty prints and make sure all prices are valid 
        dfRic = chunk[(chunk["RIC"] == contract)] 
        if len(dfRic)>0: 
         print(dfRic) 
        if ~chunk.empty: 
         dft = dfRic[(dfRic["Type"] == "Trade")] 
         dft.dropna(subset = ["Volume"], inplace =True) 
         dft = dft.drop(["RIC","Type","BidPrice", "BidSize", "AskPrice", "AskSize"], axis=1) 
         dft = dft[(dft["Price"] > 0)] 

        # clean up bid and ask 
         dfq = dfRic[(dfRic["Type"] == "Quote")] 
         dfq.dropna(how = 'all', subset = ["BidSize","AskSize"], inplace =True) 
         dfq = dfq.drop(["RIC","Type","Price", "Volume"], axis=1) 
         dfq = dfq[(dfq["BidSize"] > 0) | (dfq["AskSize"] > 0)] 
         dfq = dfq.ffill() 
        else: 
         print("Empty")  
    #add to hdf and close if loop finished 
        hdf.append('trade', dft, format='table', data_columns=True) 
        hdf.append('quote', dfq, format='table', data_columns=True) 
    hdf.close() 
+0

느린 점과 왜 느린 이유를 설명 할 수 있습니까? 세부 사항이 없으면 프로세스 속도를 높이는 데 도움이되는 정보를 추측하기가 어렵습니다. –

+1

프로그램의 성능을 프로파일 링하고 측정하여 가장 느린 점과 메모리 또는 CPU 전력이 제한 요소인지 파악해야합니다. 그러면 특정 변경 사항이 도움이 될 수있는 범위를 좁히는 데 도움이됩니다. 그런 다음 소스 코드의 가장 느린 부분을 http://codereview.stackexchange.com/의 질문에 업로드하고 성능 향상에 대한 조언을 구할 수 있습니다. – gfv

+0

gzipped CSV를 청크로 읽으려고합니다. 먼저 압축을 풀지 말고 IO (대개 가장 느린 부품 중 하나)가 적어야합니다. 그보다 더 많은 RAM을 가지고 있다면 더 큰 덩어리를 가질 수 있어야합니다. 그렇지 않으면 덩어리없이 RAM을 대략적으로 수행해야합니다. 결과 DF에 비해 2 배 더 큽니다. 동일한 서버/컴퓨터에서 병행 처리 (DASK를 의미하는 경우)는 오버 헤드로 인해 모든 것을 훨씬 악화시킬 수 있습니다. Apache PySpark SQL에서 실제 전력이 필요하다면 하둡 클러스터에 대한 투자가 늘어날 것입니다. 단지 2 센트입니다 ... – MaxU

답변

1

나는 당신이 최적화 할 수 있습니다 꽤 몇 가지 생각 :

모든 당신이 정말로 대신 읽는 데만 열을 읽은 다음 놓아의
  • 첫째 - usecols=list_of_needed_columns 매개 변수를 사용을

  • 당신의 chunksize 영역을 증가 - 다른 값으로 시도 - 내가 시작하는 것 10**5

  • 은 날짜 시간을 변환 chunk.apply(...)를 사용하지 마십시오 - 그것은 매우 느린 - 사용 pd.to_datetime (열, 형식 = '...') 여러 조건을 결합 할 때 대신

  • 당신이 당신의 데이터를보다 효율적으로 비트 필터링 할 수 있습니다 단계별로 수행하는 대신 다음을 수행하십시오.

+0

그 위대한, 변경 사항을 적용 할 것입니다 - 당신은 내가 잊어 버린 신청에 대해 확실히 맞습니다. – azuric

관련 문제