2013-06-17 2 views
0

현재 작업중인 프로젝트는 연속적으로 실행되는 한 스레드의 개체 집합을 주 스레드로 효율적으로 전달하는 방법을 구현해야합니다. 현재 설정은 다음과 같습니다.스레드간에 개체 집합 전달

새 스레드를 만드는 주 스레드가 있습니다. 이 새 스레드는 지속적으로 작동하며 타이머를 기반으로 메서드를 호출합니다. 이 메서드는 온라인 소스에서 메시지 그룹을 가져 와서 TreeSet에서 구성합니다.

이 트리 집합을 주 스레드로 다시 전달해야 메시지에 포함 된 메시지를 반복 타이머와 독립적으로 처리 할 수 ​​있습니다.

더 나은 참고로 내 코드처럼 보이는 다음

// Called by the main thread on start. 
void StartProcesses() 
{ 
    if(this.IsWindowing) 
    { 
     return; 
    } 

    this._windowTimer = Executors.newSingleThreadScheduledExecutor(); 

    Runnable task = new Runnable() { 
     public void run() { 
      WindowCallback(); 
     } 
    }; 

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task, 
      0, this.SQSWindow, TimeUnit.MILLISECONDS); 

    this.IsWindowing = true; 
} 

///////////////////////////////////////////////////////////////////////////////// 

private void WindowCallback() 
{ 
    ArrayList<Message> messages = new ArrayList<Message>(); 

    //TODO create Monitor 
    if((!CancellationToken)) 
    { 
     try 
     { 
      //TODO fix epochWindowTime 
      long epochWindowTime = 0; 
      int numberOfMessages = 0; 
      Map<String, String> attributes; 

      // Setup the SQS client 
      AmazonSQS client = new AmazonSQSClient(new 
        ClasspathPropertiesFileCredentialsProvider()); 

      client.setEndpoint(this.AWSSQSServiceUrl); 

      // get the NumberOfMessages to optimize how to 
      // Receive all of the messages from the queue 

      GetQueueAttributesRequest attributesRequest = 
        new GetQueueAttributesRequest(); 
      attributesRequest.setQueueUrl(this.QueueUrl); 
      attributesRequest.withAttributeNames(
        "ApproximateNumberOfMessages"); 
      attributes = client.getQueueAttributes(attributesRequest). 
        getAttributes(); 

      numberOfMessages = Integer.valueOf(attributes.get(
        "ApproximateNumberOfMessages")).intValue(); 

      // determine if we need to Receive messages from the Queue 
      if (numberOfMessages > 0) 
      { 

       if (numberOfMessages < 10) 
       { 
        // just do it inline it's less expensive than 
        //spinning threads 
        ReceiveTask(numberOfMessages); 
       } 
       else 
       { 
        //TODO Create a multithreading version for this 
        ReceiveTask(numberOfMessages); 
       } 
      } 

      if (!CancellationToken) 
      { 

       //TODO testing 
       _setLock.lock(); 

       Iterator<Message> _setIter = _set.iterator(); 
       //TODO 
       while(_setIter.hasNext()) 
       { 
        Message temp = _setIter.next(); 

        Long value = Long.valueOf(temp.getAttributes(). 
          get("Timestamp")); 
        if(value.longValue() < epochWindowTime) 
        { 
         messages.add(temp); 
         _set.remove(temp); 
        } 
       } 

       _setLock.unlock(); 

       // TODO deduplicate the messages 

       // TODO reorder the messages 

       // TODO raise new Event with the results 
      } 

      if ((!CancellationToken) && (messages.size() > 0)) 
      { 
       if (messages.size() < 10) 
       { 
        Pair<Integer, Integer> range = 
          new Pair<Integer, Integer>(Integer.valueOf(0), 
            Integer.valueOf(messages.size())); 
        DeleteTask(messages, range); 
       } 
       else 
       { 
        //TODO Create a way to divide this work among 
        //several threads 
        Pair<Integer, Integer> range = 
          new Pair<Integer, Integer>(Integer.valueOf(0), 
            Integer.valueOf(messages.size())); 
        DeleteTask(messages, range); 
       } 
      } 
     }catch (AmazonServiceException ase){ 
      ase.printStackTrace(); 
     }catch (AmazonClientException ace) { 
      ace.printStackTrace(); 
     } 
    } 
} 
메시지가있는 경우이 타이머 스레드에서 이벤트를 만드는 것입니다 처리 할 수있는 주석, 나의 현재 선호하는 방법 중 일부 볼 수 있습니다

로 . 그런 다음 주 스레드가이 이벤트를 수신하여 적절하게 처리합니다.

현재 Java에서 이벤트를 처리하는 방법이나 생성/수신하는 방법에 익숙하지 않습니다. 또한 이벤트를 생성하고 그 안에 포함 된 정보를 스레드간에 전달할 수 있는지 여부를 알지 못합니다.

내 방법이 가능한지에 대한 조언이나 통찰력을 누군가 주시겠습니까? 그렇다면 현재의 검색 시도가 결실을 맺지 못하여 구현 방법에 대한 정보를 어디에서 찾을 수 있습니까?

그렇지 않다면 가능하면 소켓을 관리하지 않아도된다는 것을 명심하면서 어떻게해야하는지에 대한 제안을받을 수 있습니까?

편집 1 :

메인 스레드는 또한 수신하는 메시지를 기반으로 명령을 실행하거나 필요한 정보를 얻기 위해 명령을 실행에 대한 책임이 있습니다. 이러한 이유로 메인 쓰레드는 메시지 수신을 기다릴 수없고, 이벤트 기반 방식으로 처리해야한다.

답변

1

생산자 - 소비자 패턴 :

하나의 스레드 (생산자) continuosly 큐에있는 개체 (메시지) 스택. 다른 스레드 (소비자)가 큐에서 개체를 읽고 제거합니다.

문제가 해결되면 "BlockingQueue"를 시도해보십시오. http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

쉽고 효율적입니다.

큐가 비어 있으면 소비자가 "차단"됩니다. 즉, 프로듀서가 일부 개체를 넣을 때까지 스레드가 대기합니다 (CPU 시간을 사용하지 않음). 그렇지 않으면 cosumer는 연속적으로 객체를 소비합니다. 그리고 대기열이 가득 차면 소비자가 대기열에 방을 만들기 위해 일부 객체를 소비 할 때까지 prducer가 차단됩니다. 반대의 경우도 마찬가지입니다.여기

은 예입니다 :


(프로듀서 스레드)

Message message = createMessage(); 
queue.put(message); 

(소비자 스레드)

Message message = queue.take(); 
handleMessage(message); 
+0

(큐는 생산자와 소비자 모두 동일한 개체를해야한다) 불행히도 BlockingQueue는 작동하지 않을 것입니다. 메시지를 소비하는 메인 쓰레드는 반드시 gener 다른 것들 중에서도 봉사에 대한 명령을 받았다. – JME

+0

그냥 2 개의 대기열을 요구하지 않습니까? Erlang과 Scala의 Actor 모델과 같은 언어로 전달되는 메시지는 스레드 당 메일 함/메시지 대기열을가집니다. – selig

+0

두 개의 대기열을 만드는 것만 큼 간단하지는 않지만 주 스레드는 "메시지 수집"스레드에 정보를 보내지 않고 시작된 후에 만 ​​정보를 수신합니다. 그런 다음 메인 스레드는 외부 서비스에 대한 HTTP 연결을 생성하여 수집 할 "메시지 수집"스레드에 대한 메시지를 생성하거나 생성하지 않을 수 있습니다. – JME