1

저는 C#에서 분산 된 깊이 우선 검색을 구현하려고 시도해 왔습니다. 나는 특정 시점까지 성공했지만 동기화 오류가 발생했습니다. 나는 오류를 바로 잡을 수 없다. 내가 뭘 하려는지 각 노드가 작업 병렬 데이터 흐름을 사용하여 다른 하나와 통신하게함으로써 DFS에서 병렬 처리를 달성합니다. 다음은 내 코드입니다 :분산 된 DFS를 구축하기 위해 C#을 스레딩

public class DFS 
{ 
static List<string> traversedList = new List<string>(); 

static List<string> parentList = new List<string>(); 
static Thread[] thread_array; 
static BufferBlock<Object> buffer1 = new BufferBlock<Object>(); 

public static void Main(string[] args) 
{ 

    int N = 100; 
    int M = N * 4; 
    int P = N * 16; 

    Stopwatch stopwatch = new Stopwatch(); 
    stopwatch.Start(); 

    List<string> global_list = new List<string>(); 

    StreamReader file = new StreamReader(args[args.Length - 2]); 


    string text = file.ReadToEnd(); 

    string[] lines = text.Split('\n'); 



    string[][] array1 = new string[lines.Length][]; 

    for (int i = 0; i < lines.Length; i++) 
    { 
     lines[i] = lines[i].Trim(); 
     string[] words = lines[i].Split(' '); 

     array1[i] = new string[words.Length]; 

     for (int j = 0; j < words.Length; j++) 
     { 
      array1[i][j] = words[j]; 
     } 
    } 

    StreamWriter sr = new StreamWriter("E:\\Newtext1.txt"); 

    for (int i = 0; i < array1.Length; i++) 
    { 
     for (int j = 0; j < array1[i].Length; j++) 
     { 
      if (j != 0) 
      { 
       sr.Write(array1[i][0] + ":" + array1[i][j]); 
       Console.WriteLine(array1[i][0] + ":" + array1[i][j]); 
       sr.Write(sr.NewLine); 
      } 
     } 

    } 
    int start_no = Convert.ToInt32(args[args.Length - 1]); 
    thread_array = new Thread[lines.Length]; 
    string first_message = "root"; 
    buffer1.Post(first_message); 
    buffer1.Post(array1); 
    buffer1.Post(start_no); 
    buffer1.Post(1); 

    for (int t = 1; t < lines.Length; t++) 
    { 
     Console.WriteLine("thread" + t); 
     thread_array[t] = new Thread(new ThreadStart(thread_run)); 
     thread_array[t].Name = t.ToString(); 
     lock (thread_array[t]) 
     { 
      Console.WriteLine("working"); 
      thread_array[t].Start(); 
      thread_array[t].Join(); 
     } 

    } 
    stopwatch.Stop(); 

    Console.WriteLine(stopwatch.Elapsed); 
    Console.ReadLine(); 
} 

private static void dfs(string[][] array, int point) 
{ 
    for (int z = 1; z < array[point].Length; z++) 
    { 
     if ((!traversedList.Contains(array[point][z]))) 
     { 
      traversedList.Add(array[point][z]); 
      parentList.Add(point.ToString()); 
      dfs(array, int.Parse(array[point][z])); 
     } 

    } 
    return; 


} 
public static void thread_run() 
{ 
    try 
    { 
     string parent; 
     string[][] array1; 
     int point; 
     int id; 
     parent = (string)buffer1.Receive(); 
     array1 = (string[][])buffer1.Receive(); 
     point = (int)buffer1.Receive(); 
     id = (int)buffer1.Receive(); 
     object value; 
     Console.WriteLine("times"); 

     if (Thread.CurrentThread.Name.Equals(point.ToString())) 
     { 
      if (!traversedList.Contains(point.ToString())) 
      { 
       Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id); 
       traversedList.Add(point.ToString()); 
       parent = point.ToString(); 
       for (int x = 1; x < array1[point].Length; x++) 
       { 
        Console.WriteLine("times"); 
        if (buffer1.TryReceive(out value)) 
        { 
         array1 = (string[][])value; 
        } 
        if (buffer1.TryReceive(out value)) 
        { 
         id = (int)buffer1.Receive(); 
        } 
        id++; 
        buffer1.Post(parent); 
        buffer1.Post(array1); 
        buffer1.Post(x); 
        buffer1.Post(id); 
        Console.WriteLine("times"); 
        Monitor.PulseAll(Thread.CurrentThread); 
       } 

       //return; 
      } 
      else 
      { 
       buffer1.Post(parent); 
       buffer1.Post(array1); 
       buffer1.Post(point); 
       buffer1.Post(id); 
       Console.WriteLine("working 1"); 
       Monitor.PulseAll(Thread.CurrentThread); 
      } 
     } 
     else 
     { 
      Console.WriteLine("working 2"); 
      Monitor.Wait(Thread.CurrentThread); 
     } 
     //Console.WriteLine(parent); 
    } 
    catch (Exception ex) 
    { 
     Console.WriteLine(ex.Message); 
    } 

} 

} 

