Event-Driven Data Ingestion with AWS Lambda (S3 to RDS)
- By : Mydatahack
- Category : Data Engineering, Data Ingestion
- Tags: AWS Lambda, Near Real-Time, Python, RDS, S3
In the previous post, we discussed how to move data from a source S3 bucket to a target bucket whenever a new file is created in the source bucket, using an AWS Lambda function. In this post, I will show you how to use Lambda to ingest data from S3 into RDS whenever a new file is created in the source bucket. AWS Lambda supports a few different programming languages. We will use Python 3.6 here.
To access RDS from the Lambda function, the function needs access to the VPC where RDS resides, which means giving the function the right permissions. You also need to tell the function which VPC subnets to use and which security group within the VPC to use. It is actually simpler than it sounds and I will cover all of these here. AWS has documentation on how to configure a Lambda function to access RDS, which I referred to when writing this post.
Scenario
Let’s think about a near real-time ingestion scenario. You have an application that dumps data to S3 every 15 minutes in JSON format. Your requirement is to grab the data from S3, transform it and write it to Postgres RDS every time a new file arrives in the bucket.
Ingestion Details
As an example, let’s use the JSON example data used here (How Postgres JSON Query Handles Missing Key). This data was also used in the previous Lambda post (Event-Driven Data Ingestion with AWS Lambda (S3 to S3)). Essentially, we will change the target from S3 to Postgres RDS. As an ingestion method, we will load the data as JSON into Postgres. We discussed this ingestion method here (New JSON Data Ingestion Strategy by Using the Power of Postgres).
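To make the ingestion method concrete, here is a minimal sketch of the transformation step, assuming the source file is a JSON array of records. The helper name to_json_lines and the sample records are made up for illustration and are not the data set used in this post. Each record becomes one line of JSON, which can then be loaded into a single jsonb column.

```python
import io
import json


def to_json_lines(source_text):
    """Turn a JSON array into one compact JSON document per line."""
    records = json.loads(source_text)
    out = io.StringIO()
    for record in records:
        out.write(json.dumps(record))
        out.write('\n')
    return out.getvalue()


# Hypothetical sample data, only for illustration
sample = '[{"id": 1, "amount": 10.5}, {"id": 2}]'
lines = to_json_lines(sample).splitlines()
print(lines)
```

Records with missing keys (like the second one above) are kept as they are, so the structure can be handled later in Postgres.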
Prerequisites
- Mac or Linux machine with Python 3.6 development environment.
- Virtual environment for this lambda function with psycopg2-binary installed
- Postgres RDS with target schema and table (e.g. usermanaged.transaction)
- Source S3 Bucket (e.g. lambda.test.source)
If you have a Windows machine, be aware that I struggled to get psycopg2 working in Lambda when I pushed the function from Windows. I recommend using a Linux virtual machine to develop the Lambda function if you are on Windows. Lambda functions run on Linux, and psycopg2 includes compiled code, so developing in a similar OS environment is better even for a scripting language like Python.
To set up a Python development environment in Linux, see the instructions below.
- How to Install Python 3 and Create Virtual Environment in Centos, Redhat and Amazon Linux
- How to Install Python 3 and Create Virtual Environment In Ubuntu
To launch Postgres RDS, see the post below. You need to create a table with one column of the jsonb data type. See the post below for the data set and table creation.
Code
We will use the config file for database connection details. For the target schema and table name, we will set environment variables when we deploy the function. Save the file as db_config.py.
```python
db_username = "<user name>"
db_password = "<password>"
db_name = "<database name>"
db_endpoint = "<rds endpoint>"
```
The main function is handler(). The file is saved as MoveS3ToPg.py, which is also the name of the Lambda function.
The bucket name and key are retrieved from the event. An S3 event is a JSON document that contains the bucket name and object key.
The code retrieves the target file and transforms it into a csv file. Then, it loads the file into Postgres with the copy command. For a detailed explanation of this ingestion pattern, refer to New JSON Data Ingestion Strategy by Using the Power of Postgres.
For the database parameters, import db_config.py with an import statement and read the values. The schema and table names are set as environment variables, which can be retrieved with os.environ.
```python
import json
import os
from urllib.parse import unquote_plus

import boto3
import psycopg2
from botocore.config import Config

import db_config

# Database connection details come from db_config.py
dbname = db_config.db_name
user = db_config.db_username
host = db_config.db_endpoint
password = db_config.db_password

# Target schema and table come from the function's environment variables
schema = os.environ['targetSchema']
table = os.environ['targetTable']

connection_string = "dbname='{}' user='{}' host='{}' password='{}'"\
    .format(dbname, user, host, password)

client = boto3.client('s3', 'ap-southeast-2',
                      config=Config(s3={'addressing_style': 'path'}))

# check (the password and connection string are left out of the logs)
print(dbname, user, host, schema, table)


def pg_load(connection_string, table_name, file_path):
    try:
        conn = psycopg2.connect(connection_string)
        print("Connected to Database")
        cur = conn.cursor()
        # Open the input file and load it into the table with copy.
        # The quote and delimiter are control characters that should not
        # appear in the data, so each line is loaded as a single value.
        with open(file_path, "r") as f:
            cur.copy_expert(
                "copy {} FROM STDIN WITH CSV quote e'\x01' delimiter e'\x02'"
                .format(table_name), f)
        conn.commit()
        print("Loaded data into {}".format(table_name))
        conn.close()
        print("DB connection closed.")
    except Exception as e:
        print('Error {}'.format(str(e)))


def transform_json(input_path, output_path):
    # Open the input file and load as json
    with open(input_path, 'r') as input_file:
        json_file = json.load(input_file)
    # Open the output file and create csv file for db upload
    with open(output_path, 'w') as output:
        for record in json_file:
            output.write(json.dumps(record))
            output.write('\n')
    print('Transformed {} to {}'.format(input_path, output_path))


def handler(event, context):
    # Get the info from the S3 Put event
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 events, so decode them first
        key = unquote_plus(record['s3']['object']['key'])
        local_path = '/tmp/' + key.split('/')[-1]
        # Download file from S3
        client.download_file(bucket_name, key, local_path)
        print("Downloaded s3 file, {}, to {}".format(key, local_path))
        # Transform the file
        output_path = '/tmp/output.csv'
        transform_json(local_path, output_path)
        # Load csv to Postgres
        pg_load(connection_string, schema+'.'+table, output_path)
```
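One detail worth checking when reading the event: S3 URL-encodes object keys in event notifications, so a key containing spaces or special characters should be decoded before it is passed to download_file. Below is a minimal sketch of that step on its own. The helper name extract_s3_objects and the sample event are made up for illustration.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key, local_path) for each record in an S3 event."""
    results = []
    for record in event.get('Records', []):
        bucket_name = record['s3']['bucket']['name']
        # Decode the URL-encoded key ("+" becomes a space, %28 becomes "(")
        key = unquote_plus(record['s3']['object']['key'])
        local_path = '/tmp/' + key.split('/')[-1]
        results.append((bucket_name, key, local_path))
    return results


# Hypothetical event with an encoded key, only for illustration
sample_event = {
    'Records': [
        {'s3': {'bucket': {'name': 'lambda.test.source'},
                'object': {'key': 'today/source+file%281%29.json'}}}
    ]
}
objects = extract_s3_objects(sample_event)
print(objects)
```

Keeping this logic in a small function also makes it easy to test locally without deploying anything.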
S3 Put Event JSON Example
Whenever a file is created in the source bucket, S3 sends an event in JSON format to the Lambda function. The example below can be used to test the function manually. You need to change a few parameters (like the bucket name or object key) to suit your setup.
```json
{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-west-2",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AIDAJDPLRKLG7UEXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "C3D13FE58DE4C810",
        "x-amz-id-2": "FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "sourcebucket",
          "ownerIdentity": {
            "principalId": "A3NL1KOZZKExample"
          },
          "arn": "arn:aws:s3:::sourcebucket"
        },
        "object": {
          "key": "HappyFace.jpg",
          "size": 1024,
          "eTag": "d41d8cd98f00b204e9800998ecf8427e",
          "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko"
        }
      }
    }
  ]
}
```
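If you prefer to generate the test payload rather than edit it by hand, a small script can do it. This is a minimal sketch: the helper name make_s3_put_event is hypothetical, it keeps only the fields the handler reads plus a few identifying ones, and the bucket and key values are the example names used in this post. The file is written to the system temp directory here, so copy it next to where you run the invoke command.

```python
import json
import os
import tempfile


def make_s3_put_event(bucket, key, region='ap-southeast-2'):
    """Build a cut-down S3 Put event for manual testing."""
    return {
        'Records': [{
            'eventVersion': '2.0',
            'eventSource': 'aws:s3',
            'awsRegion': region,
            'eventName': 'ObjectCreated:Put',
            's3': {
                's3SchemaVersion': '1.0',
                'bucket': {'name': bucket, 'arn': 'arn:aws:s3:::' + bucket},
                'object': {'key': key},
            },
        }]
    }


event = make_s3_put_event('lambda.test.source', 'today/source_file.json')

# Write the payload to a file that can be passed to the invoke command
payload_path = os.path.join(tempfile.gettempdir(), 'inputFile.txt')
with open(payload_path, 'w') as f:
    json.dump(event, f, indent=2)
print('Wrote test event to', payload_path)
```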
Summary Steps
- Create VPC Endpoint for Amazon S3
- Create a custom policy for the function (e.g. s3_to_pg_lambda)
- Attach the policy to the role used for the function (e.g. s3_to_pg_lambda)
- Create a function and config file.
- Package the code with the required libraries and the config file
- Deploy the function
- Test the function by manually invoking it
- Add permission for the function to access S3 bucket
- Configure event in the source bucket
- Test the function by S3 PUT request.
Steps
(1) Create VPC Endpoint for Amazon S3
To access S3 from a Lambda function executed within a VPC, I needed to configure a VPC Endpoint for S3. How to access S3 from a VPC depends on your setup. Your VPC might already allow access to S3 without creating an endpoint (e.g. using Internet Gateway or NAT). I am using the environment created here (How to Create Your Own Personal Data Science Computing Environment In AWS). In this setup, the best approach is to create the endpoint.
If the Lambda function times out before downloading the file from S3, you most likely have an access configuration issue.
(2) Create a custom policy for the function
The Lambda function needs to get data from S3 and access RDS within a VPC. For RDS access, you need EC2 actions to create ENIs (used to execute the function within the specified VPC) and CloudWatch Logs actions to write logs. The pre-made AWSLambdaVPCAccessExecutionRole covers these. The policy looks like the one below: I simply added a few S3 action permissions to AWSLambdaVPCAccessExecutionRole. Use this to create a custom policy.
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface"
      ],
      "Resource": "*"
    }
  ]
}
```
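The policy above allows the S3 actions on every resource. If you would rather limit them to the source bucket, the document can be generated as below. This is a sketch of an alternative, not the policy used in this post: the helper name build_policy is hypothetical and lambda.test.source is the example bucket from the prerequisites. Note that s3:GetObject applies to object ARNs while s3:ListBucket applies to the bucket ARN.

```python
import json


def build_policy(bucket):
    """Build a policy that scopes the S3 actions to a single bucket."""
    bucket_arn = 'arn:aws:s3:::' + bucket
    return {
        'Version': '2012-10-17',
        'Statement': [
            # Reading objects is granted on the objects inside the bucket
            {'Effect': 'Allow',
             'Action': ['s3:GetObject'],
             'Resource': bucket_arn + '/*'},
            # Listing is granted on the bucket itself
            {'Effect': 'Allow',
             'Action': ['s3:ListBucket'],
             'Resource': bucket_arn},
            # Same logging and ENI actions as in the policy above
            {'Effect': 'Allow',
             'Action': [
                 'logs:CreateLogGroup',
                 'logs:CreateLogStream',
                 'logs:PutLogEvents',
                 'ec2:CreateNetworkInterface',
                 'ec2:DescribeNetworkInterfaces',
                 'ec2:DeleteNetworkInterface',
             ],
             'Resource': '*'},
        ],
    }


policy = build_policy('lambda.test.source')
print(json.dumps(policy, indent=2))
```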
(3) Create a new lambda execution role and attach the policy
Once the policy is created, you need to create a new lambda execution role and attach the policy to it.
(4) Create a function and config file
Refer to the config file and code above. The name of the main python file should be the name of the function.
(5) Package the code with the required libraries and the config file
Go to the site-packages folder of the virtual environment where the Lambda function was developed. Zip the entire contents, then add both the config file and the main function file to the archive.
```bash
cd /home/user/python3/lambda_pg/lib/python3.6/site-packages
zip -r9 /home/user/tmp/MoveS3ToPg.zip *
cd /home/user/tmp/
zip -g MoveS3ToPg.zip MoveS3ToPg.py
zip -g MoveS3ToPg.zip db_config.py
```
(6) Deploy the function
Make sure the vpc-config parameter includes all the subnets of the RDS subnet group. In my setup, the function could not use the same security group as RDS. Resources in the same security group cannot connect to each other unless the group has an inbound rule that allows traffic from itself. Use the default security group for the VPC. It usually works.
If you have multiple environment variables as in this example, wrap the variables in double quotes.
The handler should be set as {function-name}.{main-function-name-in-the-code}, which works here because the Python file is named after the function. The role is the custom role created in step 3.
```bash
aws lambda create-function \
--region ap-southeast-2 \
--function-name MoveS3ToPg \
--zip-file fileb://MoveS3ToPg.zip \
--role arn:aws:iam::<account no>:role/s3_to_pg_lambda \
--environment Variables="{targetSchema=usermanaged,targetTable=transaction}" \
--vpc-config SubnetIds=subnet-xxxxxx1,subnet-xxxxxx6,SecurityGroupIds=sg-1aaaaa1 \
--handler MoveS3ToPg.handler \
--runtime python3.6 \
--timeout 10 \
--memory-size 1024
```
If you need to update the function, use the update-function-code command below.
```bash
aws lambda update-function-code \
--function-name MoveS3ToPg \
--region ap-southeast-2 \
--zip-file fileb://MoveS3ToPg.zip
```
If you need to update the config (such as vpc-config or environment variables), use the update-function-configuration command.
```bash
aws lambda update-function-configuration \
--function-name MoveS3ToPg \
--region ap-southeast-2 \
--vpc-config SubnetIds=subnet-xxxxxx1,subnet-xxxxxx6,SecurityGroupIds=sg-1aaaaa1
```
(7) Test the function by manually invoking it
Use the lambda invoke command. The payload is the S3 event JSON file you created manually from the example above.
```bash
aws lambda invoke \
--invocation-type Event \
--function-name MoveS3ToPg \
--region ap-southeast-2 \
--payload file://inputFile.txt \
outputfile.txt
```
(8) Add permission for the function to access S3 bucket
Run the lambda add-permission command. Without this permission, you cannot configure the Lambda event in the source bucket.
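As an illustration of what this permission contains, here is a minimal sketch that does the same thing through boto3's add_permission call instead of the CLI. The helper names and the statement id s3-invoke are hypothetical, and the function and bucket names are the examples used in this post. The arguments are built separately so they can be checked before anything is sent to AWS.

```python
def build_add_permission_args(function_name, bucket, statement_id='s3-invoke'):
    """Arguments that let S3 invoke the function for one source bucket."""
    return {
        'FunctionName': function_name,
        'StatementId': statement_id,  # hypothetical id, any unique string works
        'Action': 'lambda:InvokeFunction',
        'Principal': 's3.amazonaws.com',
        'SourceArn': 'arn:aws:s3:::' + bucket,
    }


def add_s3_invoke_permission(args, region='ap-southeast-2'):
    # boto3 is imported here so the arguments can be inspected without it
    import boto3
    return boto3.client('lambda', region_name=region).add_permission(**args)


args = build_add_permission_args('MoveS3ToPg', 'lambda.test.source')
print(args)
# add_s3_invoke_permission(args)  # uncomment to apply; needs AWS credentials
```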
(9) Configure event in the source bucket.
Whenever a put request happens, S3 sends the event to the Lambda function.
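The event can be configured in the console, or with a script along the lines of the sketch below, which uses boto3's put_bucket_notification_configuration. The helper names are hypothetical, the .json suffix filter is my assumption rather than part of the original setup, and <account no> is a placeholder you need to fill in. Be aware that this call replaces the bucket's existing notification configuration.

```python
def build_notification_config(function_arn, suffix=None):
    """Notification config that sends Put events to a Lambda function."""
    rule = {
        'LambdaFunctionArn': function_arn,
        'Events': ['s3:ObjectCreated:Put'],
    }
    if suffix:
        # Optional: only trigger for keys ending with the given suffix
        rule['Filter'] = {
            'Key': {'FilterRules': [{'Name': 'suffix', 'Value': suffix}]}
        }
    return {'LambdaFunctionConfigurations': [rule]}


def apply_notification_config(bucket, config):
    # boto3 is imported here so the config can be inspected without it
    import boto3
    boto3.client('s3').put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config)


function_arn = 'arn:aws:lambda:ap-southeast-2:<account no>:function:MoveS3ToPg'
config = build_notification_config(function_arn, suffix='.json')
print(config)
# apply_notification_config('lambda.test.source', config)  # needs AWS credentials
```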
(10) Test the function by S3 PUT request.
You can use the AWS CLI to upload a file to the source bucket and check if the Lambda function executes correctly.
```bash
aws s3 cp ./source_file.json s3://lambda.test.source/today/source_file.json
```
Error Handling
When the permissions, execution role, VPC access to S3, security group or subnets are not configured correctly, your function will time out. This means the function runs until it reaches the configured execution time limit (10 seconds in the create-function command above) and is then stopped.
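Because a timeout gives little detail about what went wrong, it can help to check network reachability on its own with a short timeout. The sketch below is one way to do that using only the standard library. The helper name can_connect is hypothetical, and the demonstration runs against a throwaway local listener so it needs no AWS access. Inside the function you could call can_connect(db_config.db_endpoint, 5432), assuming the default Postgres port.

```python
import socket


def can_connect(host, port, timeout_seconds=3.0):
    """Return True if a TCP connection to host:port opens in time."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:
        # Covers refused connections, DNS failures and timeouts
        return False


# Demonstration against a temporary local listener
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))  # port 0 lets the OS pick a free port
listener.listen(1)
open_port = listener.getsockname()[1]

reachable = can_connect('127.0.0.1', open_port)
listener.close()
unreachable = can_connect('127.0.0.1', open_port)
print(reachable, unreachable)
```

Logging the result at the start of the handler shows quickly whether the problem is on the database side or the S3 side.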
If your function starts timing out after successful executions, you may need to re-deploy the function, which usually fixes the issue.