2014-12-11 2 views
0

우리는 키네 시스를 평가하고 있으며 다음과 같은 행동을 발견했습니다. 정확성과 기본 기능을 테스트하기 위해 Kinesis를 사용하여 간단한 테스트를했습니다.Kinesis 보낸 레코드가 소모 된 레코드와 동일하지 않습니다

@Override 
public void processRecords(List<Record> records, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) 
{ 
    logger.debug("Received a list of records for processing with size:" + records.size()); 

    for (Record record : records) 
    { 
     UsageServiceStatistics.instance().getKinesisConsumed().increase(); 
     logger.debug("Kinesis consumed:" + UsageServiceStatistics.instance().getKinesisConsumed()); 
     if (!processRecord(record)) 
     { 
      logger.error("Couldn't process record " + record + ". Skipping the record."); 
     } 
    } 

    checkpointManager.checkpoint(iRecordProcessorCheckpointer); 
} 

내가 생산의 수 사이의 불일치가 발생하고 그런 다음

PutRecordRequest putRecordRequest = new PutRecordRequest(); 
    putRecordRequest.setStreamName(streamName); 
    putRecordRequest.setData(ByteBuffer.wrap(event.getBytes())); 
    putRecordRequest.setPartitionKey(message.getEventList().getEvents().get(0).getLicenseKey()); 

    UsageServiceStatistics.instance().getKinesisSent().increase(); 
    PutRecordResult putRecordResult = kinesisManager.getConnection().putRecord(putRecordRequest); 

내가 다음과 같이 아마존 운동성 클라이언트 라이브러리 (KCL)를 사용하여 다음과 같이

시험은 스트림으로 제품을 생산 소비 된 레코드의 수와 비교합니다.

Kinesis sent:counter=2000 
Kinesis consumed:1999 

Kinesis sent:counter=4000 
Kinesis consumed:counter=3994 

Kinesis sent:counter=6000 
Kinesis consumed:counter=5999 

은 왜 소비 대 생산의 숫자를 정확하게 표시되지 않습니다 : 예를 들어 내가 다음을 참조 행 2000 개 항목 3 회 시리즈를 보낼 때? 두 번째 실행 후 왜 6 개의 항목이 누락 되었는가와 2 번 실행과 3 번 실행 사이에 적어도 2 분간 기다렸지 만 2006 년 3 번 기록을 사용했습니다.

마지막으로이 테스트 전에 일련의 테스트를 수행했습니다. 검사 점의 빈도가 높아지고 불일치가 더 커졌습니다. Amazon KCL이 레코드를 conumer로 보내기 위해 사용하는 규칙은 무엇입니까? 왜 대기열에 항목을 보내고 보관하는 것을 멈출까요? (2에서 3 등으로)? 전송 된 6000의 마지막 항목은 어디에 있습니까?

미리 Thx를 입력하십시오.

+0

키네시스 사건 작성시 예외 사항을 확인 했습니까? 일부 put_record 호출에 대해 제한 될 수 있습니다. – Guy

+0

kinesis 로그 수준을 디버깅 할 때도 예외가 있는지 확인했습니다. –

+0

어떻게 키네 시스 로그 수준을 디버깅에 넣었습니까? 당신이 만든 샘플/샘플을 올리시겠습니까? –

답변

2

근본 원인을 발견했습니다.

내 코드의 버그입니다.

KCL은 특정 스트림의 샤드 (shard) 수와 동일한 수의 레코드 프로세서를 만듭니다.

그러나 멀티 스레드 환경에서 동일한 Checkpointer 엔터티를 사용함으로써 버그가 발생했습니다. 각 레코드 프로세서에 자체 체크 포인터가있는 것으로 고정 시켰을 때 완벽하게 작동했으며 개수가 일정했습니다.

관련 문제