2017-04-18 2 views
1

카프카를 통해 데이터를 스트리밍하고 있습니다. 그리고 나는이 메시지를 소비하는 스트리밍을 시작합니다.카프카 OffsetOutOfRangeException

kafka.common.OffsetOutOfRangeException 

지금 나는이 오류가 무엇을 의미하는지 알고 : 기본적으로 선 아래로, 스파크 스트리밍이 오류가 발생합니다. 그래서 보존 정책을 5 일로 변경했습니다. 그러나 나는 여전히 같은 문제에 직면했다. 그런 다음 kafka에서 --from-beginning를 사용하여 주제에 대한 모든 메시지를 나열했습니다. 분명히 카프카 스트리밍 파트의 ​​시작 부분에서 나온 많은 메시지가 없었으며 스파크 스트리밍이 카프카 스트리밍 파트 뒤에 약간 있기 때문에 스파크 스트리밍은 카프카가 삭제 한 메시지를 소비하려고합니다. 그러나 내가 알아서 할 보존 정책을 변경하는 생각 :

--add-config retention.ms=.... 

나는 새 메시지를 (우리는 데이터의 톤을 스트리밍하기 때문에) 카프카 공간을 확보하기 위해 주제에서 메시지를 삭제하는 것을 무슨 일이 일어나고 의심 무엇 . kafka가 이전 메시지를 삭제하기 전에 저장할 수있는 데이터의 바이트 수를 지정하는 속성을 구성 할 수 있습니까? 유 통해 주제 구성 등록 retention.bytes를 사용하여 항목을 만들 때

+0

당신은 오래된 소비자 또는 새로운 소비자를 사용 했을 삭제하기 시작하는 때 카프카에 신호를? 또한'auto.offset.reset'을 확인하십시오 – amethystic

+0

방금 ​​직접 만든 스파크 다이렉트 스트림 – Ahmed

+0

내 대답이 귀하의 질문에 답변을 했습니까? –

답변

0

당신은 주제의 최대 크기를 설정할 수 있습니다 콘솔과 같은 :

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config retention.bytes=10485760 --config 

또는 u가 최대 크기를 설정하는 글로벌 브로커 구성 등록 log.retention.bytes을 사용할 수 있습니다 모든 주제. 아는 것이 중요한 것이 무엇인지

log.retention.bytes이 주제 크기의 하드 제한을 적용하지 않는다는 것입니다,하지만 그것은 단지 오래된 메시지

관련 문제