2016-08-23 2 views
1

나는 Storm의 초보자입니다. 나는 그것을 대학 프로젝트에 사용하고 있습니다.Storm에서 튜플 처리를 중지하고 다른 코드를 실행하는 방법

Spout이 MySql 데이터베이스에 링크되어 있고 토폴로지가 2 개 있고 볼트가 2 개 있습니다. 스파우트에 연결된 첫 번째 볼트는 튜플에 필요하지 않은 정보를 준비하고 제거합니다. 두 번째는 튜플을 필터링합니다.

로컬 모드에서 작업하고 있습니다.

내 질문 : 토폴로지를 실행 한 후 내 콘솔에 다음과 같은 출력이 표시되는 이유는 무엇입니까?

38211 [Thread-14-movie-SPOUT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30] 
67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]] 
67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]] 
67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]] 

는 I 처리의 마지막 튜플 후 다음 행을 정상으로 간주되도록 읽었다. 그렇지 않니?

토폴로지를 제출 한 후에 다른 코드를 어떻게 실행할 수 있습니까? 예를 들어, HashMap에 저장된 두 번째 볼트에서 수행 된 필터링 결과를 인쇄하려고합니다. submitTopology() 메서드가 포함 된 줄 뒤에 코드를 삽입하면 코드는 튜플 완료 전에 실행됩니다.

두 번째와 마지막 질문은 왜 스톰의 모든 예에서 나는 르네

"(1000)에 Thread.sleep"에서 볼?

아마 내 첫 번째 질문에 연결되어 있습니다.

제 질문에 대한 답변을 드리겠습니다. 미리 감사드립니다.

답변

0

처리 된 마지막 튜플 이후의이 줄은 정상적인 것으로 간주됩니다. 그렇지 않니?

이들은 단지 INFO 메시지입니다. 따라서 걱정할 필요가 없습니다.

submitTopology() 메소드가 포함 된 줄 다음에 코드를 삽입하면 코드는 튜플 완료 전에 실행됩니다.

토폴로지를 제출하면 토폴로지가 백그라운드에서 실행됩니다 (예 : 멀티 스레드). 이는 토폴로지가 "영원히"실행될 때 필요합니다 (명시 적으로 중지하거나 로컬 모드를 실행하면서 Java 응용 프로그램이 종료 될 때까지).

Strom이 스트리밍 시스템이고 "처리가 끝나지 않았습니다"(입력 스트림이 무한대로 처리되므로 영원히 처리되므로) "실행 한 토폴로지 완료 후"실행 코드는 Storm 개념과 일치하지 않습니다. 유한 데이터 세트를 처리하려면 Flink 또는 Spark와 같은 배치 처리 프레임 워크를 고려해야합니다.

따라서 Storm에서이 작업을 수행하려면 모든 데이터가 처리 된 시점을 결정할 수 있어야합니다. 따라서 토폴로지 제출 후 모든 데이터가 처리 된 후 명시 적으로 차단 및 대기합니다.

그러나 사용 사례의 경우 마지막 볼트 내에서 결과를 인쇄하지 않는 이유는 무엇입니까?

Thread.sleep() 나는 당신이 어떤 견해를 언급하는지 잘 모르겠습니다. 아무도 왜 생산을 위해 그것을 넣어야할지 모르겠다. 어쩌면 인위적으로 처리 속도를 늦추는 목적을 데모 할 수 있습니다.

+0

철저한 답장을 보내 주셔서 감사합니다. –

+0

이 질문에 대한 답변을 보내 주시면 답변을 수락하고/또는 upvote 해주십시오. –

관련 문제