enter image description here

+1

이 질문과 다른 질문은 3 일 전 @ http://stackoverflow.com/questions/10852317/depth-first-search-in-a-distributed-way? –

+0

@JamesManning : 마지막 질문에서 순차 구현을했는데 분산 구현 (스레드 사용)을위한 방법을 생각해 보았습니다. 여기에이 오류가 붙어 있습니다.자바에서는 우리가 기본적으로 synchronized 키워드를 사용하기 때문에 더 쉽지만 C#에서는 아무 것도 찾을 수 없다. –

+0

이 오류는 어떤 코드 행에서 발생합니까? – Faraday

답변

1

문제는 스레드가 대기 호출하기 위해 모니터를 소유 할 필요가 있다는 것입니다. 따라서 Monitor.PulseAll과 Monitor.Wait를 잠글 필요가 있습니다. 이렇게하면 더 이상 오류가 발생하지 않습니다.

내가 당신에게 설명 할 필요가있는 경우, 다른 질문을 열고 그 질문을 모두 설명해 드리겠습니다! :)

+0

어떻게하면됩니까? 샘플 코드를 줄 수 있습니까? –

0

편집 : 내 게시물을 무시 -

이 컴파일되지 않습니다 ... 대신 @PanagiotisKanavos의 게시물을 읽을 수 있지만 사용 잠금에 대한 올바른 방향을 설정합니다 :

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.IO; 
using System.Threading; 

public class DFS 
{ 
    static List<string> traversedList = new List<string>(); 

    static List<string> parentList = new List<string>(); 
    static Thread[] thread_array; 
    //static BufferBlock<Object> buffer1 = new BufferBlock<Object>(); 

    public static void Main(string[] args) 
    { 

     int N = 100; 
     int M = N * 4; 
     int P = N * 16; 

     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     List<string> global_list = new List<string>(); 

     StreamReader file = new StreamReader(args[args.Length - 2]); 


     string text = file.ReadToEnd(); 

     string[] lines = text.Split('\n'); 



     string[][] array1 = new string[lines.Length][]; 

     for (int i = 0; i < lines.Length; i++) 
     { 
      lines[i] = lines[i].Trim(); 
      string[] words = lines[i].Split(' '); 

      array1[i] = new string[words.Length]; 

      for (int j = 0; j < words.Length; j++) 
      { 
       array1[i][j] = words[j]; 
      } 
     } 

     StreamWriter sr = new StreamWriter("E:\\Newtext1.txt"); 

     for (int i = 0; i < array1.Length; i++) 
     { 
      for (int j = 0; j < array1[i].Length; j++) 
      { 
       if (j != 0) 
       { 
        sr.Write(array1[i][0] + ":" + array1[i][j]); 
        Console.WriteLine(array1[i][0] + ":" + array1[i][j]); 
        sr.Write(sr.NewLine); 
       } 
      } 

     } 
     int start_no = Convert.ToInt32(args[args.Length - 1]); 
     thread_array = new Thread[lines.Length]; 
     string first_message = "root"; 
     //buffer1.Post(first_message); 
     //buffer1.Post(array1); 
     //buffer1.Post(start_no); 
     //buffer1.Post(1); 

     for (int t = 1; t < lines.Length; t++) 
     { 
      Console.WriteLine("thread" + t); 
      thread_array[t] = new Thread(new ThreadStart(thread_run)); 
      thread_array[t].Name = t.ToString(); 
      lock (thread_array[t]) 
      { 
       Console.WriteLine("working"); 
       thread_array[t].Start(); 
       thread_array[t].Join(); 
      } 

     } 
     stopwatch.Stop(); 

     Console.WriteLine(stopwatch.Elapsed); 
     Console.ReadLine(); 
    } 

    private static void dfs(string[][] array, int point) 
    { 
     for (int z = 1; z < array[point].Length; z++) 
     { 
      if ((!traversedList.Contains(array[point][z]))) 
      { 
       traversedList.Add(array[point][z]); 
       parentList.Add(point.ToString()); 
       dfs(array, int.Parse(array[point][z])); 
      } 

     } 
     return; 


    } 

    bool busy; 
    private readonly object syncLock = new object(); 

