2017-01-27 2 views
0

TPL 데이터 흐름과 함께 .net reactive 확장명을 사용하고 있습니다. 다음은 내 파이프 라인입니다.RPL 사용/사용하지 않음 tpl 데이터 흐름

데이터 소스를 일부 외부 소스에서 스트림으로 가져 오는 중 dataflow TransformBlocks를 사용하여 데이터 포인트를 변환합니다. 이 후에 Rx 버퍼를 사용하여 1 초 동안 버퍼링 된 점을 버퍼링하고 마지막으로 데이터 흐름 Actionblock을 사용하여 REST 엔드 포인트에 버퍼링 된 데이터 포인트를 게시합니다.

임시 오류에 대한 REST 게시 작업을 다시 시도하고 싶습니다. 다시 시도해야하는 부분 :

  1. 버퍼 이후?
  2. 내부 조치 블록?
  3. 다시 시도 할 때 연속 스트리밍이 어떻게됩니까? 나는 어떤 자료도 놓치고 싶지 않다.

답변

1

제공된 상위 수준 개요를 살펴 보니 마지막으로 ActionBlock에서 다시 시도하는 것이 가장 쉽다고 생각합니다. ActionBlock은 게시하기 전에 성공을 확인하고 종료합니다. 데이터의 양에 따라이 방법을 사용하면 삭제 된 항목에 너무 많은 관심을 두지 않고도 필요에 따라 ActionBlock 개까지 스풀링 할 수 있습니다. 실제로는 없어야합니다. 단일 또는 다수의 ActionBlock 인스턴스가 게시에 실패하면 항목이 계속 스트리밍되고 유선 연결 기회를 기다리는 버퍼 및 입력 대기열을 설정 한 방법과 관련하여 배포됩니다.

편집 그냥 몇 가지 의사 코드는, 그러나 이것은 데이터 포인트의는 IEnumerable의 배치를 가지고, 그들에게 다섯 번 게시하려고합니다. 제한된 용량으로 인해 처리기의 각 인스턴스가 1000 배치를 대기열에 넣고 병렬 처리가 작업 블록 사이에 배치를 배포합니다. 선택적으로 모든 들어오는 배치를 보관하기 위해 ActionBlock 앞에 무제한 Buffer을 추가 할 수 있습니다. 제작자 인 스트림이 소비자 REST 서비스를 크게 벗어나지 않도록주의해야합니다.

public void ConfigureFinalActionBlock() { 
     var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() { 
      BoundedCapacity = DataflowBlockOptions.Unbounded 
     }); 

     var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 1000, 
      MaxDegreeOfParallelism = Environment.ProcessorCount 
     }; 
     var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => { 
      var success = false; 
      var attempts = 0; 
      while (!success && attempts < 5) { 
       attempts++; 
       success = await MyApiPostAsync(data); 
      } 
     }, options); 

     dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() { 
      PropagateCompletion = true 
     }); 
+0

이 경우를 생각해 보겠습니다. 나는 100000 개의 데이터 포인트 버퍼를 받았고 actionblock에서 다시 시도했다. 다음 초에만 다시 actionblock post 요청을 사용하여 또 다른 100,000 개의 datapoint를 받게 될 것입니다. 따라서 몇 가지 포인트를 놓칠 수 있습니다. 입력 대기열을 사용하여 여러 액션 블록을 설정하는 방법. – Abhay

+0

어떻게 포인트를 놓칠 수 있습니까? 버퍼에는 큐가 무제한이며, 새 메시지는 처리 된 메시지를 기다리거나 최대 병렬 처리 수준을 지정하면 다른 스레드에서 동시에 실행됩니다. – VMAtm

관련 문제