2017-04-06 1 views
2

우리는 TPL Dataflow 프레임 워크를 사용하려고하는 데이터 처리 파이프 라인을 가지고 있습니다.데이터 흐름을위한 블록 디자인에서 차단됨

파이프 라인의

기본 요점 : 파일 시스템에 CSV 파일을 통해

  1. 반복 처리가 (10,000)
  2. 우리는 하나의 콘텐츠를 통해
  3. 반복 처리를 무시할 경우 우리가 내용을 가져 오지 않은 확인 CSV 파일 (20,000-120,000 행)을 만들고 필요에 맞는 데이터 구조를 만듭니다.
  4. 이러한 새 dataStructure 항목 중 100 개를 일괄 처리하여 데이터베이스에 넣습니다.
  5. CSV 파일을 가져올 것으로 표시합니다.

이제 우리는 매우 느린 & 고통스러운 방법으로 위의 모든 작업을 수행 기존의 파이썬 파일이 - 코드가 엉망이다.

제 생각은 TPL Dataflow입니다.

  1. BufferBlock<string>

  2. TransformBlock<string, SensorDataDto>는 CSV 파일을 통해 읽고 작성이 파일을 가져올 지 여부를 감지하는
  3. TransformBlock<string, SensorDataDto> 조건으로 모든 파일을 게시 할 수 SensorDataDto 구조
  4. BatchBlock<SensorDataDto>TransformBlock을 사용 최대 100 개의 요청을 배치하도록 위임합니다.

    4.5. ActionBlock<SensorDataDto>을 사용하여 100 개의 레코드를 데이터베이스에 저장합니다.

  5. ActionBlock 가져온 CSV로 표시하십시오.

나는 처음 몇 작업을 만든 그들은 최선을 다하고 (BufferBlock를 ->TransformBlock + Predicate & & 프로세스 경우가 없습니다)하지만 난 게시 할 수 있도록 흐름을 계속하는 방법을 확실 해요 100을 TransformBlock에있는 BatchBlock으로 변경하고 다음 작업을 연결하십시오.

이 모양이 맞습니까? 기본 요령이며 TPL 데이터가 흐릿한 방식으로 BufferBlock 비트를 어떻게 처리합니까? 내가 BatchBlock.Post<..>(...)를 호출하고있어 normaliseData 방법 내부

bufferBlock.LinkTo(readCsvFile, ShouldImportFile) 
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>()) 
readCsvFile.LinkTo(normaliseData) 
normaliseData.LinkTo(updateCsvImport) 
updateCsvImport.LinkTo(completionBlock) 

batchBlock.LinkTo(insertSensorDataBlock) 

bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete()); 
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete()); 
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete()); 
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete()); 

batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete()); 

, 즉 좋은 패턴 또는 다르게 구성되어야 하는가? 내 문제는 모든 레코드를 밀어 넣은 후에 파일을 가져올 때만 표시 할 수 있다는 것입니다. 우리가 80 경우 밀어 무엇 100의 배치가있는 경우

Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait(); 

, 마지막 80을 배출하는 방법은 무엇입니까?

주 파이프 라인에 BatchBlock을 연결해야하는지 확실하지 않았지만 둘 다 완료 될 때까지 기다립니다.모든

답변

1

먼저, 당신은 당신이 링크 중에 PropagateCompletion 속성을 사용할 수 있습니다, 그 문제에 Completion를 사용할 필요가 없습니다 : 문제에 다시 이제

// with predicate 
bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, ShouldImportFile); 
// without predicate 
readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true }); 

을, 배치와 함께. 어쩌면 여기에 JoinBlock<T1, T2> 또는 BatchedJoinBlock<T1, T2>을 파이프 라인에 연결하고 조인 결과를 수집하여 작업을 완료 할 수 있습니다. 어쩌면 자신의 ITargetBlock<TInput>을 구현하여 자신의 방식으로 메시지를 사용할 수 있습니다.

official docs에 따르면, 블록은 욕심이 많으며 사용 가능한 즉시 결합 된 블록에서 데이터를 수집하므로 하나의 대상이 준비되어 있고 다른 블록이 준비되지 않았거나 조인 블록이 일괄 처리 블록의 80% 일 때 결합 블록이 멈추거나, 그래서 당신은 그것을 당신의 마음에 넣어야합니다. 자신의 구현의 경우 ITargetBlock<TInput>.OfferMessage 메서드를 사용하여 소스에서 정보를 얻을 수 있습니다.

는 욕심 및 비 탐욕 모드에서 모두 실행할 수 있습니다. 기본 욕심 꾸러기 형태에서는, 어떤 수의 근원에서 구획에 제안 된 모든 메시지는 받아 들여지고 뭉치로 바뀌기 위하여 완충된다.

비 탐욕 모드에서는 배치를 만들기에 충분한 소스가 메시지를 블록에 제공 할 때까지 모든 메시지가 소스에서 연기됩니다. 따라서 BatchBlock<T>N 개의 소스, N 개의 요소, 1 개의 소스 및 수많은 중간 옵션 중에서 각각 1 개 요소를 수신하는 데 사용할 수 있습니다.

관련 문제