2016-08-30 3 views
0

데이터 흐름을 사용하여 BigQueryIO.Write.to()을 사용하여 BigQuery에 데이터를 쓰고 있습니다.BigflowIO로 작성하는 경우 Dataflow : SocketTimeoutException

때때로, 나는 데이터 흐름에서이 경고를 얻을 :

{ 
metadata: { 
    severity: "WARNING"  
    projectId: "[...]"  
    serviceName: "dataflow.googleapis.com"  
    region: "us-east1-d"  
    labels: { 
    compute.googleapis.com/resource_type: "instance"  
    compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"  
    dataflow.googleapis.com/region: "us-east1-d"  
    dataflow.googleapis.com/job_name: "[...]"  
    compute.googleapis.com/resource_id: "[...]"  
    dataflow.googleapis.com/step_id: ""  
    dataflow.googleapis.com/job_id: "[...]"  
    } 
    timestamp: "2016-08-30T11:32:00.591Z"  
    projectNumber: "[...]"  
} 
insertId: "[...]" 
log: "dataflow.googleapis.com/worker" 
structPayload: { 
    message: "exception thrown while executing request"  
    work: "[...]"  
    thread: "117"  
    worker: "dataflow-[...]-08240401-e41e-harness-7dkd"  
    exception: "java.net.SocketTimeoutException: Read timed out 
    at java.net.SocketInputStream.socketRead0(Native Method) 
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
    at java.net.SocketInputStream.read(SocketInputStream.java:170) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
    at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704) 
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647) 
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535) 
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440) 
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) 
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338) 
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37) 
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94) 
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) 
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:229) 
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:222) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745)"  
    logger: "com.google.api.client.http.HttpTransport"  
    stage: "F5"  
    job: "[...]"  
} 
} 

나는이 한 다음 어떤 "재시도"로그를 볼 수 없습니다.

내 질문은 : 나는 데이터 손실

  • 건가요? 쓰기 작업이 올바르게 수행되었는지는 확실하지 않습니다. 코드를 올바르게 이해하면 전체 쓰기 배치가 불확실합니다.
  • 그렇다면 정확히 한번 BigQuery에 데이터를 쓸 수있는 방법이 있습니까?
  • 그렇다면 경고 대신 심각도가 ERROR가되어서는 안됩니까? 여기

내 사용의 맥락 약간의 :

  • 내가 스트리밍 모드에서 데이터 흐름을 사용하고, 카프카에서 읽기 KafkaIO.java
  • 사용은 "때때로"에서 0이 될 수 3 회 작업에 따라 시간
  • 당, 내가 BigQuery를
  • AV로 3K 메시지 10K에/s의에서 쓰고 작업에 따라 유형 N1-표준 4
  • 의 2 (36) 근로자를 사용하고 있습니다 erage 메시지 크기가
  • 데이터 흐름 노동자들이 우리-east1-D 영역에로 3kB
  • 입니다 BigQuery에 데이터 세트의 위치는 미국이다

답변

1

당신은 BigQuery에서 스트리밍 서비스에서 오는 일시적인 문제와 관련된 이러한 오류를 볼 수 있습니다. 내 경험에 의하면 당신은 직업의 삶에 이들이 산란 해있는 것을 볼 수 있습니다. 이러한 로그가 대규모로 표시되는 경우 이는 일반적으로 BigQuery 스트리밍 서비스에 오류가 발생했음을 의미합니다.

Cloud Dataflow는 요청에 대한 행을 다시 시도합니다 (여기 코드 BigQuery... line 290 참조). 경고 이후 어느 시점에서 로그 항목이나 테이블의 기록을 볼 수 없다면 다른 오류가 있습니다.

스트리밍 모드에서 서비스는 무한대로 다시 시도합니다. 이 문제로 인해 작업이 실패하지 않는다는 것을 의미합니다. 우리는 영원히 노력하기 때문에 오류인지 경고인지 여부를 묻습니다. 내부적으로이 문제에 대해 토론 할 것이므로 Apache Beam user group에 메모를 게시하여 토론을 진행할 수도 있습니다 .-)

Cloud Logging에서 경고 메시지에 대한 메트릭을 생성하고 이에 대한 조치를 취할 수 있습니다. 우리는 Stackdriver 통합을 더욱 강화하기 위해 노력하고 있으며 이는 좋은 사용 사례입니다.

데이터를 잃지 않고 BigQuery에서의 데이터 도착이 지연됩니다. 몇 가지 간단한 고정 창을 만들고 이벤트 처리 시간을 사용하여 1 분짜리 창을 계산합니다. 그런 다음 신선도를 나타내는 지표로 시간을두고 조사합니다. 고정 윈도우가 워터 마크 뒤쪽에 있으면 인서트에 이상이 있습니다.이 예외 상속 IOException이의 경우 주석