    public static void thread_run() 
    { 
     try 
     { 
      string parent; 
      string[][] array1; 
      int point; 
      int id; 
      //parent = (string)buffer1.Receive(); 
      //array1 = (string[][])buffer1.Receive(); 
      //point = (int)buffer1.Receive(); 
      //id = (int)buffer1.Receive(); 
      object value; 
      Console.WriteLine("times"); 

      if (Thread.CurrentThread.Name.Equals("Point.ToString()")) 
      { 
       if (!traversedList.Contains("Point.ToString()")) 
       { 
        for (int x = 1; x < 99999; x++) 
        { 
         Console.WriteLine("times"); 
         //if (buffer1.TryReceive(out value)) 
         //{ 
         // array1 = (string[][])value; 
         //} 
         //if (buffer1.TryReceive(out value)) 
         //{ 
         // id = (int)buffer1.Receive(); 
         //} 
         //id++; 
         //buffer1.Post(parent); 
         //buffer1.Post(array1); 
         //buffer1.Post(x); 
         //buffer1.Post(id); 
         Console.WriteLine("times"); 

         lock (syncLock) 
         { 
          while (busy) 
          { 
           busy = false; 
           Monitor.PulseAll(Thread.CurrentThread); 
          } 
          busy = true; // we've got it! 
         } 


        } 

        //return; 
       } 
       else 
       { 
        //buffer1.Post(parent); 
        //buffer1.Post(array1); 
        //buffer1.Post(point); 
        //buffer1.Post(id); 
        lock (syncLock) 
        { 
         while (busy) 
         { 
          busy = false; 
          Monitor.PulseAll(Thread.CurrentThread); 
         } 
         busy = true; // we've got it! 
        } 
       } 
      } 
      else 
      { 
       Console.WriteLine("working 2"); 
       lock (syncLock) 
       { 
        while (busy) 
        { 
         Monitor.Wait(Thread.CurrentThread); 
        } 
        busy = true; // we've got it! 
       } 

      } 
      //Console.WriteLine(parent); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex.Message); 
     } 

    } 

} 
+0

코드에서 내가 뭘하는지는 동일한 함수를 호출하기 위해 10 개의 다른 스레드를 만들고 있다는 것입니다. 위의 아이디어는 효과가없는 것 같습니다. –

+0

모니터를 사용하려면 제대로 잠 가야합니다 ... 누군가 다른 사람이이 문제를 해결할 수 있습니다. – Faraday

+0

모니터와 별도로 다른 작업이 있습니까? –

3

코드에 여러 가지 문제가 있습니다.

여러 스레드에서 traversedList를 잠그고 "만지는"방법이 잘못되었습니다. 가장 명백한 문제입니다.

더 중요한 것은 코드가 실제로 데이터 흐름을 사용하지 않고 ConcurrentQueue 또는 다른 동시 수집과 유사한 방식으로 BufferBlock을 사용한다는 것입니다. 데이터 흐름의 요점은 처리를 단순화하기 위해 스레드 대신 ActionBlocks을 사용하는 것입니다. 기본적으로 작업 블록은 처리를 위해 하나의 스레드 만 사용하지만 DataflowBlockOptions 클래스를 통해 원하는만큼의 스레드를 지정할 수 있습니다.

ActionBlocks에는 자체 입력 및 출력 버퍼가 있으므로 버퍼링을 위해 추가로 BufferBlock을 추가 할 필요가 없습니다.

블록과 관련된 여러 값을 전달하면 오류가 발생하여 코드가 혼란 스럽기 때문에 또 다른 문제가 있습니다. 모든 값을 보유하는 데이터 구조를 작성하는 데에는 비용이 들지 않습니다. 당신은이 같은이 메시지를 처리하는 ActionBlock를 만들 수 있습니다

public class PointMessage 
    { 
     public string Message { get; set; } 
     public string[][] Lines{get;set;} 
     public int Point { get; set; } 
     public int ID { get; set; } 
    } 

:

static ActionBlock<PointMessage> _block; 
... 
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded }; 
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options); 

을 그리고이 같은 각 메시지 처리 :

당신이 처리하는 메시지를 길게이 클래스를 사용하는 가정

private static void ProcessMessage(PointMessage arg) 
    { 
     if (...) 
     { 
      ... 
      arg.ID++; 
      _block.Post(arg); 
     } 
     else 
     { 
      ... 
      _block.Post(arg); 
     } 
    } 

함수가 값을 반환하면 TransformBlock을 사용할 수 있습니다. ActionBlock 대신에.

코드가하는 일을 이해할 수 없어 DataFlow를 사용하여 다시 작성하지 않겠습니다. 약간 정리하면 더 쉽게 도움이 될 것입니다.

+0

그래프에있는 각 노드에서 알림을 보내고 확인을 받기만하면됩니다. 그것이 배우 모델로 작업 병렬 라이브러리를 사용하는 이유입니다. –

+0

요점은 배우를 사용하고 있지 않다는 것입니다. BufferBlock은 메시지, ActionBlock 또는 TransformBlock을 처리하지 않습니다. 게다가, 당신은 배우들에 의해 무엇을 의미합니까? 각 노드를 액터로 변환하려고합니까? 그것은 엄청난 자원 낭비입니다. Parallel.For 루프의 각 노드에 대해 Contains 함수를 평가하면 런타임에서 허용되는 수의 thred를 선택할 수 있습니다. –

+0

그러나 bufferblock은 내가 그것을 사용하고있는 곳에서 기능을 보내고받습니다 ... 예! 기본적인 아이디어는 각 노드가 독립적 인 시스템처럼 행동하고 서로 통신한다는 것입니다. –

관련 문제