2012-06-26 3 views
2

나는 700K 이상의 라인을 가진 거대한 CSV를 가지고있다. 나는 그 CSV 라인을 파싱하고 작전을 수행해야한다. 나는 스레딩을 사용하여 그것을 생각했다. 내가 처음에 시도하는 것은 간단합니다. 모든 스레드는 csv의 고유 한 행을 가져야합니다. 3000 행으로 제한된 수의 행을 읽었습니다. 3 개의 스레드를 만듭니다. 각 스레드는 csv 행을 읽어야합니다. 전달 된 파일이 다음의 BufferedReader를 생성하고 제공하는 경우거대한 CSV 라인을 읽는 자바 쉐이딩 프로그램

import java.io.*; 

class CSVOps implements Runnable 
{ 
    static int lineCount = 1; 
    static int limit = 3000; 
    BufferedReader CSVBufferedReader; 
    public CSVOps(){} // default constructor 
    public CSVOps(BufferedReader br){ 
     this.CSVBufferedReader = br; 
    } 
    private synchronized void readCSV(){ 
     System.out.println("Current thread "+Thread.currentThread().getName()); 
     String line; 
     try { 
      while((line = CSVBufferedReader.readLine()) != null){ 
       System.out.println(line); 
       lineCount ++; 
       if(lineCount >= limit){ 
        break; 
       } 
      } 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    }  
    public void run() { 
     readCSV(); 
    } 
} 

class CSVResourceHandler 
{ 
    String CSVPath; 

    public CSVResourceHandler(){ }// default constructor 

    public CSVResourceHandler(String path){ 
     File f = new File(path); 
     if(f.exists()){ 
      CSVPath = path; 
     }else{ 
      System.out.println("Wrong file path! You gave: "+path); 
     } 
    } 
    public BufferedReader getCSVFileHandler(){ 
     BufferedReader br = null; 
     try{ 
      FileReader is = new FileReader(CSVPath); 
      br = new BufferedReader(is); 
     }catch(Exception e){ 

     } 
     return br; 
    } 
} 
public class invalidRefererCheck 
{ 
    public static void main(String [] args) throws InterruptedException 
    { 
     String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/invalid_domain_kw_site_wise_click_rev2.csv"; 
     CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV); 
     CSVOps ops = new CSVOps(csvResHandler.getCSVFileHandler()); 
     Thread t1 = new Thread(ops); 
     t1.setName("T1"); 
     Thread t2 = new Thread(ops); 
     t1.setName("T2"); 
     Thread t3 = new Thread(ops); 
     t1.setName("T3"); 
     t1.start(); 
     t2.start(); 
     t3.start(); 
    } 
} 

클래스 CSVResourceHandler 간단한 발견 : 다음은 코드입니다. 이 리더는 CSVOps 클래스로 전달됩니다. 그것에는 csv의 한 줄을 읽고 그것을 인쇄하는 readCSV 메쏘드가 있습니다. 한도는 3000으로 설정되어 있습니다.

이제 스레드가 카운트를 잘못 계산하는 경우, 그 한계 및 카운트 변수를 모두 정적으로 선언합니다. 이 프로그램을 실행하면 이상한 결과를 얻습니다. 나는 약 1000 개의 레코드 만 얻는다. 나는 1500을 얻는다. 그들은 무작위 순서로있다. 출력이 끝나면 2 줄의 csv가 나오고 현재 쓰레드 이름이 나온다.

나는 스레드에서 매우 초보자입니다. 내가 원한 것은이 CSV를 읽는 것이 빨리되어야한다는 것입니다. 수행 할 수있는 것을 제안하십시오.

+6

하나 이상의 읽기 스레드를 사용하면 도움이되지 않습니다. 차단 지점은 CPU가 아니라 IO입니다. –

+2

하나의 스레드로 작업하고 두 번째 스레드에서 사용할 수있는 대기열로 읽으면 각 줄을 전달할 것을 제안합니다. 이렇게하면 주문이 보존됩니다. –

+0

@PeterLawrey : 멋진 아이디어! – Shades88

답변

0

큰 덩어리로 파일을 읽는 것이 좋습니다. 큰 버퍼 객체를 할당하고, 청크를 읽고, 마지막 EOL char를 찾기 위해 마지막에서 다시 구문 분석하고, 임시 문자열로 버퍼의 마지막 비트를 복사하고, EOL + 1에서 버퍼에 null을 밀어 넣고, 버퍼를 대기열에 넣습니다 참조, 즉시 새 문자열을 만들고 임시 문자열을 먼저 복사 한 다음 나머지 버퍼를 채우고 EOF까지 반복하십시오. 완료 될 때까지 반복하십시오. 스레드 풀을 사용하여 버퍼를 구문 분석/처리하십시오.

