우리는 약 1 년 동안 Camus를 성공적으로 실행하여 Kafka (av 0.82)에서 avro 페이로드를 가져 와서 몇 가지 카프카 항목을 사용하여 HDFS에 .avro 파일로 저장합니다. 최근 우리 회사의 새로운 팀이 사전 제작 환경에서 약 60 개의 새로운 주제를 등록하고 이러한 주제에 데이터를 보내기 시작했습니다. 팀은 데이터를 kafka 주제로 라우팅 할 때 실수를 저 지르므로 Camus가 이러한 주제에 대한 페이로드를 deserialize 할 때 오류가 발생합니다. Camus 작업이 '실패한 기타 오류'임계 값을 초과하여 실패했습니다. Camus의 결과는 놀라운 것이 었습니다. 나는 다른 개발자들과 함께 우리가 관찰 한 행동이 기대되는지 또는 구현과 관련하여 어떤 문제가 있는지 여부를 확인하고자했습니다.Camus의 예상 된 커밋/롤백 동작은 무엇입니까?
Camus 작업이 '실패한 기타'임계 값을 초과하여 실패한 경우에 나타났습니다. 1. 모든 매퍼 작업이 성공하여 TaskAttempt가 커밋 될 수 있음 - 이는 기록 된 모든 데이터 Camus가 최종 HDFS 위치로 복사했습니다. 2. CamusJob은 % 오류율 (매퍼 커밋 다음에 있음)을 계산할 때 예외를 throw하므로 작업이 실패합니다. 3. 작업이 실패 했으므로 카프카 오프셋이 적용되지 않았습니다
우리는 Camus 작업이 5 분마다 실행되도록 설정되어 있기 때문에 문제가 발생했습니다. 따라서 5 분마다 데이터가 HDFS에 위탁되고 작업이 실패하고 Kafka 오프셋이 업데이트되지 않은 것을 확인했습니다. 이는 디스크가 가득 찼다는 것을 알기 전까지 복제 된 데이터를 기록한다는 것을 의미합니다.
결과를 확인하는 통합 테스트를 작성했습니다.이 테스트는 10 개의 훌륭한 레코드를 주제에 제출하고 10 개의 레코드는 예기치 않은 스키마를 사용하여 동일한 주제에 대해 Camus 작업을 허용 목록에 포함시켜 실행합니다. 10 개의 레코드가 HDFS에 쓰여지고 카프카 오프셋은 진행되지 않습니다. 아래는 해당 테스트의 로그 조각뿐만 아니라 작업을 실행하는 동안 사용한 속성입니다.
도움이 될 것입니다. 이것이 Camus의 예상 동작인지 여부 또는 구현에 문제가 있는지 여부와이 문제 (데이터 복제)를 방지하는 가장 좋은 방법은 무엇인지 잘 모르겠습니다.
감사합니다 ~ 시험을위한 매트
CamusJob 속성 : 테스트에서
etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false
mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3
kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true
kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10
etl.daily=daily
etl.ignore.schema.errors=false
etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2
로그 스 니펫 (snippet), 매퍼 성공 인해 '기타'임계 값을 능가하는 후속 작업 실패 후 행동을 저지 보여주는 :
LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map
[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing
[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map
[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now
[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0
[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data
[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro
[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro
[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro
[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro
[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] - map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read: 117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations: 0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations: 0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records: 15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records: 0
[CamusJob] - Failed Shuffles: 0
[CamusJob] - Merged Map outputs: 0
[CamusJob] - GC time elapsed (ms): 13
[CamusJob] - Total committed heap usage (bytes): 251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%
응답 해 주셔서 감사합니다. 지금까지는 Camus에서 오류 검사를 사용하지 않도록 설정했습니다 (어떤 이유로 든 실패하지 않기를 바랍니다). 가능한 한 빨리 Camus에서 벗어나려고합니다. – user2994581