Is there a way to push data from a Lambda function to a Kinesis stream? I have searched the internet but have not found any examples of it.
Thanks.
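Yes, you can send data from a Lambda function to a Kinesis stream, and it is very simple to do. Make sure the Lambda function runs with the right permissions: the code below calls `describeStream` and `putRecord`, so its role needs `kinesis:DescribeStream` and `kinesis:PutRecord` on the stream. The example consists of three files, shown in order: `kinesis.js`, `kinesisConstants.js`, and the Lambda handler.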
const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
// Options for the Kinesis client shared by every function in this module.
const kinesisClientOptions = {
  region: kinesisConstant.REGION,
  apiVersion: kinesisConstant.API_VERSION, // optional
  // For local runs, accessKeyId and secretAccessKey may also be supplied
  // here (both optional).
};

// Single client instance, created once when the module is loaded.
const kinesis = new AWS.Kinesis(kinesisClientOptions);
/**
 * Writes a single record to the configured Kinesis stream.
 *
 * String payloads are sent unchanged; any other value is JSON-encoded first.
 * When the value cannot be turned into a string (for example a circular
 * structure, or `undefined`), the problem is logged and no request is made.
 *
 * @param {*} payload - Value to publish.
 * @returns {void} The outcome of the put is only logged, not returned.
 */
const savePayload = (payload) => {
  let data = payload;

  // `typeof` always yields lower-case type names, so compare against the
  // literal 'string'; a capitalised type name would never match and every
  // string payload would be JSON-encoded a second time.
  if (typeof data !== 'string') {
    try {
      data = JSON.stringify(data);
    } catch (e) {
      // Nothing serialisable to send (e.g. circular reference).
      console.log(e);
      return;
    }
  }

  // JSON.stringify returns undefined (without throwing) for undefined,
  // functions and symbols; there is no record body to send in that case.
  if (typeof data !== 'string') {
    console.log('Record Lost: payload could not be serialised', payload);
    return;
  }

  const params = {
    Data: data,
    PartitionKey: kinesisConstant.PARTITION_KEY,
    StreamName: kinesisConstant.STREAM_NAME,
  };

  kinesis.putRecord(params, function(err, result) {
    if (err) console.log(err, err.stack);
    else console.log('Record added:', result);
  });
};
exports.save = (payload) => {
const params = {
StreamName: kinesisConstant.STREAM_NAME,
};
kinesis.describeStream(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
//Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
|| data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
savePayload(payload);
} else {
console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
console.log(`Record Lost`, JSON.parse(payload));
}
}
});
};
module.exports = {
STATE: {
ACTIVE: 'ACTIVE',
UPDATING: 'UPDATING',
CREATING: 'CREATING',
DELETING: 'DELETING'
},
STREAM_NAME: '<your-stream-name>',
PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
PAYLOAD_TYPE: 'String',
REGION: '<the-region-where-you-have-lambda-and-kinesis>',
API_VERSION: '2013-12-02'
}
const kinesis = require('./kinesis');
exports.handler = (event, context, callback) => {
console.log('LOADING handler');
const done = (err, res) => callback(null, {
statusCode: err ? '400' : '200',
body: err || res,
headers: {
'Content-Type': 'application/json',
},
});
kinesis.save(event); // here we send it to the stream
done(null, event);
}