2009-09-18 6 views
3

로컬 디스크 디렉토리 (파일 잠금 사용)에 저장된 많은 수의 파일을 처리하기 위해 Java에서 여러 스레드를 사용하는 방법로컬 디스크에 저장된 많은 수의 파일을 처리하기 위해 다중 스레드를 사용하는 방법 (파일 잠금 사용)

+4

내가하지 말라고 조언하고 싶습니다. 많은 수의 파일을 처리 할 때 CPU가 아닌 디스크 I/O가 사용자를 죽입니다. 다중 스레드는 병목 현상을 악화시킵니다. – Joey

+0

@Johannes는 일반적으로 사실이지만 처리, 디스크 버퍼링 및 서로 다른 물리적 미디어 간의 파일 배포에 의존합니다. 처리가 엄청나게 복잡하고 디스크 I/O 시간을 훨씬 능가 할 수 있습니다. – paxdiablo

+0

인원 : 그래서 "아마"가 거기에 있습니다. 그러나 "파일 수가 많다"는 것은 나에게 약 10k에서 시작하며 각각 3 분이 걸리면 다음 몇 달 동안 다른 컴퓨터를 찾는 것과 같은 다른 걱정이있을 것입니다. – Joey

답변

4

내가 알고있는 가장 좋은 방법은 제작자/다중 소비자 패러다임을 사용하는 것이다.

하나의 스레드가 대기열을 생성 한 다음 N 개의 다른 스레드를 시작하십시오. 이 주 스레드는 모든 파일을 열거하고 이름을 해당 대기열에 배치합니다. 그런 다음 대기열 끝에 N 대기열 마커를 배치합니다.

"다른"스레드는 해당 큐에서 다음 이름을 읽고 파일을 처리합니다. 그들이 end-of-queue 마커를 읽을 때 그들은 빠져 나간다 (그리고 메인 쓰레드는 필요할 경우 종료 상태를 얻을 수있다).

이렇게하면 대기열에 대한 스레드 간의 통신이 단순 해집니다 (물론 모든 스레드에서 경쟁 조건이 발생하지 않도록 뮤텍스로 보호해야 함). 또한 스레드가 특정 멀티 스레딩 문제를 피할 수있는 또 다른 좋은 방법 인 자신의 종료 조건 (주 스레드의 방향으로)을 제어 할 수 있습니다.

0

많은 동시 작업이 Java 동시성 클래스에서 수행되었습니다. 아마도 ConcurrentLinkedQueue과 같은 것을 원할 것입니다.

링크 된 노드를 기반으로 한 제한되지 않은 스레드 안전 큐입니다. 이 대기열은 요소 FIFO (선입 선출)를 정렬합니다. 대기열의 헤드는 가장 긴 시간 대기열에 있었던 요소입니다. 대기열의 꼬리는 가장 짧은 시간에 대기열에 있었던 요소입니다. 새로운 요소는 큐의 끝에 삽입되고 큐 검색 조작은 큐의 헤드에서 요소를 가져옵니다. ConcurrentLinkedQueue는 많은 스레드가 공통 콜렉션에 대한 액세스를 공유 할 때 적절한 선택입니다.

주 스레드 또는 별도 스레드의 큐에 항목을 넣으려면 offer() 메서드를 사용합니다. 그런 다음 poll() 메서드를 사용하여 대기열에서 다음 항목을 가져 와서 처리하는 작업자 꿀벌 (이상적으로는 ExecutorService과 같이 생성됨)이 있습니다.

이 디자인을 사용하면 대기/폴링 코드를 직접 수행 할 필요없이 생성자 수와 동시 실행 소비자 수를 엄청나게 유연하게 결정할 수 있습니다. Executors.newFixedThreadPool()을 사용하여 미니언 풀을 만들 수 있습니다.

0

당신이 정말로하고 싶은 것은 주 프로그램이 File 참조를 얻는 디렉토리를 가로 지르게합니다. 이러한 참조를 사용하여 Runnable을 구현하는 객체를 만듭니다. Runnable의 run() 메소드는 모든 처리 로직입니다. ExecutorService을 작성하고 execute (Runnable)를 호출하여 실행 프로그램에 태스크를 제출하십시오. Executor는 당신이 만든 Executor의 타입에 기반하여 쓰레드가 사용 가능해 지도록 요청합니다. Executors.newFixedThreadPool()은 좋은 선택입니다. 메인 쓰레드가 모든 파일을 찾아서 작업으로 제출하면, shutdown() executor를 호출하면 executor가 주어진 작업의 실행을 완료 한 다음 닫고 awaitTermination()을 호출하면 Executor가 종료 될 때까지 주 스레드가 차단됩니다. . http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long, java.util.concurrent.TimeUnit을)

3

나는 보통 그것을 할 방법은 다음과 같습니다 물론 그 모든 작업을 완료 한 후 더 많은 처리를 할

[6]까지 기다려야 할 생각했습니다.

이 같은 차단 큐를 만들 수 있습니다

LinkedBlockingQueue<String> files; 
files = new LinkedBlockingQueue<String>(1000); 
AtomicBoolean done = new AtomicBoolean(false); 

당신이 어떤 방법 억원 파일 또는 무엇이든이있는 경우, 당신이 밖으로 실행에 대해 걱정할 필요가 없습니다 큐는 1000 개 요소를 저장할 수 메모리의. 당신이 차지할 메모리 양에 따라 원하는 크기로 크기를 변경할 수 있습니다. 공간이 큐에서 사용할 수있을 때까지

File directory = new File("path\to\folder"); 
for(File file : directory.listFiles()){ 
    files.put(file.getAbsolutePath()); 
} 
files.put(null);//this last entry tells the worker threads to stop 

