2017-10-09 1 views
0

Apache Beam을 사용하여 스트리밍 BigQuery 테이블 위에 구현 한 작은 델타 작업에 이상한 문제가 있습니다.BigQuery의 델타 작업에서 레코드가 누락됩니다.

데이터를 BigQuery 테이블에 스트리밍하고 매 시간마다 스트리밍 테이블의 새 레코드를 조정 된 테이블에 복사하는 작업을 실행합니다. 델타는 스트리밍 테이블에 도입 된 CreateDatetime 열 위에 구축됩니다. 레코드가 스트리밍 테이블에로드되면 현재 UTC 타임 스탬프를 가져옵니다. 따라서 델타는 당연히 배치가 실행되는 현재 시간까지 지난 시간보다 새로운 CreateDatetime을 갖는 모든 레코드를 가져옵니다.

1. Start: LastDeltaDate = 2017-01-01 00:00:00 
2. 1st Delta Run: 
- NowUTC = 2017-10-01 06:00:00 
- LastDeltaDate = 2017-01-01 00:00:00 
- at the end of the successful run LastDeltaDate = NowUTC 
3. 2nd Delta Run: 
- NowUTC = 2017-10-01 07:00:00 
- LastDeltaDate = 2017-10-01 06:00:00 
- at the end of the successful run LastDeltaDate = NowUTC 
... 

를 지금은 내 스트리밍 테이블에 있지만 내 화해 테이블에 도착하지 않을 기록을 찾아 매일 다음과 같이

CreatedDatetime >= LastDeltaDate AND 
CreatedDatetime < NowUTC 

LastDeltaDate의 논리이다. 내가 타임 스탬프를 확인하면 배치 실행과 멀리 떨어져 있으며 Google Datflow 로그를 확인할 때 그 당시에 쿼리에 대해 반환 된 레코드가없는 것을 볼 수 있지만 지금 같은 쿼리를 실행할 때 기록. 스트리밍 된 레코드가 질의에 늦게 도착할 수있는 방법이 있습니까? 아니면 아파치 빔이 레코드를 처리하고 있지만 오랫동안 쓰지 않았을 가능성이 있습니까? 나는 창 전략을 적용하지 않을거야.

아이디어가 있으십니까?

+0

어떻게 LastDeltaDate를 결정합니까? –

+0

@BenChambers LastDeltaDate 결정 방법에 대한 설명으로 내 질문을 업데이트했습니다. – jimmy

답변

1

스트리밍 삽입을 수행 할 때 문서 data availability에 설명 된대로 일괄 내보내기에 사용할 수있는 행의 속도가 지연됩니다.

따라서 T2 시간에 스트리밍 버퍼에 저장된 행 수가 BigQuery로 스트리밍되었을 수 있습니다. 그런 다음 시간 T1에서 T2까지 배치 작업을 실행하지만 최대 T2- 버퍼까지의 행만 볼 수 있습니다. 결과적으로 각 델타 런에 대해 버퍼에있는 모든 행이 삭제됩니다.

다음이 버퍼 내에있는 프로세스 행을 실행하도록 NowUTC에서 스트리밍 버퍼를 인식하도록 선택해야 할 수 있습니다.

+0

감사. 나는 어딘가에 쿼리를 위해 스트리밍 버퍼의 데이터를 즉시 사용할 수 있지만 BigQueryIO는 쿼리로 간주되지 않는다고 생각한다. 델타 날짜는 BigQuery에서 작동하지 않을 것입니다. 불행히도 BigQuery 테이블에는 기본값이 없으므로 타임 스탬프에 의존 할 수 없기 때문입니다. 내가 볼 수있는 유일한 다른 옵션은 항상 스트리밍 테이블을 조정 된 테이블과 비교하여 새로운 레코드를 복사하는 것입니다. 다른 점이 있었나요? – jimmy

+0

지금까지 스캔하는 대신 테이블에있는 내용 만 스캔 할 수 있습니다. 이 시간은 최대 90 분입니다. 또한 table.get 호출을 실행하면 스트리밍 버퍼에 "streamingBuffer.oldestEntryTime"(가장 오래된 항목의 타임 스탬프 포함) 속성이 제공됩니다. NowUTC 대신에 이것을 사용했다면 현재 버퍼에있는 행을 건너 뛰지 않고 다음에 스캔해야합니다. –

+0

Batch에서 BigQueryIO는 내보내기 작업을 수행 한 다음 GCS에서 파일을 읽습니다. 이로 인해 병렬 처리가 향상됩니다. –