제공된 상위 수준 개요를 살펴 보니 마지막으로 ActionBlock
에서 다시 시도하는 것이 가장 쉽다고 생각합니다. ActionBlock
은 게시하기 전에 성공을 확인하고 종료합니다. 데이터의 양에 따라이 방법을 사용하면 삭제 된 항목에 너무 많은 관심을 두지 않고도 필요에 따라 ActionBlock
개까지 스풀링 할 수 있습니다. 실제로는 없어야합니다. 단일 또는 다수의 ActionBlock
인스턴스가 게시에 실패하면 항목이 계속 스트리밍되고 유선 연결 기회를 기다리는 버퍼 및 입력 대기열을 설정 한 방법과 관련하여 배포됩니다.
편집 그냥 몇 가지 의사 코드는, 그러나 이것은 데이터 포인트의는 IEnumerable의 배치를 가지고, 그들에게 다섯 번 게시하려고합니다. 제한된 용량으로 인해 처리기의 각 인스턴스가 1000 배치를 대기열에 넣고 병렬 처리가 작업 블록 사이에 배치를 배포합니다. 선택적으로 모든 들어오는 배치를 보관하기 위해 ActionBlock
앞에 무제한 Buffer
을 추가 할 수 있습니다. 제작자 인 스트림이 소비자 REST 서비스를 크게 벗어나지 않도록주의해야합니다.
public void ConfigureFinalActionBlock() {
var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
BoundedCapacity = DataflowBlockOptions.Unbounded
});
var options = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1000,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
var success = false;
var attempts = 0;
while (!success && attempts < 5) {
attempts++;
success = await MyApiPostAsync(data);
}
}, options);
dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
PropagateCompletion = true
});
이 경우를 생각해 보겠습니다. 나는 100000 개의 데이터 포인트 버퍼를 받았고 actionblock에서 다시 시도했다. 다음 초에만 다시 actionblock post 요청을 사용하여 또 다른 100,000 개의 datapoint를 받게 될 것입니다. 따라서 몇 가지 포인트를 놓칠 수 있습니다. 입력 대기열을 사용하여 여러 액션 블록을 설정하는 방법. – Abhay
어떻게 포인트를 놓칠 수 있습니까? 버퍼에는 큐가 무제한이며, 새 메시지는 처리 된 메시지를 기다리거나 최대 병렬 처리 수준을 지정하면 다른 스레드에서 동시에 실행됩니다. – VMAtm