2012-07-04 2 views
7

동일한 InputStream을 동시에 처리하는 N 개의 소비자 스레드를 생성해야합니다. 예를 들어, 어떻게 든 변환하고, 체크섬 또는 디지털 서명 등을 계산합니다. 이러한 소비자는 서로 의존하지 않으며 모두 InputStream을 데이터 원본으로 허용하는 타사 라이브러리를 사용하고 있습니다.독립 소비자와 단일 InputStream의 동시 처리

그래서 내가 할 수있는 것입니다 - "부모"스트림의 데이터

  • 차단 해제 소비자의

    • 읽기 덩어리가 모든 소비자까지
    • 대기 전체 덩어리를 읽을의 InputStream의 일부 구현을 작성, 심플 보면서, 그것은 livelo 같은 다양한 문제를 상승 할 수

  • 은 다음 청크를 읽어 ck 특정 소비자가 죽을 때 모든 InputStream 메소드를 구현하고 장벽/래치 등을 사용하여 소비자 자체의 fork/join을 제어하십시오.

    한 명의 친구가 구현하는 데 30 분의 시간이 걸렸으며 내 저녁을 보냈습니다.

    Google 검색 결과가 너무 좋지 않아서 (google-fu는 충분하지 않습니까?) 성숙한 무언가를 사용하거나 전체 "원본"스트림을 임시 파일로 복사하지 마십시오. 이를 데이터 소스로 사용하십시오. 후자의 솔루션은 더 신뢰할만한 것으로 보이지만, 예를 들어 스트리밍 오디오를 처리 할 때 기가 바이트 파일을 생성 할 수도 있습니다.

  • +0

    파일에 데이터를 쓰고 N FileInputStream을 생성 할 수 있습니까? –

    +0

    @JonLin 질문이 끝날 때까지 그는 말했다. –

    답변

    3

    내가 보는 방식으로, 적어도 다른 종류의 소비자가 다른 속도로 스트림을 이동할 수 있도록 모든 종류의 버퍼링이 있어야합니다. 즉, 현재 가장 느린 소비자가 끊임없이 수렁에 빠져들지 않도록해야합니다. 기본적으로 최악의 성능과 동시성의 이점은 거의 없습니다.

    지금까지 사용해온 소비자로 각 청크에 태그를 지정한 다음 완전히 소진 된 소비자를 삭제할 수 있습니다. 아마도 아직 사용하지 않은 각 청크에 대한 참조를 보유한 각 소비자가 GC를 사용하여 자동으로 사용 된 청크를 처리 할 수 ​​있습니다. 제작자는 WeakReference의 목록을 청크에 보관하여 아직 사용할 청크의 수를 처리하고 이에 대한 조절을 기반으로 할 수 있습니다.

    또한 내부적으로 생산자 인 InputStream과 통신하는 스레드 당 별도의 InputStream 인스턴스가 있다고 생각합니다. 이 방법을 사용하면 라이브 록 위험을 쉽게 해결할 수 있습니다. try ... finally { is.close(); } - 죽어가는 소비자는 자체 입력 스트림을 닫습니다. 이것은 생산자에게 전달됩니다.

    소비자 당 ArrayBlockingQueue을 사용하여 아이디어가 있습니다. 생산자가 막히거나 바쁘게 기다리지 않고 모든 소비자가 제대로 먹이를 얻는 데 어려움이 있습니다.

    +0

    나는 5 초의 소비자가 1 초 동안 일하고 한 명의 소비자가 2 초 동안 일하는 동시에 동시 호출은 2 초를주고 순차적으로는 7 초가 주어지는 것은 거의 이득이 없다고 말하지 않을 것입니다. 아니면 여기에 뭔가 빠졌나요? 덩어리와 버퍼에 태그를 붙임으로써 메모리 소비를 피할 수 있습니다. – jdevelop

    +0

    예, 당신이 말하는 것은 불가피합니다. 그러나 소비자가 평균적으로 균형을 잡았지만 성과가 매우 다양하다면, 현재 뒤쳐져있는 각 소비자를 항상 기다리면 동의의 기회를 잃을 것입니다. 버퍼링이 도움이 될 것입니다. 스레드 우선 순위 밸런싱을 도입하면 실제로 이러한 상황을 달성 할 수 있습니다. –

    0

    파이프 스트림 사용을 고려 했습니까? 제작자는 파일에서 읽는 내용이 무엇이 던지 하나 이상의 PipedOuputStream을 가질 수 있습니다. 파이프의 다른쪽에는 해당 소비자 스레드가 해당 PipedInputstream (라이브러리와 공유 할 수있는 InputStream)에서 읽습니다.

    생산자 스레드는 파이프의 다른 쪽에서 읽는 특정 소비자 스레드에 대해 처리 할 데이터를 제공하여 파이프 데이터를 보내야 하는지를 결정할 수 있습니다.

    소비자 스레드에서 데이터를 다시 가져와야하는 경우 반대 방향으로 다른 파이프를 만들어 데이터를 다시 보낼 수 있습니다.

    +1

    'PipedOutputStream'은 소비자가 뒤처 지 자마자 생산자를 차단하고 다른 모든 소비자는 굶어 죽을 것입니다. –

    0

    Apache ActiveMQ과 같은 JMS (Java Messaging Service) 구현을 시도해 볼 수 있습니다.

    귀하의 경우에는 주제 인 (Topics vs. Queues 참조)을 만들어야합니다. 주제는 제작자가 만들고 N 소비자에게 게시되며 동시에 실행될 수 있으며 각 소비자는 정확히 동일한 데이터를 수신합니다.

    InputStream을 사용하고자하므로 send messages are streams에 대한 장이 있습니다.

    일반적으로 프로듀서와 소비자는 네트워크의 다른 컴퓨터에서 실행되는 별도의 프로세스라고 가정합니다. 하나의 JVM에서 완전히 실행되도록 구성 할 수 있다고 생각합니다. 이것은 JMS의 구현에 달려 있습니다. 이것들은 꽤 유명합니다 : HornetQ by JBoss, RabbitMQ, 그리고 다른 많은 것들.