올리기 기능 블록, 그래서 당신이 채울 경우 파일이 읽기 중지 : 메인 스레드에서

당신이 그런 짓을. 물론 File.listFiles()는 실제로 메모리에로드 될 필요가없는 Collection이 아니라 배열을 반환하기 때문에이 함수를 사용하면 파일의 전체 목록을 메모리에로드 할 수 있습니다. 그게 문제가된다면 다른 일을해야 할 것 같아요.

그러나이 모델은 파일을 나열하는 다른 방법이있는 경우에도 작동합니다 (예 : 데이터베이스에있는 경우 등). directory.listFiles()의 호출을 파일 목록. 또한 하위 디렉토리에서 파일을 처리해야하는 경우 재귀 적으로 처리해야하므로 성가신 일이 될 수 있습니다 (이 경우 매우 큰 디렉토리의 메모리 문제가 발생합니다)

다음 작업자 스레드에서 :

public void run(){ 
    while(!done.get()){ 
     String filename = files.take(); 
     if(filename != null){ 
     //do stuff with your file. 
     } 
     else{ 
     done.set(true);//signal to the other threads that we found the final element. 
     } 
    } 
} 

대기열의 모든 파일이 처리 된 경우 take는 새로운 요소가 나타날 때까지 대기합니다.

어쨌든이 코드는 내 머리 꼭대기에서 떨어져 있으므로 정확하게 테스트되지 않았습니다.

+0

'files.put (null);'줄 것이다 사양에 따른 널 포인터 예외 – VHS

5

평행선으로 파일을 읽지 않으려합니다 (디스크 I/O가 잘 병렬화되지 않음). 단일 스레드가 파일을 읽고 병렬 처리를 위해 내용을 작업자 스레드로 보내고 작업자로부터 결과를 수집하도록하는 것이 더 좋습니다. 우수 ExecutorService & c : o를 java.util.concurrent에서 사용하면 스레딩의 세부적인 내용을 잊어 버리고 솔루션을 훨씬 유연하게 만들 수 있습니다.

다음은 간단한 예입니다.

public List<Foo> processFiles(Iterable<File> files){ 
    List<Future<Foo>> futures = new ArrayList<Future<Foo>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(
     Runtime.getRuntime().availableProcessors()); 
    for (File f : files){ 
     final byte[] bytes = readAllBytes(f); // defined elsewhere 
     futures.add(exec.submit(new Callable<Foo>(){ 
      public Foo call(){ 
       InputStream in = new ByteArrayInputStream(bytes); 
       // Read a Foo object from "in" and return it 
      } 
     })); 
    } 
    List<Foo> foos = new List<Foo>(futures.size()); 
    for (Future<Foo> f : futures) foos.add(f.get()); 
    exec.shutdown(); 
    return foos; 
} 

TODO : Foo를 가정하면 파일을 처리하는 결과 또한 그래서 당신은 전화 사이를 재사용 할 수 processFilesExecutorService 외부를 인스턴스화 할 수 있습니다 등 예외 처리를 추가합니다.

0

저는 수천 개의 텍스트 파일을 처리해야하는 비슷한 문제에 대해 작업하고 있습니다. 디렉토리를 폴링하고 디렉토리 (하위 디렉토리 포함)에서 발견 된 파일 목록을 준비하는 파일 폴러를 가지고 있고, 메소드로 말하면 list와 함께 fileFound를 인수로 호출합니다.

fileFound 메서드에서 목록을 반복하고 각 파일에 대해 새 스레드를 만듭니다. ExecutorService를 사용하여 활성 스레드 수를 제어하고 있습니다. 코드는 다음과 같이 진행됩니다

public void fileFound(List<File> fileList) { 
    for (File file : fileList) { 
     FileProcessor fprocessor = new FileProcessor(file); // run() method takes care of implementing business rules for the file. 
     EXECUTOR.submit(fprocessor); //ExecutorService EXECUTOR = Executors.newFixedThreadPool(10); 
    } 
} 

내 관찰 :

  1. 처리 파일 하나 하나, 멀티 스레딩없이 처리 3.5K 파일 (~ 32GB 총), ~ 9 시간이 걸렸습니다.
  2. 멀티 스레딩 사용 :

    하면 (5)에 고정되는 스레드 수 - 1백18분.

    스레드 수가 10 - 75 분으로 고정 된 경우.

    스레드 수가 15 - 72 분으로 고정 된 경우.

+0

10 개의 스레드가 최적 인 CPU 코어 수를 공유 할 수 있습니까? –

+0

@sunny_dev이 기계에는 4 개의 코어가 있습니다. – Amarjeet

+0

은 논리 코어 또는 실제 코어입니다. –

1

Java 8의 경우 parallel streams을 사용하면 쉽게 얻을 수 있습니다. 다음 코드를 참조하십시오 : 병렬 스트림으로

try { 
     Files.walk(Paths.get("some-path")).parallel().forEach(file -> {/*do your processing*/}); 
    } catch (IOException e1) { 
     e1.printStackTrace(); 
    } 

을, 스레드의 필요한 수의 병렬로, 우리의 경우 콜렉션 요소, 파일을 처리하기 위해, CPU의 논리 코어 수를 초과하지 않는를 생성합니다 런타임 . JVM 인수로 전달하여 스레드 수를 제어 할 수도 있습니다.

이 방법의 장점은 스레드를 만들고 유지 관리하는 저수준 작업을 실제로 수행 할 필요가 없다는 것입니다. 당신은 당신의 높은 수준의 문제에 집중합니다.

관련 문제