에 따라 추가 해명

  • 편집이 경로는 ApiErrorExtractor()이 기인 경우 속도 제한 문제를 테스트 부른다.

    이 경우 SocketTimeout은 속도 제한으로 인한 것이 아니므로 발신자에게 예외가 발생합니다. 발신자는 finishBundle의 BigQuery.IO 행 2308입니다. IOException을 캐치하고 RuntimeException을 throw하는 flushRows()를 호출합니다.

    김이 모드에서는이 방식으로 실패한 모든 번들이 무한 재 시도됩니다. 참고 : 배치 모드에서는 주자가 4 번 시도한 다음 실패합니다.

    이 경우 (속도 제한 없음) 행 로그를 재 시도하지 않습니다.

    데이터를 손실하지 않고 번들을 재 시도 할 때 데이터가 지연됩니다.

    최악의 시나리오는 모든 작업자가이 문제를 겪고있어 파이프 라인을 진행할 수 없다는 것입니다. BigQuery 스트리밍 서비스가 다운되거나 모든 연결이 끊어지는 경우를 예로들 수 있습니다. 이제 BiqQuery 인제 스트 서비스가 안정화되고 번들이 통과되면 속도 제한 케이스가 시작될 수 있지만 코드를 다시 사용하면 이러한 오류를 줄일 수 있습니다.

    최악의 경우는 들어오는 파이프 라인 데이터 전송률이 BigQuery 스트리밍 수신 서비스에서 관리하는 최대 쓰기 속도 (전송률 제한 속도)에 근접하도록 지속적으로 상승한다는 것입니다. 따라서 재시도 (일시적 또는 기타)로 인해 백 로그가 발생하면 파이프 라인이 따라 잡을 수 없습니다.

    데이터 흐름을 스트리밍하는 드레인 기능이있어 들어오는 데이터의 처리를 중지 한 다음 파이프 라인을 진행하여 모든 처리되지 않은 창을 효율적으로 처리합니다. 그러나 Drain은 finishBundle()이 성공해야합니다. 따라서이 경우 (SocketTimeout) 드레인이 걸릴 것입니다. 파이프 라인 대 파이프 라인을 종료 한 경우 미완료 번들에 대한 데이터 손실이 발생합니다.

    원하는 경우 BigQuery.IO 논리를 재정의하고 다른 곳에서 오류가있는 데이터를 파이프 할 수 있습니다. 이렇게 할 수는 있지만 BigQuery 스트리밍 서비스를 사용하여 터미널 중단이 발생하지 않도록하십시오. 그렇다면 속도 제한 속도 근처에서 지속적으로 운영되고 복구 불가능한 백 로그 처리에 민감한 경우 속도 제한 문제를 피하기 위해 다른 감축 또는 샤딩 메커니즘을 구현하는 것이 좋습니다.

    백 로그 복구와 관련된 다른 제안 중 하나는 스트리밍 소스로 이벤트 흐름을 중단시킬 수 있다는 것입니다. 예를 들어, Pub/Sub에서 주제에 쓰는 것을 중지하십시오. 구독으로 다른 주제에 대한 글쓰기를 시작할 수 있습니다. 기존의 Dataflow 파이프 라인은 기존 주제에 대해 소진됩니다. 새로운 구독에서 새로운 백 로그를 처리하는 방법을 여전히 다루어야하지만 적어도 기존 파이프 라인 내의 모든 데이터를 잃지 않도록 보장해야합니다.

    이벤트 시간 처리를 사용하지 않는 경우이 접근법은 매우 유효 할 수 있습니다. 그러나, 이벤트 시간 처리를 사용하는 경우 Windows가 ONTIME으로 표시되는 중복 출력을 갖습니다.

    유스 케이스와 관련하여 내 생각에는 많은 가정이 있지만 데이터 손실에 대해 생각할 때 다른 아키텍처 개념을 제기 할 때 귀하의 질문을 공유하고 싶습니다.

    희망이 도움이됩니다.

+0

답장을 보내 주셔서 감사합니다. 그러나 나는 Dataflow가이 배치에 대해 재 시도 할 것이라고 확신하지 못합니다. 예외가 발생하기 때문에 BigQuery에 의해 반환 된 오류 (있는 경우)는 'futures' [(# L221)] (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/)에 추가되지 않습니다. ee25e238e65fc71b5db7ba0dace4b45d19dbf07a/sdk/src/main/java/co.kr/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java # L221) 목록에 있습니다. 그러므로'allErrors' (# L283)는 비어 있고 재 시도가 없습니다. – A21z

+0

나는 던져진 예외가 호출자에 의해 어떻게 처리되는지 나중에 살펴보고 오늘 나중에 다시 돌아올 것입니다. –

+0

A21z - 귀하의 의견에 회신하여 추가 정보를 추가했습니다. 이것이 도움이되지 않으면 알려주세요. –

관련 문제