2016-10-11 2 views
1

나는 인터뷰에서이 질문을 받았다.복구 OFFAET을 사용하여 KAFKA에서 분실 한 메시지

패킷이 실패로 인해 손실되었다고 상상해보십시오 (소비자 실패 또는 브로커가 확실하지 않음). 오프셋을 사용하여이 시간 동안 손실 된 메시지를 복구하기 위해 수행해야 할 작업 (코드 구현)은 무엇입니까?

죄송합니다. 비슷한 질문을했기 때문에 제 질문에 명확하지 않을 수 있습니다.

감사

답변

0

카프카는 당신이, 당신은 데이터가 손실되지 않습니다 브로커 실패시 중복 얻을 수있는 의미의 at-least once 메시지 전달의 의미를 다음과 같습니다.

그러나이 속성을 0으로 설정하면 Kafka Producer을 생성 할 때 브로커에 오류가 발생하더라도 다시 보내지 않으므로 한 번만 보내려고합니다. 따라서 브로커가 실패하면 데이터가 손실 될 수 있습니다. 이 Zookeeper에서의 오프셋 (offset)를 갱신합니다,

props.put("retries", 0); 

그래서 당신은 1이 속성 값을 변경할 수 있으므로 메시지가 성공적에만 전달되는 경우, 자동으로 사육사 다시, 또한 offsets가 관리 보내려고합니다.

또한 SPark Streaming에서 언급 했으므로 SPark Streaming은 두 가지 접근 방식을 지원합니다.

수신자 기반 : 오프셋은 사육사에서 처리됩니다.

2. 직접 접근 방식 : 오프셋은 메시지가 저장된 위치에서 로컬로 처리되며,이 방법은 정확히 한 번 메시지 전달을 지원합니다. 당신은 복구 할 메시지의 오프셋을 알고있는 경우

더 많은 정보를 위해이 link

1

을 확인하고는 KafkaConsumer 방법 seek 사용할 수 있습니다 당신에게 속한 어떤 파티션 :

consumer.seek(new TopicPartition("topic-name", partNumber), offsetNumber); 

상세한 here

다음 번에 poll()으로 전화하면 목록에서 처음 들었던 메시지가 나타납니다.

첫 번째 위치에서 직접 오프셋을 관리하는 경우에만 작동합니다. 카프카가 오프셋을 관리하게하는 경우 오프셋 번호와 아마도 두 번 소비 된 메시지로 끝날 가능성이 가장 높을 것입니다 (poll()에 대한 호출이 마지막 커밋 된 오프셋에서 소모되기 시작합니다).

0

기사와 문서 많이 읽은 후에 내가 최고의 대답일지도 모르다 느낌 :

없이 수신기와 새로운 불꽃 카프카 소비자 사용 (스파크 스트리밍 카프카 - 0-10_2.11). 이 접근법에서는 startOffset을 읽을 위치에서 지정할 수 있습니다.

발에 offsetRanges = 배열 ​​(// 주제, 분할, 포괄적 시작 오프셋, 독점 결말,,)
OffsetRange ("테스트", 1, 0 OffsetRange ("테스트", 0, 0, 100 오프셋 100))

발 RDD = KafkaUtils.createDirectStream [문자열, 문자열 (sparkContext, kafkaParams, offsetRanges, PreferConsistent) 메시지 일단

읽고 처리 된, 읽기 오프셋을 얻고 보관 Kafka 또는 Zk 또는 External tra에서 nsactional 데이터베이스.

offsetRanges = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges

우리는 작업을 시작하는 때마다 데이터베이스에서 오프셋을 가져오고 그것이 exacly 메커니즘 번 가지고 createDirectStream을 전달합니다.

더 읽기 http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

관련 문제