2016-12-16 2 views
1

현재 1 단계를 제외한 각 단계가 실행중인 소비자 및 생산자 async 인 파이프 라인 데이터 흐름에 대해 작업하고 있습니다. 나는 물건을 참조하는 파이프 라인을 통해 "흐르는"물체를 가지고있다. 3 단계에서는 특수 조건 (스테이지 루프)을 충족하는 모든 객체를 루프하고 버퍼링하려고합니다.TPL 데이터 흐름 - 조건 루프

다른 개체가 현재 버퍼링 (스테이지 루프)되어있는 동안 새로운 개체가 들어 오면 (참조 3 단계) 참조 항목과 일치하는지 확인하고 스테이지 루프의 BufferBlock에 게시합니다.

질문 : 스테이지 3에서 스테이지 루프의 모든 객체에 대한 참조 항목을 어떻게 확인할 수 있습니까?

파이프 라인은 좀 다음과 같습니다

Incoming objects -> 
    BufferBlock1 -> Parsing (Stage2) -> 
    BufferBlock2 -> Processing (Stage3) -> 
    BufferBlock3 -> Stage Loop -> 
    Back to BufferBlock 2 

답변

0

당신은 정말 필요하지 않는 당신의 체인에 많은 BufferBlock의. TPL Dataflow에는 BufferBloсkActionBlock 로직을 캡슐화하고 처리 된 메시지에 대한 출력 블록이있는 TransformBlock이 들어 있습니다. 검사를 통과하지 못한 및 처리해야 메시지 큐가

루프에 관해서는, 당신은 서로 with static extension method 사이에 블록을 연결할 수 있습니다, 그래서 이것은 제레 stage4

stage2.LinkTo(stage3, CheckForExistingProcessing); 
stage2.LinkTo(stage4); 

같은 모습이 될 수 루프. ActionBlock을 추가로 설정하거나 TransformBlock을 사용하여 메시지를 적절한 단계로 다시 보낼 수도 있습니다. 일부 메시지는 아마도 약간의 이유로 처리 할 수 ​​없으므로 재시도 확인을 도입 할 수도 있다고 생각합니다.

// asynchronously wait for a sending with resending attempts 
await stage1.SendAsync(m); 
// asynchronously wait for a sending with resending attempts with possible cancellation 
await stage2.SendAsync(m, token); 

Post 방법은 동기 및 입니다 : 당신은 당신이 async 로직, 당신은 아마 SendAsync 메시지보다는 Post 그들 (당신은 또한 CancellationToken에 과부하를 사용할 수 있습니다)한다을 가지고 말했듯이 또한

, 메시지가 대상에 의해 받아 들여지지 않는 경우 메시지를 전달합니다. 대상을 지금 받아 들일 수 없더라도 메시지를 전달하려고 시도하는 SendAsync 메서드를 비교합니다.

+0

나는 타이밍 문제로 들어가고, object1이 들어오는 곳은 CheckForExistingProcessing 내에서 거절되고 stage4로 전달된다고 생각합니다. object2가 들어오고 Check를 전달하는 동안 두 객체의 참조 항목이 모두 릴리스되었으므로 그것에 자물쇠. 그래서 나는 심지어 검사를하기 전에 동일한 아이템을 참조하는 stage4에 객체가 있는지 확인해야합니다. 필요한 것은 대기열에 대한 검사에 기반하여 객체를 연기하고 대기열에있는 항목이 이미 같은 항목 인 경우 대기열을 유지하지만 대기열을 다시 사용할 수있을 때까지 객체를 유지하는 Queue입니다. – Peter

+0

이 단계에서 BlockingCollection을 사용해 볼 수 있습니다. – VMAtm

+0

루프를 제거하고 파이프 라인에서 잠글 수 있었고 이미 테스트 클래스를 만들었습니다. 하나의 질문을 tho, 내가 들어오는 순서대로 파일을 가져올 필요가 있다면 나는 "한 번에 하나의 파일"의 기지에 파이프 라인을 만들었습니다. SendAsync를 호출하면 어떻게 처리되는지를 어떻게 확인할 수 있습니까? 나는 파이프 라인 전에 BufferBlock을 추가 할 수 있는데, 여기서 소비자는 주문을 지키지 만 더 지능적인 방법이 있는가? – Peter

관련 문제