2017-09-18 3 views
0

흥미로운 데이터가있는 페이지가 많이있는 웹 사이트를 긁어 내고 싶지만 소스가 매우 크기 때문에 다중 스레드를 사용하여 과부하를 제한하려고합니다. Parallel.ForEach을 사용하여 10 개의 작업으로 구성된 각 청크를 시작하고 활성 스레드 수가 임계 값 아래로 떨어질 때까지 기본 for 루프를 기다립니다. 이를 위해 WebClient이라는 새 스레드를 시작할 때 활성 스레드의 카운터를 사용하고 WebClientDownloadStringCompleted 이벤트가 트리거 될 때 감소합니다.웹 사이트의 여러 페이지를 다듬기위한 병렬 요청

원래 질문은 DownloadString 대신 DownloadStringTaskAsync을 사용하고 Parallel.ForEach에서 시작된 각 스레드가 완료 될 때까지 기다리는 것이 좋습니다. 이 문제는 해결 방법으로 해결되었습니다. 카운터가 (activeThreads)이고 기본 foor 루프에 Thread.Sleep이 있습니다.

DownloadString 데이터가 도착할 때까지 기다리는 동안 스레드를 해제하여 DownloadString 대신 모든 속도를 향상 시키려면 await DownloadStringTaskAsync을 사용하고 있습니까?

원래의 질문으로 돌아가려면 TPL을 사용하여 카운터를 포함시키지 않고보다 우아하게이 방법을 사용할 수 있습니까?

private static volatile int activeThreads = 0; 

public static void RecordData() 
{ 
    var nbThreads = 10; 
    var source = db.ListOfUrls; // Thousands urls 
    var iterations = source.Length/groupSize; 
    for (int i = 0; i < iterations; i++) 
    { 
    var subList = source.Skip(groupSize* i).Take(groupSize); 
    Parallel.ForEach(subList, (item) => RecordUri(item)); 
    //I want to wait here until process further data to avoid overload 
    while (activeThreads > 30) Thread.Sleep(100); 
    } 
} 

private static async Task RecordUri(Uri uri) 
{ 
    using (WebClient wc = new WebClient()) 
    { 
     Interlocked.Increment(ref activeThreads); 
     wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref iterationsCount); 
     var jsonData = ""; 
     RootObject root; 
     jsonData = await wc.DownloadStringTaskAsync(uri); 
     var root = JsonConvert.DeserializeObject<RootObject>(jsonData); 
     RecordData(root) 
    } 
} 

답변

1

을 기억하십시오. 죽은 사람은 간단합니다.

var source = db.ListOfUrls; // Thousands urls 

var query = 
    from uri in source.ToObservable() 
    from jsonData in Observable.Using(
     () => new WebClient(), 
     wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri))) 
    select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) }; 

IDisposable subscription = 
    query.Subscribe(x => 
    { 
     /* Do something with x.uri && x.json */ 
    }); 

그게 전부입니다. 멋지게 멀티 쓰레드이며 통제하에 있습니다.

그냥 NuGet "System.Reactive"로 비트를 가져옵니다.

+0

추가 연장없이 무언가를 갖고 싶었지만 이것은 흥미 롭습니다. 왜 그런지는 잘 모르지만 그대로있는 것은 아닙니다. 그것은 나에게 비어있는 json을 준다 – sofsntp

+0

@sofsntp - 테스트했을 때 잘 돌아갔다. subscribe 메소드에서 JSON이 아니라 URI가 있는지 확인 했습니까? 구독을 할 때까지 JSON을 deserialize하지 않도록 코드를 변경하십시오. – Enigmativity

+0

그래 그래도 작동하지만이 일의 요점은 무엇입니까? 무엇이 그것을 우아하게 하는가? – sofsntp

-1
Parallel.ForEach 

는 Enumerable에서 소스의 각 항목에 대한 기능을 실행 ProcessorCount 작업을 만듭니다. 많은 작업이 없으며 모든 항목과 작업이 실행될 때까지 기다릴 것입니다.

Task.WhenAll 

주어진 작업을 기다리는 동안에 만 실행됩니다. 그것들을 손에 들고 적절한 방법으로 실행하고 한 번에 많은 사람들에게하지 말라.

그러나 코드에 결함이 있습니다. 함수 RecordUri은 기다려야 만하는 작업을 반환합니다. ForEach는 현재 작업이 완료된 시점을 함수가 알 수 없으므로 더 많은 작업을 만듭니다. 또한 문제는 태스크에서 태스크를 작성하고 첫 번째 태스크는 아무것도 수행하지 않고 처음 태스크를 기다리는 것입니다.

또한 편집Parallel.ForEach https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx

의이 오버로드를 살펴 할 수 있습니다

사용하는 스레드를 해제하여 대신에 모든 속도로 향상되어 DownloadString의 DownloadStringTaskAsync을 기다리고 있습니다 DownloadString 데이터가 도착하기를 기다리는 동안?

아니요. 작업이 외부 리소스를 기다리는 것과 마찬가지로 Suspended 상태 (일부 오래된/더티 반복 대기를 사용하지 않는 Windows API)로 전환됩니다. 그래서 큰 차이는 없습니다. 비동기 코드를 컴파일 할 때 컴파일러가 생성하는 오버 헤드가 다릅니다. DownloadStringTaskAsync은 긴 작업이 포함 된 작업을 만듭니다. 기다리는 것을 사용하면, 당신은 그 일에 자신을 붙일 것입니다 (ContinueWith에 의해). 그래서 당신은 다른 것을 기다리기위한 Task를 생성합니다. 이것은 내가 위의 텍스트에서 말하고 있던 오버 헤드입니다.

내 접근 방식은 다음과 같습니다. synchronous method을 Parallel.ForEach 내부에서 사용하십시오. 스레딩은 PLinq에 의해 수행되며 당신은 자유롭게 갈 수 있습니다.당신은 당신이 마이크로 소프트의 반응성 프레임 워크를 사용한다 우아한 솔루션을 원하는 경우에

은 "KISS"

+0

감사합니다. 그러나 관련 리소스를 읽었습니다. 나는 더 명확하게 나의 질문을 편집했다. 내 문제는 DownloadStringTaskAsync를 사용하면 메서드가 비동기가된다는 의미입니다. parell.foreach()에서 .wait()을 사용할까요? 하지만 이것을 피하기 위해 읽었습니다 – sofsntp

+0

@sofsntp가 추가되었습니다. – JPVenson

+0

무엇을이 대답에 대해? https://stackoverflow.com/a/19391324/2132352 – sofsntp

관련 문제