유효 라인의 전체 청크를 큐에 넣어야합니다. 단일 행을 대기열에 넣으면 스레드 통신이 구문 분석보다 오래 걸릴 수 있습니다.

이와 비슷하게, 풀의 스레드가 청크를 '순서가 잘못되어 처리'할 수 있습니다. 순서를 유지해야하는 경우 (예 : 입력 파일이 정렬되고 출력이 정렬 된 상태로 유지되어야하는 다른 파일로 이동하는 경우) 청크 어셈블러 스레드가 각 청크 객체에 시퀀스 번호를 삽입하도록 할 수 있습니다. 그런 다음 풀 스레드는 처리 된 버퍼를 이전의 모든 청크가 들어올 때까지 순서가 잘못된 청크 목록을 유지하는 또 다른 스레드 (또는 작업)에 전달할 수 있습니다.

다중 스레드는 어렵거나 위험 할 필요가 없습니다 /효과적인. 대기열/풀/작업을 사용하고 동기화/결합을 피하고 스레드를 지속적으로 생성/종료/파괴하지 않고 한 번에 하나의 스레드 만 작동하는 대형 버퍼 객체 주위에 대기열을 만들지 않으면 속도가 빨라집니다. 교착 상태, 가짜 공유 등의 가능성은 전혀 없습니다.

이러한 속도 향상의 다음 단계는 버퍼의 풀 대기열을 미리 할당하여 버퍼 및 관련 GC의 연속 작성/삭제와 시작시 [L1 캐시 크기] '데드 존'을 제거하는 것입니다 캐시 공유를 완전히 제거 할 수 있습니다. 멀티 코어 박스에서 SSD를 사용하면 충분합니다.

편집 - 오, 자바, 맞아. 널 터미네이터에 대한 나의 대답의 'CplusPlus-iness'에 대해 사과드립니다. 나머지 포인트는 괜찮습니다. 이것은 언어에 의존하지 않는 대답이어야합니다 :)

+0

답변에 감사드립니다. 이것은 언어에 구애받지 않는 대답이어야한다. 네, 그걸 기억하고 내가 자바에 맞을 수 있는지 보자 :) – Shades88

2

좋아, 먼저 하나의 기계 디스크에서 병렬 I/O를 수행하는 데 여러 개의 스레드를 사용하지 마십시오. 스레드가 실행될 때마다 기계 헤드가 다음 읽기 위치를 찾아야하기 때문에 실제로 성능이 저하됩니다. 따라서 불필요하게 비용이 많이 드는 작업 인 디스크의 헤드를 불필요하게 튀게됩니다.

단일 생산자 다중 소비자 모델을 사용하여 단일 스레드를 사용하여 행을 읽고 작업자 풀을 사용하여 처리합니다. 당신의 문제에

:

스레드가 주요 종료하기 전에 완료 될 때까지 실제로 대기하지 않나요?

public class invalidRefererCheck 
{ 
    public static void main(String [] args) throws InterruptedException 
    { 
     ... 
     t1.start(); 
     t2.start(); 
     t3.start(); 

     t1.join(); 
     t2.join(); 
     t3.join(); 
    } 
} 
+0

기본적으로 자바 스레드가 데몬되지 않습니다 그래서 그는 그럴 필요가 없습니다. – Voo

+0

'단일 생산자 다중 소비자 모델을 사용하여 단일 스레드를 사용하여 행을 읽고 작업자 풀을 사용하여 처리합니다 .' 단서를 가져 주셔서 감사합니다. 그리고 join()은 실제로 하나의 쓰레드가 다른 쓰레드의 처리를 대기하므로 모든 쓰레드가 끝날 때까지 main을 멈추지 않을 것입니다. 나는 이론적으로 그렇게했다 : O – Shades88

+0

@ Shades88 : 사실상 아니오. 모든 스레드가 시작된 후에는 주 스레드와 병렬로 실행됩니다. 'join'은 자식 스레드가 끝날 때까지 메인 스레드가 진행하지 못하도록 보장하지만 자식 프로세스는 여전히 서로 병렬 적으로 작동합니다. – Tudor

관련 문제