0

유스 케이스에 대한 AWS 서비스를 배우고 있습니다. 문서를 살펴본 후 간단한 흐름을 보았습니다. Streams API와 KPL을 사용하여 Kinesis 스트림에 데이터를 가져 오려고합니다. 예제 putRecord 메서드를 사용하여 스트림에 데이터를 수집합니다.Kinesis lambda DynamoDB

{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"} 

데이터가 난 putRecordResult에서 다음과 같은 응답받을 섭취하면 - - 나는 이러한 데이터를 얻을로 밀어 람다 함수를 작성 지금

Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666} 
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722} 
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250} 

을 나는 스트림이 JSON을 섭취하고 DynamoDB 테이블 어떻게 든 내가 람다 함수 실행에 console.logs을 볼 수 아니다

console.log('Loading function'); 
var AWS = require('aws-sdk'); 
var tableName = "sampleTable"; 
var doc = require('dynamodb-doc'); 
var db = new doc.DynamoDB(); 

exports.handler = (event, context, callback) => { 
    //console.log('Received event:', JSON.stringify(event, null, 2)); 
    event.Records.forEach((record) => { 
     // Kinesis data is base64 encoded so decode here 
     const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii'); 
     console.log('Decoded payload:', payload); 
     var userid = event.userid; 
     var username = event.username; 
     var firstname = event.firstname; 
     console.log(userid + "," + username +","+ firstname); 

     var item = { 
      "userid" : userid, 
      "username" : username, 
      "firstname" : firstname 
     }; 

     var params = { 
      TableName : tableName, 
      Item : item 
     }; 
     console.log(params); 

     db.putItem(params, function(err, data){ 
      if(err) console.log(err); 
      else console.log(data); 
     }); 

    }); 
    callback(null, `Successfully processed ${event.Records.length} records.`); 
}; 

- 여기 내 람다 기능입니다. 스트림 페이지에서 스트림에 putRecord가 표시되는 것을 볼 수 있지만 어떻게 든 Lambdafunction 페이지 나 DynamoDB 테이블에서 아무것도 볼 수 없습니다.

키네시스로 데이터를 처리하기위한 Java 코드에 대한 IAM 정책과 lambda-kinesis-execution-role 인 Lambda 기능에 대한 IAM 정책 및 DynamoDB가 테이블에 데이터를 수집하는 정책이 있습니다.

올바른 방법으로 어떻게 수행되는지 보여주는 자습서가 있습니까? 이 프로세스에서 많은 포인트를 놓치고 있다는 느낌이 들었습니다. 예를 들어 모든 IAM 정책을 연결하여 동기화하도록하는 방법이 있습니다. 데이터가 스트림에 입력되면 Lambda에서 처리되어 Dynamo에서 끝납니다.

모든 도움말과 도움을 주시면 감사하겠습니다.

+1

람다 함수가 전혀 호출되지 않았습니까? 언급하지 않으므로 Kinesis의 데이터를 함수로 전달하는 AWS Lambda 이벤트를 활성화했는지 궁금합니다. http://docs.aws.amazon.com/lambda/latest/dg/with-kinesis -example-configure-event-source.html – devonlazarus

+0

댓글을 주셔서 감사합니다. 예 람다 함수의 이벤트 소스 탭에 Kinesis 스트림을 추가했으며 State-enabled 및 Details as - Batch 크기 : 100, 마지막 결과 : OK를 표시합니다. Kinesis 샘플 이벤트 템플릿을 사용하여 테스트 이벤트를 구성하고 테스트하면 Item : {id : undefined, username : undefined, firstname : undefined} – Dan

+0

이라는 오류가 표시됩니다. 위의 코드는 사용중인 코드의 직접 복사입니다 , 당신은'event.userid'를 참조하고 있지만'payload.userid'를 사용해야합니다. Kinesis 레코드를'payload' 변수에 디코딩했습니다. – devonlazarus

답변

1

위의 코드가 사용중인 코드의 직접 복사물 인 경우 event.userid을 참조하고 있지만 payload.userid을 사용해야합니다. Kinesis 레코드를 페이로드 변수에 디코딩했습니다.

0

당신은 람다 기능을 사용할 수

운동성 및 DynamoDB의

2.Now 모두

1.Create IAM 역할 DynamoDB의 프로세스 스트림의 블루 프린트에서 람다 함수를 만듭니다

3.Select 우리는 기능 만들기 4.Click IAM

에서

을 만들어 실행 역할은 이제 코드 섹션을 편집하는 이동하여 다음 코드를 작성

const AWS =require('aws-sdk'); 
const docClient =new AWS.DynamoDB.DocumentClient({region : 'us-east-1'}); 

exports.handler = (event, context, callback) => { 
    event.Records.forEach((record) => { 

    var params={ 
    Item :{ 
      ROWTIME:Date.now().toString(),//Dynamodb column name 
    DATA:new Buffer(record.kinesis.data, base64').toString('ascii')//Dynamodb column name 
       }, 
    TableName:'mytable'//Dynamodb Table Name 
    }; 

docClient.put(params,function(err,data){ 
    if(err){ 
     callback(err,null); 
    } 
    else{ 
     callback(null,data); 
    } 
}); 
}); 
}; 
관련 문제