Event-Driven Data Ingestion with AWS Lambda (S3 to S3)

Let’s say you have data coming into S3 in your AWS environment every 15 minutes and want to ingest it as it comes. The best approach for this near real-time ingestion is to use AWS lambda function. To demonstrate how to develop and deploy lambda function in AWS, we will have a look at a simple use case of moving file from source S3 to target S3 as the file is created in the source.

AWS lambda supports a few different programming languages. I will use Python for this example. To deploy lambda functions, you need to package the modules used in the function. For some reason, modules downloaded and packaged from Windows sometimes do not work (e.g. pyscopg2 never works even if I download the binary version). From my experience, Python lambda development is the best to be done in Linux or Mac.

In this example, we will use Amazon Linux EC2 instance to develop the package and deploy it from there. You can of course use your local Mac or Linux machine for this.

To deploy lambda functions, I usually use the deployment framework called Serverless, which makes deployment much easier. Here, I would like to use the manual deployment from the AWS documentation here. Doing the deployment manually will teach you how the lambda function works. I think it is good to try it at least once before using more automated solutions.

We will also cover the topic on setting the environment variables in order not to hard-code the parameters that change between environments (e.g. database connection url, access details, or bucket names).

Summary Steps

  1. Launch Linux EC2 instance
  2. Install Python 3 and create virtual environment
  3. Install required packages
  4. Code Lambda function
  5. Package function & modules into a zip file
  6. Create Lambda Function Role
  7. Deploy Lambda
  8. Test function manually
  9. Configure S3 source bucket as event source
  10. Execute the function by pushing file to the source bucket

Steps

(1) Launch a Linux EC2 Instance

Micro instance is usually good enough for developing lambda function. After deployment, you can terminate the instance. Make sure to choose the right security group and subnet so that the instance can access the appropriate resources.

(2) Install Python 3 and create virtual environment

This step and the next step can be bootstrapped (How to Launch EC2 With Bootstrap in AWS). For Centos, Redhat or Amazon Linux, follow the steps here: How to Install Python 3 and Create Virtual Environment in Centos, Redhat and Amazon Linux. For Ubuntu, follow the steps here: How to Install Python 3 and Create Virtual Environment In Ubuntu.

(3) Install required packages

Make sure to activate the virtual environment where you want to develop the lambda function. The installed packages will be in the site-packages folder. You need to pack all the site-packages content into a zip file. AWS SDK for Python (like boto3 or botocore) is pre-installed for Lambda function and you do not need to pack them. Although we do not use any other package for this code example, let’s pack psycopg2 and import it in the function to make sure the packaged modules work.

Note that you need to install the binary version for psycopg2.

source your-virtual-environment-path/activate
pip install psycopg2-binary

(4) Code Lambda Function

The PUT API call to the source S3 is the event to trigger the function. In Python, the event data is processed as dictionary.  See how the input file looks in the AWS documentation here. Make sure to change the parameter for your environment. Save this as inputFile.txt.

{  
   "Records":[  
      {  
         "eventVersion":"2.0",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"1970-01-01T00:00:00.000Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{  
            "principalId":"AIDAJDPLRKLG7UEXAMPLE"
         },
         "requestParameters":{  
            "sourceIPAddress":"127.0.0.1"
         },
         "responseElements":{  
            "x-amz-request-id":"C3D13FE58DE4C810",
            "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"testConfigRule",
            "bucket":{  
               "name":"sourcebucket",
               "ownerIdentity":{  
                  "principalId":"A3NL1KOZZKExample"
               },
               "arn":"arn:aws:s3:::sourcebucket"
            },
            "object":{  
               "key":"HappyFace.jpg",
               "size":1024,
               "eTag":"d41d8cd98f00b204e9800998ecf8427e",
               "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko"
            }
         }
      }
   ]
}

From the event input, you can grab bucket name and key to specify the newly created file in the source bucket.

The function grab the data from S3 and copy to a local tmp folder. Then, push it back to the target bucket. You can add transformation logic here if you need to.

Note that the target bucket name is taken from the environment variable. When we call the create lambda function API, we can set it. Alternatively, you can do it from the management console. This is recommended for the values that change between environment like database connection or access credentials. Make sure to import os to get environment variables.

