Event-Driven Data Ingestion with AWS Lambda (S3 to RDS)

In the previous post, we discussed how to use an AWS Lambda function to move data from a source S3 bucket to a target bucket whenever a new file is created in the source bucket. In this post, I will show you how to use Lambda to ingest data from S3 into RDS whenever a new file is created in the source bucket. AWS Lambda supports a few different programming languages; we will use Python 3.6 here.

To access RDS from a Lambda function, the function needs access to the VPC where the RDS instance resides, which means giving it the right permissions. You also need to tell the function which VPC to access and which security group within that VPC to use. It is simpler than it sounds, and I will cover all of it here. AWS has documentation on how to configure a Lambda function to access RDS, which I referred to when writing this post.

Scenario

Let’s think about a near real-time ingestion scenario. You have an application that dumps JSON data into S3 every 15 minutes. Your requirement is to grab the data from S3, transform it, and write it to a Postgres RDS instance every time a new file arrives in the bucket.

Ingestion Details

As an example, let’s use the JSON example data used here (How Postgres JSON Query Handles Missing Key). This data was also used in the previous Lambda post (Event-Driven Data Ingestion with AWS Lambda (S3 to S3)). Essentially, we will change the target from S3 to Postgres RDS. As an ingestion method, we will load the data as JSON into Postgres. We discussed this ingestion method here (New JSON Data Ingestion Strategy by Using the Power of Postgres).

Prerequisites

  • Mac or Linux machine with Python 3.6 development environment.
  • Virtual environment for this lambda function with psycopg2-binary installed
  • Postgres RDS with target schema and table (e.g. usermanaged.transaction)
  • Source S3 Bucket (e.g. lambda.test.source)

If you have a Windows machine, be warned: I struggled to get psycopg2 working in Lambda when I pushed the function from Windows. I recommend using a Linux virtual machine to develop the Lambda function if you are on Windows. Lambda functions run on Amazon Linux, and developing in a similar OS environment is better for a scripting language like Python.

We have instructions for setting up a Python development environment in Linux; a rough sketch of creating the virtual environment for this function is below.
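
As a rough sketch (the paths and environment name are only examples, chosen to match the packaging step later in this post), creating the virtual environment and installing psycopg2-binary looks like this:

python3.6 -m venv /home/user/python3/lambda_pg
source /home/user/python3/lambda_pg/bin/activate
pip install psycopg2-binary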

To launch a Postgres RDS instance, see the post below.

You need to create a table with a single column of the jsonb data type. See the post below for the data set and table creation; a quick sketch follows.
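
As a minimal sketch, assuming psql is installed locally and reusing the connection placeholders from db_config.py below (the column name data is just an example, since the copy command in the function does not reference column names):

psql "host=<rds endpoint> dbname=<database name> user=<user name>" \
-c "CREATE SCHEMA IF NOT EXISTS usermanaged; CREATE TABLE usermanaged.transaction (data jsonb);"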

Code

We will use a config file for the database connection details. The target schema and table names will be set as environment variables when we deploy the function. Save the file below as db_config.py.

db_username = "<user name>"
db_password = "<password>"
db_name = "<database name>"
db_endpoint = "<rds endpoint>"

The main function is handler(). The file is saved as MoveS3ToPg.py, which will be the lambda function name.

The bucket name and object key are retrieved from the event; an S3 event is a JSON document that contains, among other things, the bucket name and object key.

The code retrieves the target file and transforms it into a CSV file. Then it loads the CSV into Postgres with the copy command. For a detailed explanation of this ingestion pattern, refer to New JSON Data Ingestion Strategy by Using the Power of Postgres.

For the database parameters, import db_config.py and read the values from it. The schema and table names are set as environment variables and retrieved with os.environ.

import boto3
import botocore
import psycopg2
import os
import json
import db_config

dbname = db_config.db_name
user = db_config.db_username
host = db_config.db_endpoint
password = db_config.db_password
schema = os.environ['targetSchema']
table = os.environ['targetTable']

connection_string = "dbname='{}' user='{}' host='{}' password='{}'"\
    .format(dbname, user, host, password)

client = boto3.client('s3', 'ap-southeast-2',
                      config=botocore.config.Config(s3={'addressing_style': 'path'}))

# check
print(dbname, user, host, password, schema, table, connection_string)

def pg_load(connection_string, table_name, file_path):
    try:
        conn = psycopg2.connect(connection_string)
        print("Connecting to Database")
        cur = conn.cursor()
        # Open the input file and load it into the table with the copy command
        with open(file_path, "r") as f:
            cur.copy_expert("copy {} FROM STDIN WITH CSV quote e'\x01' delimiter e'\x02'".format(table_name), f)
        conn.commit()
        print("Loaded data into {}".format(table_name))
        conn.close()
        print("DB connection closed.")

    except Exception as e:
        print('Error {}'.format(str(e)))

def transform_json(input_path, output_path):
    # Open the input file and load it as JSON
    with open(input_path, 'r') as input_file:
        records = json.load(input_file)
    # Write each record as a single-line JSON string for the copy upload
    with open(output_path, 'w') as output_file:
        for record in records:
            output_file.write(json.dumps(record))
            output_file.write('\n')
    print('Transformed {} to {}'.format(input_path, output_path))

def handler(event, context):
    # Get the info from the S3 Put event
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        local_path = '/tmp/' + key.split('/')[-1]
        # Download file from S3
        client.download_file(bucket_name, key, local_path)
        print("Downloaded s3 file, {}, to {}".format(key, local_path))
        # Transform the file
        output_path = '/tmp/output.csv'
        transform_json(local_path, output_path)
        # Load csv to Postgres
        pg_load(connection_string, schema+'.'+table, output_path)

S3 Put Event JSON Example

Whenever a file is created in the source bucket, S3 sends an event like the JSON below to the Lambda function. A copy of this JSON is also used to test the function manually. You need to change a few parameters (such as the bucket and object names) to suit your setup.

{  
   "Records":[  
      {  
         "eventVersion":"2.0",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"1970-01-01T00:00:00.000Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{  
            "principalId":"AIDAJDPLRKLG7UEXAMPLE"
         },
         "requestParameters":{  
            "sourceIPAddress":"127.0.0.1"
         },
         "responseElements":{  
            "x-amz-request-id":"C3D13FE58DE4C810",
            "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
         },
         "s3":{  
            "s3SchemaVersion":"1.0",
            "configurationId":"testConfigRule",
            "bucket":{  
               "name":"sourcebucket",
               "ownerIdentity":{  
                  "principalId":"A3NL1KOZZKExample"
               },
               "arn":"arn:aws:s3:::sourcebucket"
            },
            "object":{  
               "key":"HappyFace.jpg",
               "size":1024,
               "eTag":"d41d8cd98f00b204e9800998ecf8427e",
               "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko"
            }
         }
      }
   ]
}

Summary Steps

  1. Create VPC Endpoint for Amazon S3
  2. Create a custom policy for the function (e.g. s3_to_pg_lambda)
  3. Attach the policy to the role used for the function (e.g. s3_to_pg_lambda)
  4. Create a function and config file.
  5. Package the code with the required libraries and the config file
  6. Deploy the function
  7. Test the function by manually invoking it
  8. Add permission for S3 to invoke the function
  9. Configure the event in the source bucket
  10. Test the function with an S3 PUT request.

Steps

(1) Create VPC Endpoint for Amazon S3

To access S3 from a Lambda function executed within a VPC, I needed to configure a VPC endpoint for S3. How you access S3 from a VPC depends on your setup; your VPC might already allow access to S3 without an endpoint (e.g. through an Internet Gateway or NAT). I am using the environment created here (How to Create Your Own Personal Data Science Computing Environment In AWS), and in this setup the best approach is to create the endpoint.

If the Lambda function times out before downloading the file from S3, you have an access configuration issue.
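
As a sketch, a gateway endpoint for S3 can be created with the CLI as below; the VPC and route table IDs are placeholders you need to replace with your own.

aws ec2 create-vpc-endpoint \
--region ap-southeast-2 \
--vpc-id vpc-xxxxxxxx \
--service-name com.amazonaws.ap-southeast-2.s3 \
--route-table-ids rtb-xxxxxxxx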

(2) Create a custom policy for the function

The Lambda function needs to get data from S3 and access RDS within a VPC. For RDS access, you need EC2 actions to create ENIs (used to execute the function within the specified VPC) and CloudWatch Logs actions to write logs. For this, you can use the pre-made AWSLambdaVPCAccessExecutionRole policy. The policy below is simply AWSLambdaVPCAccessExecutionRole with a few S3 action permissions added. Use it to create a custom policy (e.g. s3_to_pg_lambda); a CLI sketch for creating the policy follows the JSON.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DeleteNetworkInterface"
            ],
            "Resource": "*"
        }
    ]
}
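
A minimal sketch of creating the custom policy, assuming the JSON above is saved as s3_to_pg_lambda_policy.json (the file name is just an example):

aws iam create-policy \
--policy-name s3_to_pg_lambda \
--policy-document file://s3_to_pg_lambda_policy.json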

(3) Create a new lambda execution role and attach the policy

Once the policy is created, create a new Lambda execution role (e.g. s3_to_pg_lambda) and attach the policy to it. A rough CLI sketch is below.
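
This sketch assumes the role and policy names used in this post and <account no> as your account id; the trust policy simply lets the Lambda service assume the role.

aws iam create-role \
--role-name s3_to_pg_lambda \
--assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'

aws iam attach-role-policy \
--role-name s3_to_pg_lambda \
--policy-arn arn:aws:iam::<account no>:policy/s3_to_pg_lambda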

(4) Create a function and config file

Refer to the config file and code above. The name of the main Python file should match the Lambda function name.

(5) Package the code with the required libraries and the config file

Go to the site-packages folder of the virtual environment where the Lambda function was developed and zip its entire contents. Then add the main function file and the config file to the archive.

cd /home/user/python3/lambda_pg/lib/python3.6/site-packages
zip -r9 /home/user/tmp/MoveS3ToPg.zip *
cd /home/user/tmp/
zip -g MoveS3ToPg.zip MoveS3ToPg.py
zip -g MoveS3ToPg.zip db_config.py

(6) Deploy the function

Make sure the vpc-config parameter includes all the subnets of the RDS subnet group. Giving the function the same security group as the RDS instance does not work by default, because members of a security group cannot reach each other unless the group has an inbound rule referencing itself. Using the default security group for the VPC usually works.

If you have multiple environment variables, as in this example, wrap the Variables value in double quotes.

The handler should be set as {file name}.{handler function name}, which here is MoveS3ToPg.handler. The role is the custom role created in step 3.

aws lambda create-function \
--region ap-southeast-2 \
--function-name MoveS3ToPg \
--zip-file fileb://MoveS3ToPg.zip \
--role arn:aws:iam::<account no>:role/s3_to_pg_lambda \
--environment Variables="{targetSchema=usermanaged,targetTable=transaction}" \
--vpc-config SubnetIds=subnet-xxxxxx1,subnet-xxxxxx6,SecurityGroupIds=sg-1aaaaa1 \
--handler MoveS3ToPg.handler \
--runtime python3.6 \
--timeout 10 \
--memory-size 1024

If you need to update the function, use the update-function-code command below.

aws lambda update-function-code \
--function-name MoveS3ToPg \
--region ap-southeast-2 \
--zip-file fileb://MoveS3ToPg.zip

If you need to update the config (such as vpc-config or environment variables), use the update-function-configuration command.

aws lambda update-function-configuration \
--function-name MoveS3ToPg \
--region ap-southeast-2 \
--vpc-config SubnetIds=subnet-xxxxxx1,subnet-xxxxxx6,SecurityGroupIds=sg-1aaaaa1

(7) Test the function by manually invoking it

Use the lambda invoke command. The payload is the S3 event JSON file you created manually from the example above.

aws lambda invoke \
--invocation-type Event \
--function-name MoveS3ToPg \
--region ap-southeast-2 \
--payload file://inputFile.txt \
outputfile.txt

(8) Add permission for S3 to invoke the function

Run the lambda add-permission command to allow S3 to invoke the function. Without this permission, you cannot configure the Lambda event in the source bucket. A rough sketch of the command is below.
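
This sketch assumes the bucket and function names used in this post; the statement id is arbitrary and <account no> is your account id.

aws lambda add-permission \
--region ap-southeast-2 \
--function-name MoveS3ToPg \
--statement-id s3-put-invoke \
--action lambda:InvokeFunction \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::lambda.test.source \
--source-account <account no>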

(9) Configure event in the source bucket.

Configure an event notification on the source bucket so that whenever a PUT request happens, S3 sends the event to the Lambda function. You can do this from the bucket properties in the S3 console, or with the CLI as sketched below.
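
A rough CLI sketch, assuming the bucket name from this post; the Lambda function ARN is built from placeholders you need to replace with your own region and account id.

aws s3api put-bucket-notification-configuration \
--bucket lambda.test.source \
--notification-configuration '{
  "LambdaFunctionConfigurations": [{
    "LambdaFunctionArn": "arn:aws:lambda:ap-southeast-2:<account no>:function:MoveS3ToPg",
    "Events": ["s3:ObjectCreated:Put"]
  }]
}'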

(10) Test the function with an S3 PUT request.

You can use the AWS CLI to upload a file to the source bucket and check that the Lambda function executes correctly.

aws s3 cp ./source_file.json s3://lambda.test.source/today/source_file.json

Error Handling

When the permissions, execution role, VPC access to S3, security group, or subnets are not configured correctly, your function will typically time out: it maxes out the configured execution time limit and stops.

If your function starts timing out after previously successful executions, re-deploying the function may fix it.
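
To confirm a timeout, check the function's CloudWatch Logs. A hedged sketch, assuming the standard /aws/lambda/<function name> log group naming:

aws logs filter-log-events \
--region ap-southeast-2 \
--log-group-name /aws/lambda/MoveS3ToPg \
--filter-pattern "Task timed out"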
