2017-10-07 3 views
0

D에서 std.container.dlist을 사용하여 대기열 유형을 만드는 것만으로도 충분합니다.대기열을 사용하여 스레드 간 통신

나는 여러 개의 스레드를 갖고 싶지만 메시지 전달 (https://tour.dlang.org/tour/en/multithreading/message-passing)이 아닌 대기열과 통신해야합니다. 내가 이해하는 바와 같이 메시지는 코드의 특정 지점에서 항상 데이터를 수신하도록 설계되었습니다. 수신 스레드는 예상 데이터가 수신 될 때까지 차단됩니다.

(수정 : 나는 receiveTimeout에 대한 정보를 받았지만 시간 초과가없고 단지이 경우에는 더 적절하다. (아마도 0의 타임 아웃 일까?). 또한 여러 경우에 메시지 API가 무엇을 할 것인지 확신 할 수 없다. 메시지는 어떤 수신되기 전에. 내가 그와 함께 플레이해야합니다 전송됩니다.)

void main() { 
    spawn(&worker, thisTid); 

    // This line will block until the expected message is received. 
    receive (
     (string message) { 
      writeln("Received the message: ", text); 
     }, 
    ) 
} 

는 내가 필요로하고하는 것은 몇 가지가있는 경우 단순히 데이터를 수신하는 것입니다. 이런 식으로 뭔가 :

void main() { 
    Queue!string queue// custom `Queue` type based on DList 

    spawn(&worker, queue); 

    while (true) { 
     // Go through any messages (while consuming `queue`) 
     for (string message; queue) { 
      writeln("Received a message: ", text); 
     } 
     // Do other stuff 
    } 
} 

내가 shared 변수 (https://tour.dlang.org/tour/en/multithreading/synchronization-sharing)를 사용하여 시도했지만 DMD는 불평한다 "변경 가능한 스레드 로컬 데이터에 대한 별칭은 허용되지 않습니다." 또는 일부 다른 오류가 발생합니다.

D에서 어떻게 수행됩니까? 아니면 이러한 종류의 의사 소통을 위해 메시지를 사용하는 방법이 있습니까?

답변

0

.

간단히 말하자면 std.concurrency 대신 core.thread을 사용하십시오. std.concurrency은 메시지를 관리하며 사용자가 직접 관리 할 수는 없습니다. core.thread은 내부적으로 std.concurrency이 사용하는 것입니다.

답변이 더 길수록, 여기에 제가 어떻게 완벽하게 구현했는지가 나와 있습니다.

Singly Linked List을 기반으로하지만 마지막 요소의 포인터를 유지하는 Queue 형식을 만들었습니다. Queue도 표준 구성 요소 인 inputRange 및 outputRange (또는 최소한 생각합니다.)를 월터 뷰즈 비전 (https://www.youtube.com/watch?v=cQkBOCo8UrE)을 사용합니다. Queue은 또한 한 스레드가 쓰기를 허용하고 다른 스레드가 내부적으로 매우 작은 뮤텍스를 사용하여 읽을 수 있도록 제작되었으므로 속도가 빨라야합니다.

Queue!string inputQueue = new Queue!string; 
ThreadInput threadInput = new ThreadInput(inputQueue); 
threadInput.start; 

while (true) { 
    foreach (string value; inputQueue) { 
     writeln(value); 
    } 
} 

ThreadInput는 이와 같이 정의된다 :

class ThreadInput : Thread { 
    private Queue!string queue; 

    this(Queue!string queue) { 
     super(&run); 
     this.queue = queue; 
    } 

    private void run() { 
     while (true) { 
      queue.put(readln); 
     } 
    } 
} 

코드 https://pastebin.com/w5jwRVrL
Queue
큐는 I 번째 스레드가 입력을 읽게 여기 https://pastebin.com/ddyPpLrp

간단한 구현 공유 다시 https://pastebin.com/ddyPpLrp

1

이 특정 질문에 대답하지 않고 TI는 내가 생각하는 메시지 전달 API의 오해까지 명확하지 ...

단지 대신 일반 receive

http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html

+0

아니요,이게 제가 필요로하는 것은 아니지만'receiveTimeout'을 놓쳤습니다. 잘 모르겠습니다. 내가 다른 일을 할 수 없다면 아마도 내가 필요로하는 것을하기 위해'receiveTimeout'을 사용할 수있을 것입니다. –

+0

receiveTimout에 -1과 같은 음수 값을 지정하면 원하는 것을 수행 할 수 있습니다. 참조 : https://stackoverflow.com/a/31624806/2026276 – Bauss

0
receiveTimeout 전화

나는이 사용 : 내가 필요로하는 답을 얻었을

shared class Queue(T) { 

    private T[] queue; 

    synchronized void opOpAssign(string op)(T object) if(op == "~") { 
     queue ~= object; 
    } 

    synchronized size_t length(){ 
     return queue.length; 
    } 

    synchronized T pop(){ 
     assert(queue.length, "Please check queue length, is 0"); 
     auto first = queue[0]; 
     queue = queue[1..$]; 
     return first; 
    } 

    synchronized shared(T[]) consume(){ 
     auto copy = queue; 
     queue = []; 
     return copy; 
    } 

}