DynamoDB 테이블에 쓰는 응용 프로그램이 있는데 Kinesis에서 내 집계를 수행 한 다음 집계 된 데이터를 다른 DynamoDB 테이블에 기록하려고합니다.Kinesis 스트림에 대한 DynamoDB
스트림은 내 DynamoDB의 테이블 활성화, 다음과 같이 내가 스트림에 Lamdba 트리거를 가지고있다 : 나는 람다 테스트 이벤트에서 세 가지 또는 네 가지 요소가있을 때
'use strict';
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();
exports.handler = (event, context, callback) => {
event.Records.forEach((record) => {
var myValue = record.dynamodb.NewImage.myValue.N;
var partitionKey = record.key.S;
var data = '{"VALUE":"' + myValue + '"}';
var recordParams = {
Data: data,
PartitionKey: partitionKey,
StreamName: 'MyStreamName'
};
console.log('Try Put to Kinesis Stream');
kinesis.putRecord(recordParams, function(err, data) {
if (err) {
console.log('Failed Put');
} else {
console.log('Successful Put');
}
});
});
};
이 내 운동성 스트림에 성공적으로 기록 .
트리거를 활성화하면 키네시스 스트림에 전혀 쓰지 않습니다. 한 번에 약 100 가지 요소가 들어오는 것으로 보입니다. Cloudwatch에서 'Kinesis Stream에 넣기 시도'메시지가 표시되지만 성공/실패 메시지조차도 볼 수 없습니다.
나는이 문제에 대해 완전히 잘못되었거나 더 나은 접근을하고 있습니까?
DynamoDB의의 스트림이 바로 내 첫 번째 수상 :
작은 데이터 객체 인 {{ "VALUE": "12345"} '에서 작동하지만 일부 속성 (객체가> 6)을 추가하자마자 기록에 스트림이 기록되지 않았습니다. 주말 내내 Kinesis 응용 프로그램을 사용할 수 없게되었고, 지금 다시 시도되었습니다. 내 관심사는 이제 다시이 작업을 수행하고 모든 레코드가 스트림으로 전달되지 않는다는 것입니다. – intanethi
그것은 4 시간 동안 부드럽게 움직 였고, 키네시스 시내에는 아무 것도 기록되지 않았습니다. 함수의 타임 아웃을 늘려 도움을 준 것처럼 보였지만 첫 번째 제한 시간 초과 요청 후에도 모든 후속 요청도 시간 초과되었습니다. 왜 대부분의 일괄 처리가 아주 작았는지 확실하지 않습니다. – intanethi
비동기 라이브러리를 사용해야합니다. 그 함수 안의 비동기 호출을 수행하는 배열에서 forEach를 수행하는 것은 안전하지 않습니다. 당신은 async.mapLimit (event.Records, 5, putIntoKinesis, 콜백)'또는 이와 유사한 어떤 비동기 라이브러리로 모든 것을 조정할 필요가있다. 또 다른 옵션은 아이템을 키네시스에 일괄 적으로 넣을 수 있다는 것입니다. 개별 작성을 할 필요가 없습니다. –