현재 작업중인 프로젝트는 연속적으로 실행되는 한 스레드의 개체 집합을 주 스레드로 효율적으로 전달하는 방법을 구현해야합니다. 현재 설정은 다음과 같습니다.스레드간에 개체 집합 전달
새 스레드를 만드는 주 스레드가 있습니다. 이 새 스레드는 지속적으로 작동하며 타이머를 기반으로 메서드를 호출합니다. 이 메서드는 온라인 소스에서 메시지 그룹을 가져 와서 TreeSet에서 구성합니다.
이 트리 집합을 주 스레드로 다시 전달해야 메시지에 포함 된 메시지를 반복 타이머와 독립적으로 처리 할 수 있습니다.
더 나은 참고로 내 코드처럼 보이는 다음
// Called by the main thread on start.
void StartProcesses()
{
if(this.IsWindowing)
{
return;
}
this._windowTimer = Executors.newSingleThreadScheduledExecutor();
Runnable task = new Runnable() {
public void run() {
WindowCallback();
}
};
this.CancellationToken = false;
_windowTimer.scheduleAtFixedRate(task,
0, this.SQSWindow, TimeUnit.MILLISECONDS);
this.IsWindowing = true;
}
/////////////////////////////////////////////////////////////////////////////////
private void WindowCallback()
{
ArrayList<Message> messages = new ArrayList<Message>();
//TODO create Monitor
if((!CancellationToken))
{
try
{
//TODO fix epochWindowTime
long epochWindowTime = 0;
int numberOfMessages = 0;
Map<String, String> attributes;
// Setup the SQS client
AmazonSQS client = new AmazonSQSClient(new
ClasspathPropertiesFileCredentialsProvider());
client.setEndpoint(this.AWSSQSServiceUrl);
// get the NumberOfMessages to optimize how to
// Receive all of the messages from the queue
GetQueueAttributesRequest attributesRequest =
new GetQueueAttributesRequest();
attributesRequest.setQueueUrl(this.QueueUrl);
attributesRequest.withAttributeNames(
"ApproximateNumberOfMessages");
attributes = client.getQueueAttributes(attributesRequest).
getAttributes();
numberOfMessages = Integer.valueOf(attributes.get(
"ApproximateNumberOfMessages")).intValue();
// determine if we need to Receive messages from the Queue
if (numberOfMessages > 0)
{
if (numberOfMessages < 10)
{
// just do it inline it's less expensive than
//spinning threads
ReceiveTask(numberOfMessages);
}
else
{
//TODO Create a multithreading version for this
ReceiveTask(numberOfMessages);
}
}
if (!CancellationToken)
{
//TODO testing
_setLock.lock();
Iterator<Message> _setIter = _set.iterator();
//TODO
while(_setIter.hasNext())
{
Message temp = _setIter.next();
Long value = Long.valueOf(temp.getAttributes().
get("Timestamp"));
if(value.longValue() < epochWindowTime)
{
messages.add(temp);
_set.remove(temp);
}
}
_setLock.unlock();
// TODO deduplicate the messages
// TODO reorder the messages
// TODO raise new Event with the results
}
if ((!CancellationToken) && (messages.size() > 0))
{
if (messages.size() < 10)
{
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
else
{
//TODO Create a way to divide this work among
//several threads
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
}
}catch (AmazonServiceException ase){
ase.printStackTrace();
}catch (AmazonClientException ace) {
ace.printStackTrace();
}
}
}
메시지가있는 경우이 타이머 스레드에서 이벤트를 만드는 것입니다 처리 할 수있는 주석, 나의 현재 선호하는 방법 중 일부 볼 수 있습니다
로 . 그런 다음 주 스레드가이 이벤트를 수신하여 적절하게 처리합니다.
현재 Java에서 이벤트를 처리하는 방법이나 생성/수신하는 방법에 익숙하지 않습니다. 또한 이벤트를 생성하고 그 안에 포함 된 정보를 스레드간에 전달할 수 있는지 여부를 알지 못합니다.
내 방법이 가능한지에 대한 조언이나 통찰력을 누군가 주시겠습니까? 그렇다면 현재의 검색 시도가 결실을 맺지 못하여 구현 방법에 대한 정보를 어디에서 찾을 수 있습니까?
그렇지 않다면 가능하면 소켓을 관리하지 않아도된다는 것을 명심하면서 어떻게해야하는지에 대한 제안을받을 수 있습니까?
편집 1 :
메인 스레드는 또한 수신하는 메시지를 기반으로 명령을 실행하거나 필요한 정보를 얻기 위해 명령을 실행에 대한 책임이 있습니다. 이러한 이유로 메인 쓰레드는 메시지 수신을 기다릴 수없고, 이벤트 기반 방식으로 처리해야한다.
(큐는 생산자와 소비자 모두 동일한 개체를해야한다) 불행히도 BlockingQueue는 작동하지 않을 것입니다. 메시지를 소비하는 메인 쓰레드는 반드시 gener 다른 것들 중에서도 봉사에 대한 명령을 받았다. – JME
그냥 2 개의 대기열을 요구하지 않습니까? Erlang과 Scala의 Actor 모델과 같은 언어로 전달되는 메시지는 스레드 당 메일 함/메시지 대기열을가집니다. – selig
두 개의 대기열을 만드는 것만 큼 간단하지는 않지만 주 스레드는 "메시지 수집"스레드에 정보를 보내지 않고 시작된 후에 만 정보를 수신합니다. 그런 다음 메인 스레드는 외부 서비스에 대한 HTTP 연결을 생성하여 수집 할 "메시지 수집"스레드에 대한 메시지를 생성하거나 생성하지 않을 수 있습니다. – JME