Event-Driven S3 Data Ingestion with a Node.js Lambda Function Deployed with Serverless

Ingesting data as soon as a file is created in an S3 bucket enables near real-time data ingestion. For example, you may need to ingest log files from applications or API monitoring tools as soon as they land in the bucket. To get started, let’s move a file from the source bucket to the target bucket as soon as it is created in the source bucket, using a Node.js Lambda function.

We covered the same ingestion pattern with Python in the past. Here, we use Node.js for the Lambda function and Serverless for deployment, including the creation of the source and target buckets.

Prerequisites

I won’t cover the basics because they have been covered in previous posts. If you need to brush up, you can follow the links in those posts.

Event Payload

By configuring an event notification on the S3 bucket, S3 sends the event to the Lambda function as JSON. The event argument passed to the Lambda function looks like the JSON object below (see further information here).

{  
    "Records":[  
       {  
          "eventVersion":"2.0",
          "eventSource":"aws:s3",
          "awsRegion":"us-west-2",
          "eventTime":"1970-01-01T00:00:00.000Z",
          "eventName":"ObjectCreated:Put",
          "userIdentity":{  
             "principalId":"AIDAJDPLRKLG7UEXAMPLE"
          },
          "requestParameters":{  
             "sourceIPAddress":"127.0.0.1"
          },
          "responseElements":{  
             "x-amz-request-id":"C3D13FE58DE4C810",
             "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
          },
          "s3":{  
             "s3SchemaVersion":"1.0",
             "configurationId":"testConfigRule",
             "bucket":{  
                "name":"",
                "ownerIdentity":{  
                   "principalId":"A3NL1KOZZKExample"
                },
                "arn":"arn:aws:s3:::sourcebucket"
             },
             "object":{  
                "key":"object name such as data/data.json",
                "size":1024,
                "eTag":"d41d8cd98f00b204e9800998ecf8427e",
                "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko"
             }
          }
       }
    ]
 }

Lambda Function

This should be relatively straightforward. The source bucket name and file key are retrieved from the event payload object (for further information, see Uploading and Downloading Files in S3 with Node.js).
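
The handler itself is not reproduced in this section, so here is a minimal sketch of what it might look like, assuming the function simply copies the newly created object into the bucket named by the TargetBucket environment variable set in serverless.yml. The s3Uploader export matches the handler.s3Uploader entry in the template below; the actual implementation in the original post may differ (for example, it may download and re-upload the file instead of copying it).

handler.js (sketch)

'use strict';

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Copies the object that triggered the event into the target bucket.
// TargetBucket is assumed to be set as an environment variable (see serverless.yml).
module.exports.s3Uploader = (event, context, callback) => {
  const record = event.Records[0];
  const sourceBucket = record.s3.bucket.name;
  // Object keys arrive URL-encoded in the event payload.
  const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));

  const params = {
    Bucket: process.env.TargetBucket,
    CopySource: `${sourceBucket}/${key}`,
    Key: key,
  };

  s3.copyObject(params, (err, data) => {
    if (err) {
      return callback(err);
    }
    console.log(`Copied ${key} from ${sourceBucket} to ${process.env.TargetBucket}`);
    return callback(null, data);
  });
};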

Lambda Function Deployment

With Serverless, we can define the function, its events, and the resources to create with config files or parameters in the serverless.yml file. It is a simplified version of a CloudFormation template for deploying a Lambda function.

In the previous post, we only deployed the function with serverless.yml. That meant we had to create the source and target buckets, configure the event notification on the source bucket, and create the Lambda permission that allows the bucket to invoke the function separately.

To create resources, you can use raw CloudFormation template syntax in YAML under the resources section. Whatever you define in serverless.yml gets converted into a CloudFormation template, which can be found in the .serverless folder.

This way, all the appropriate resources and permissions are created when we deploy the function, and any resource or permission setting can be changed by updating serverless.yml and redeploying.

However, there is a catch. Because of the way CloudFormation creates resources, we cannot add the event notification to the source bucket on creation. With the NotificationConfiguration property on the source bucket, you will get the error 'Unable to validate the following destination configurations'. You can read more about this issue here.

The easiest way to avoid this error is to deploy the function without NotificationConfiguration first. Once you have deployed the Lambda function for the first time, add NotificationConfiguration to the source bucket resource and deploy again. The second deployment is treated as a stack update and adds the event notification to the source bucket.

There may be a better way than deploying the function twice, but I think this is the simplest workaround.
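
Assuming the standard Serverless CLI, the two-step deployment looks roughly like this:

# First deployment: leave the NotificationConfiguration block out (or commented).
serverless deploy
# Add the NotificationConfiguration block to the sourceBucket resource, then redeploy.
# The second run is a stack update and attaches the event notification.
serverless deploy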

Here is the template. The IAM role statements are separated into a permissions.yml file in the config folder (see here for getting values from config files in serverless.yml).

serverless.yml

service: nodejs-s3-file
custom:
  env: test
  targetBucket: your.target.bucket.name
  sourceBucket: your.source.bucket.name
provider:
  name: aws
  runtime: nodejs6.10
  region: ap-southeast-2
  iamRoleStatements: ${file(config/permissions.yml)}
  deploymentBucket:
    name: ${self:custom.env}.lambdafunction.bucket
    serverSideEncryption: AES256
  environment:
    TargetBucket: ${self:custom.targetBucket}
functions:
  s3Uploader:
    handler: handler.s3Uploader
    name: nodejs-transfer-S3
    events:
      - s3:
          bucket: ${self:custom.sourceBucket}
          event: s3:ObjectCreated:*
resources:
  Resources:
    sourceBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.sourceBucket}
        # Add this section after creation of this stack
        # It will be added as stack update.
        NotificationConfiguration:
          LambdaConfigurations:
            -
              Function: 
                "Fn::GetAtt":
                  - S3UploaderLambdaFunction
                  - Arn
              Event: "s3:ObjectCreated:*"
    targetBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.targetBucket}
    lambdaPermission:
      Type: AWS::Lambda::Permission
      Properties:
        Action: lambda:InvokeFunction
        SourceArn: 
          "Fn::GetAtt":
            - sourceBucket
            - Arn
        FunctionName: 
          "Fn::GetAtt":
            - S3UploaderLambdaFunction
            - Arn
        Principal: s3.amazonaws.com

permissions.yml

- Effect: Allow
  Action:
    - s3:GetObject
    - s3:ListBucket
    - s3:PutObject
    - logs:CreateLogGroup
    - logs:CreateLogStream
    - logs:PutLogEvents
    - ec2:CreateNetworkInterface
    - ec2:DescribeNetworkInterfaces
    - ec2:DeleteNetworkInterface
  Resource: "*"