PaAWS documentation about setting environment variables in Lambda functions is here. It uses Node.js as an example. It is the same in Python. You can also check out the blog about setting environment variables for Python here.

import boto3
import botocore
import psycopg2
import os

client = boto3.client('s3')

def upload_file(local_path, key):
   
    target_bucket = os.environ['targetBucket']
    client.upload_file(Filename=local_path, Bucket=target_bucket, Key=key)
    print('Completed Uploading {} to {}/{}'.format(local_path, target_bucket, key))

def handler(event, context):

    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        local_path = '/tmp/' + key.split('/')[-1]
        client.download_file(bucket_name, key, local_path)
        print("Downloaded s3 file, {}, to {}".format(key, local_path))
        upload_file(local_path, key)

Once the function is created, use the input file (inputFile.txt) to test the function. You can add this for local testing.

f = open('inputFile.txt', 'r')
event = json.load(f)
target_bucket = 'lambda.test.target'
download_file(event, 'hello')

(5) Package function & modules into a zip file

Now that your function is working, you can package the modules and code. You need to go to the site-packages folder for your virtual environment first then pack all the modules and then pack the python file.

cd /home/ec2-user/python3/lambda/lib/python3.6/site-packages
zip -r9 /tmp/MoveS3Data.zip *
zip -g MoveS3Data.zip MoveS3Data.py

(6) Create Lambda Function Role

By using IAM, you need to create a role for Lambda function. Follow the steps in the documentation here.

(7) Deploy Lambda

Once you have the zip file and lambda role, you can deploy the function. Make sure that you have the correct role and region selected. Handler has to be your-python-file-name.main-function-name.

This is where you can define environment variables. In this example, we are getting the target bucket name from the environment variable.

cd /tmp/

aws lambda create-function \
--region ap-southeast-2 \
--function-name MoveS3Data \
--zip-file fileb://MoveS3Data.zip \
--role arn:aws:iam::<account number>:role/lambda-s3-test \
--environments Variables={targetBucket=lambda.test.target} \
--handler MoveS3Data.handler \
--runtime python3.6 \
--timeout 10 \
--memory-size 1024

If you want to update the existing function, run the update script.

aws lambda update-function-code \
--function-name MoveS3Data \
--region ap-southeast-2 \
--zip-file fileb://MoveS3Data.zip

If you want to update the list of environment variables, run the update config script.

aws lambda update-function-configuration \
--function-name MoveS3Data \
--region ap-southeast-2 \
--environment Variables={targetBucket=lambda.prod.target}

(8) Test the function manually

When your deployment is successful, invoke the function with the input file (inputFile.txt) set as payload.

aws lambda invoke \
--invocation-type Event \
--function-name MoveS3Data \
--region ap-southeast-2 \
--payload file://inputFile.txt \
outputfile.txt

(9) Configure S3 source bucket as event source

First of all, you need to add the permission to s3 (see here).

aws lambda add-permission \
  --function-name <your function name> \
  --region <your region> \
  --statement-id Id-123 \
  --action "lambda:InvokeFunction" \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::<source-bucket> \
  --source-account <account number> \

Go to the source bucket and configure event as below. In this example, my source bucket is lambda.test.source.

(10) Execute the function by pushing file to the source bucket

Let’s push a file to S3 with AWS console and check if the function moved the data into the target bucket. Here is the s3 copy command reference.

aws s3 cp your_file_name.json s3://lambda.test.source/2018-03-01/your_file_name.json

You can check the execution outcome in CloudWatch, too.

Yay!

Data Engineering
Sending XML Payload and Converting XML Response to JSON with Python

If you need to interact with a REST endpoint that takes a XML string as a payload and returns another XML string as a response, this is the quick guide if you want to use Python. If you want to do it with Node.js, you can check out the post …

Data Engineering
Sending XML Payload and Converting XML Response to JSON with Node.js

Here is the quick Node.js example of interacting with a rest API endpoint that takes XML string as a payload and return with XML string as response. Once we get the response, we will convert it to a JSON object. For this example, we will use the old-school QAS (Quick …

Data Engineering
Downloading All Public GitHub Gist Files

I used to use plug-ins to render code blocks for this blog. Yesterday, I decided to move all the code into GitHub Gist and inject them from there. Using a WordPress plugin to render code blocks can be problematic when update happens. Plugins might not be up to date. It …