0

DynamoDB 테이블에 쓰는 응용 프로그램이 있는데 Kinesis에서 내 집계를 수행 한 다음 집계 된 데이터를 다른 DynamoDB 테이블에 기록하려고합니다.Kinesis 스트림에 대한 DynamoDB

스트림은 내 DynamoDB의 테이블 활성화, 다음과 같이 내가 스트림에 Lamdba 트리거를 가지고있다 : 나는 람다 테스트 이벤트에서 세 가지 또는 네 가지 요소가있을 때

'use strict'; 

var AWS = require('aws-sdk'); 
var kinesis = new AWS.Kinesis(); 

exports.handler = (event, context, callback) => { 
    event.Records.forEach((record) => { 

     var myValue = record.dynamodb.NewImage.myValue.N; 
     var partitionKey = record.key.S; 
     var data = '{"VALUE":"' + myValue + '"}'; 

     var recordParams = { 
      Data: data, 
      PartitionKey: partitionKey, 
      StreamName: 'MyStreamName' 
     }; 

     console.log('Try Put to Kinesis Stream'); 

     kinesis.putRecord(recordParams, function(err, data) { 
      if (err) { 
       console.log('Failed Put'); 
      } else { 
       console.log('Successful Put'); 
      } 
     }); 
    }); 
}; 

이 내 운동성 스트림에 성공적으로 기록 .

트리거를 활성화하면 키네시스 스트림에 전혀 쓰지 않습니다. 한 번에 약 100 가지 요소가 들어오는 것으로 보입니다. Cloudwatch에서 'Kinesis Stream에 넣기 시도'메시지가 표시되지만 성공/실패 메시지조차도 볼 수 없습니다.

나는이 문제에 대해 완전히 잘못되었거나 더 나은 접근을하고 있습니까?

DynamoDB의의 스트림이 바로 내 첫 번째 수상 :

답변

2

당신의 실수가 모든 kinesis.putRecord 호출이 완료 될 때까지 람다 함수가 대기하지 않는다는 것입니다 것 운동성 분석에 공급할 수 있다면.

Node.js에는 콜백 프로그래밍 모델이 있습니다. 비동기 요청을하고 요청이 완료되면 콜백이 호출됩니다. 따라서 함수가 반환 될 때 요청이 완료되지 않습니다. 콜백이 호출 될 때 완료됩니다. 이 문제에

두 솔루션 : 호출 된 콜백의

보관할 트랙 자신

'use strict'; 
var AWS = require('aws-sdk'); 
var kinesis = new AWS.Kinesis(); 
exports.handler = (event, context, callback) => { 
    event.Records.forEach((record) => { 
     var myValue = record.dynamodb.NewImage.myValue.N; 
     var partitionKey = record.key.S; 
     var data = '{"VALUE":"' + myValue + '"}'; 
     var recordParams = { 
      Data: data, 
      PartitionKey: partitionKey, 
      StreamName: 'MyStreamName' 
     }; 
     console.log('Try Put to Kinesis Stream'); 
     var i = 0; 
     kinesis.putRecord(recordParams, function(err, data) { 
      if (err) { 
       console.log('Failed Put'); 
       i = event.Records.length; 
      } else { 
       console.log('Successful Put'); 
       i += 1; 
      } 
      if (i === event.Records.length) { 
       console.log('All done'); 
       callback(err); 
      } 
     }); 
    }); 
}; 

또는 비동기 같은 라이브러리를 사용 : https://www.npmjs.com/package/async

+0

작은 데이터 객체 인 {{ "VALUE": "12345"} '에서 작동하지만 일부 속성 (객체가> 6)을 추가하자마자 기록에 스트림이 기록되지 않았습니다. 주말 내내 Kinesis 응용 프로그램을 사용할 수 없게되었고, 지금 다시 시도되었습니다. 내 관심사는 이제 다시이 작업을 수행하고 모든 레코드가 스트림으로 전달되지 않는다는 것입니다. – intanethi

+0

그것은 4 시간 동안 부드럽게 움직 였고, 키네시스 시내에는 아무 것도 기록되지 않았습니다. 함수의 타임 아웃을 늘려 도움을 준 것처럼 보였지만 첫 번째 제한 시간 초과 요청 후에도 모든 후속 요청도 시간 초과되었습니다. 왜 대부분의 일괄 처리가 아주 작았는지 확실하지 않습니다. – intanethi

+0

비동기 라이브러리를 사용해야합니다. 그 함수 안의 비동기 호출을 수행하는 배열에서 forEach를 수행하는 것은 안전하지 않습니다. 당신은 async.mapLimit (event.Records, 5, putIntoKinesis, 콜백)'또는 이와 유사한 어떤 비동기 라이브러리로 모든 것을 조정할 필요가있다. 또 다른 옵션은 아이템을 키네시스에 일괄 적으로 넣을 수 있다는 것입니다. 개별 작성을 할 필요가 없습니다. –

0

그것은 전체 문제의 일부 나에게 보인다 (에 callback을 호출 할 필요가 있음) 및 설명에 설명 된 동작은 Data 값을 작성하는 방법 일 수 있습니다. Data에 대한 JSON 문자열을 수동으로 만드는 대신 JSON.stringify을 사용해보십시오. 그러면 입력이 항상 올바르게 형식화됩니다.

관련 문제