카프카를 통해 데이터를 스트리밍하고 있습니다. 그리고 나는이 메시지를 소비하는 스트리밍을 시작합니다.카프카 OffsetOutOfRangeException
kafka.common.OffsetOutOfRangeException
지금 나는이 오류가 무엇을 의미하는지 알고 : 기본적으로 선 아래로, 스파크 스트리밍이 오류가 발생합니다. 그래서 보존 정책을 5 일로 변경했습니다. 그러나 나는 여전히 같은 문제에 직면했다. 그런 다음 kafka에서 --from-beginning를 사용하여 주제에 대한 모든 메시지를 나열했습니다. 분명히 카프카 스트리밍 파트의 시작 부분에서 나온 많은 메시지가 없었으며 스파크 스트리밍이 카프카 스트리밍 파트 뒤에 약간 있기 때문에 스파크 스트리밍은 카프카가 삭제 한 메시지를 소비하려고합니다. 그러나 내가 알아서 할 보존 정책을 변경하는 생각 :
--add-config retention.ms=....
나는 새 메시지를 (우리는 데이터의 톤을 스트리밍하기 때문에) 카프카 공간을 확보하기 위해 주제에서 메시지를 삭제하는 것을 무슨 일이 일어나고 의심 무엇 . kafka가 이전 메시지를 삭제하기 전에 저장할 수있는 데이터의 바이트 수를 지정하는 속성을 구성 할 수 있습니까? 유 통해 주제 구성 등록 retention.bytes
를 사용하여 항목을 만들 때
당신은 오래된 소비자 또는 새로운 소비자를 사용 했을 삭제하기 시작하는 때 카프카에 신호를? 또한'auto.offset.reset'을 확인하십시오 – amethystic
방금 직접 만든 스파크 다이렉트 스트림 – Ahmed
내 대답이 귀하의 질문에 답변을 했습니까? –