2016-07-09 2 views
1

우리는 약 1 년 동안 Camus를 성공적으로 실행하여 Kafka (av 0.82)에서 avro 페이로드를 가져 와서 몇 가지 카프카 항목을 사용하여 HDFS에 .avro 파일로 저장합니다. 최근 우리 회사의 새로운 팀이 사전 제작 환경에서 약 60 개의 새로운 주제를 등록하고 이러한 주제에 데이터를 보내기 시작했습니다. 팀은 데이터를 kafka 주제로 라우팅 할 때 실수를 저 지르므로 Camus가 이러한 주제에 대한 페이로드를 deserialize 할 때 오류가 발생합니다. Camus 작업이 '실패한 기타 오류'임계 값을 초과하여 실패했습니다. Camus의 결과는 놀라운 것이 었습니다. 나는 다른 개발자들과 함께 우리가 관찰 한 행동이 기대되는지 또는 구현과 관련하여 어떤 문제가 있는지 여부를 확인하고자했습니다.Camus의 예상 된 커밋/롤백 동작은 무엇입니까?

Camus 작업이 '실패한 기타'임계 값을 초과하여 실패한 경우에 나타났습니다. 1. 모든 매퍼 작업이 성공하여 TaskAttempt가 커밋 될 수 있음 - 이는 기록 된 모든 데이터 Camus가 최종 HDFS 위치로 복사했습니다. 2. CamusJob은 % 오류율 (매퍼 커밋 다음에 있음)을 계산할 때 예외를 throw하므로 작업이 실패합니다. 3. 작업이 실패 했으므로 카프카 오프셋이 적용되지 않았습니다

우리는 Camus 작업이 5 분마다 실행되도록 설정되어 있기 때문에 문제가 발생했습니다. 따라서 5 분마다 데이터가 HDFS에 위탁되고 작업이 실패하고 Kafka 오프셋이 업데이트되지 않은 것을 확인했습니다. 이는 디스크가 가득 찼다는 것을 알기 전까지 복제 된 데이터를 기록한다는 것을 의미합니다.

결과를 확인하는 통합 테스트를 작성했습니다.이 테스트는 10 개의 훌륭한 레코드를 주제에 제출하고 10 개의 레코드는 예기치 않은 스키마를 사용하여 동일한 주제에 대해 Camus 작업을 허용 목록에 포함시켜 실행합니다. 10 개의 레코드가 HDFS에 쓰여지고 카프카 오프셋은 진행되지 않습니다. 아래는 해당 테스트의 로그 조각뿐만 아니라 작업을 실행하는 동안 사용한 속성입니다.

도움이 될 것입니다. 이것이 Camus의 예상 동작인지 여부 또는 구현에 문제가 있는지 여부와이 문제 (데이터 복제)를 방지하는 가장 좋은 방법은 무엇인지 잘 모르겠습니다.

감사합니다 ~ 시험을위한 매트

CamusJob 속성 : 테스트에서

etl.destination.path=/user/camus/kafka/data 
etl.execution.base.path=/user/camus/kafka/workspace 
etl.execution.history.path=/user/camus/kafka/history 
dfs.default.classpath.dir=/user/camus/kafka/libs 

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider 
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder 

camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z 
mapreduce.output.fileoutputformat.compress=false 

mapred.map.tasks=15 
kafka.max.pull.hrs=1 
kafka.max.historical.days=3 

kafka.whitelist.topics=advertising.edmunds.admax 
log4j.configuration=true 

kafka.client.name=camus 
kafka.brokers=<kafka brokers> 
max.decoder.exceptions.to.print=5 
post.tracking.counts.to.kafka=true 
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka 
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry 
etl.schema.registry.url=<schema repo url> 
etl.run.tracking.post=false 
kafka.monitor.time.granularity=10 

etl.daily=daily 
etl.ignore.schema.errors=false 

etl.output.codec=deflate 
etl.deflate.level=6 
etl.default.timezone=America/Los_Angeles 
mapred.output.compress=false 
mapred.map.max.attempts=2 

로그 스 니펫 (snippet), 매퍼 성공 인해 '기타'임계 값을 능가하는 후속 작업 실패 후 행동을 저지 보여주는 :

LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map 

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing 

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map 

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now 

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0 

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data 

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro 

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro 

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro 

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro 

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done. 
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0 
[LocalJobRunner] - map task executor complete. 
[Job] - map 100% reduce 0% 
[Job] - Job job_local866350146_0001 completed successfully 
[Job] - Counters: 23 
File System Counters 
FILE: Number of bytes read=117251 
FILE: Number of bytes written=350942 
FILE: Number of read operations=0 
FILE: Number of large read operations=0 
FILE: Number of write operations=0 
Map-Reduce Framework 
Map input records=10 
Map output records=15 
Input split bytes=793 
Spilled Records=0 
Failed Shuffles=0 
Merged Map outputs=0 
GC time elapsed (ms)=13 
Total committed heap usage (bytes)=251658240 
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG 
DECODE_SUCCESSFUL=10 
SKIPPED_OTHER=10 
File Input Format Counters 
Bytes Read=0 
File Output Format Counters 
Bytes Written=5907 
total 
data-read=840 
decode-time(ms)=123 
event-count=20 
mapper-time(ms)=58 
request-time(ms)=12114 
skip-old=0 
[CamusJob] - Group: File System Counters 
[CamusJob] - FILE: Number of bytes read: 117251 
[CamusJob] - FILE: Number of bytes written: 350942 
[CamusJob] - FILE: Number of read operations: 0 
[CamusJob] - FILE: Number of large read operations: 0 
[CamusJob] - FILE: Number of write operations: 0 
[CamusJob] - Group: Map-Reduce Framework 
[CamusJob] - Map input records: 10 
[CamusJob] - Map output records: 15 
[CamusJob] - Input split bytes: 793 
[CamusJob] - Spilled Records: 0 
[CamusJob] - Failed Shuffles: 0 
[CamusJob] - Merged Map outputs: 0 
[CamusJob] - GC time elapsed (ms): 13 
[CamusJob] - Total committed heap usage (bytes): 251658240 
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG 
[CamusJob] - DECODE_SUCCESSFUL: 10 
[CamusJob] - SKIPPED_OTHER: 10 
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1% 

답변

0

나는 꽤 비슷한 문제에 직면 해있다 : 나의 Kafka/Camus 파이프 라인은 약 1 년 동안 잘 작동했지만 최근에 나는 복제를 고수했다. 원격 브로커의 처리를 매우 불안정한 연결 및 잦은 작업 실패로 통합하면서 문제를 해결합니다.

오늘 Gobblin documentation을 조사 할 때 Camus sweeper은 아마도 우리가 찾고있는 도구임을 깨달았습니다. 그것을 파이프 라인에 통합하십시오.

나는 또한 가까운 장래에 Gobblin (Camus 후임자)로 마이그레이션하는 것이 좋을 것이라고 생각합니다.

+0

응답 해 주셔서 감사합니다. 지금까지는 Camus에서 오류 검사를 사용하지 않도록 설정했습니다 (어떤 이유로 든 실패하지 않기를 바랍니다). 가능한 한 빨리 Camus에서 벗어나려고합니다. – user2994581

관련 문제