Data Engineering in S3 and Redshift with Python

AWS offers a nice solution to data warehousing with its columnar database, Redshift, and its object storage service, S3. Python and the AWS SDK make it easy for us to move data around the ecosystem.

In this post, I will present code examples for the scenarios below:

  • Loading data from S3 into Redshift
  • Unloading data from Redshift to S3
  • Uploading data to S3 from a server or local computer

The best way to load data into Redshift is to go through S3 and call the COPY command, because it is both easy and fast. You can load data into Redshift from both flat files and JSON files.

You can also unload data from Redshift to S3 by calling the UNLOAD command. Boto3 (the AWS SDK for Python) enables you to upload files to S3 from a server or a local computer.
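
The detailed example below uses a pipe-delimited flat file, so here is a rough sketch of what a COPY for a JSON file could look like (the schema, table, bucket, and credential values are all placeholders):

copy my_schema.my_table from 's3://<bucket>/data/my_table.json'
credentials 'aws_access_key_id=<aws_access_key_id>;aws_secret_access_key=<aws_secret_access_key>'
format as json 'auto';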

Preparation

I usually encourage people to use Python 3. When it comes to AWS, however, I highly recommend using Python 2.7; it will make your life much easier. For example, if you want to deploy a Python script to an EC2 instance or EMR through Data Pipeline to leverage their serverless architecture, it is faster and easier to run code in 2.7. The code examples are all written in 2.7, but they all work with 3.x, too.

You need to install boto3 and psycopg2 (which enables you to connect to Redshift).

pip install boto3
pip install psycopg2

Finally, you need to install the AWS Command Line Interface (see Installing the AWS Command Line Interface) and configure it (see Configuring the AWS CLI) on the server or local machine where you run your program. This is not necessary if you are running the code through Data Pipeline, because the CLI is pre-installed on the EC2 instance. Boto3 uses the credentials stored by the AWS CLI, so once the CLI is configured you do not need to enter any AWS credentials in the code to move data to and from S3.
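
As a quick sanity check (just a sketch, not required for the examples below), you can confirm that Boto3 sees the credentials stored by the AWS CLI:

import boto3

# Boto3 picks up the credentials and default region configured with `aws configure`
session = boto3.session.Session()
credentials = session.get_credentials()
print(credentials.access_key)  # access key id from the AWS CLI configuration
print(session.region_name)     # default region from the AWS CLI configuration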

Let’s get down to business!

Code Examples

Example 1: Load a file from S3 into Redshift

There are many options you can specify. In this case, the data is a pipe-separated flat file. You can load JSON, CSV, and so on. For further reference on the Redshift COPY command, you can start from here.

import sys
import psycopg2

table = sys.argv[1]
schema = sys.argv[2]
dbname = sys.argv[3]
port = sys.argv[4]
user = sys.argv[5]
password = sys.argv[6]
host_url = sys.argv[7]
file_path = sys.argv[8]
aws_access_key_id = sys.argv[9]
aws_secret_access_key = sys.argv[10]

def main():
    '''This method will load a file from S3 into a Redshift table'''
    conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'"\
        .format(dbname, port, user, password, host_url)
    sql = """copy {}.{} from '{}' \
        credentials \
        'aws_access_key_id={};aws_secret_access_key={}' \
        DELIMITER '|' ACCEPTINVCHARS EMPTYASNULL ESCAPE COMPUPDATE OFF;commit;"""\
        .format(schema, table, file_path, aws_access_key_id, aws_secret_access_key)

    try:
        con = psycopg2.connect(conn_string)
        print("Connection Successful!")
    except psycopg2.Error:
        print("Unable to connect to Redshift")
        return

    cur = con.cursor()
    try:
        cur.execute(sql)
        print("Copy Command executed successfully")
    except psycopg2.Error:
        print("Failed to execute copy command")
    con.close()

if __name__ == "__main__":
    main()
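
Assuming the script is saved as copy_from_s3.py (the file name is just an example), you can run it with the arguments in the same order as the sys.argv indices above; 5439 is Redshift's default port and the other values are placeholders:

python copy_from_s3.py <table> <schema> <dbname> 5439 <user> <password> \
    <cluster_endpoint> s3://<bucket>/<file> <aws_access_key_id> <aws_secret_access_key>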

Example 2: Unload data from Redshift into S3

In this example, the data is unloaded in gzip format with a manifest file, which is the format AWS recommends for unloading. UNLOAD also has many options, and you can create different file formats according to your requirements. For further information, you can start from here.

import sys
import psycopg2

table = sys.argv[1]
schema = sys.argv[2]
dbname = sys.argv[3]
port = sys.argv[4]
user = sys.argv[5]
password = sys.argv[6]
host_url = sys.argv[7]
s3_bucket_name = sys.argv[8]
aws_access_key_id = sys.argv[9]
aws_secret_access_key = sys.argv[10]

def main():
    '''This method will unload a Redshift table into S3'''
    conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'"\
        .format(dbname, port, user, password, host_url)
    sql = """UNLOAD ('select * from %s.%s') TO 's3://%s/%s/%s.csv' \
        credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' \
        MANIFEST GZIP ALLOWOVERWRITE;Commit;"""\
        % (schema, table, s3_bucket_name, schema, table,
           aws_access_key_id, aws_secret_access_key)

    con = psycopg2.connect(conn_string)
    cur = con.cursor()
    cur.execute(sql)
    con.close()

if __name__ == "__main__":
    main()
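
The unloaded files can be loaded back into Redshift with the COPY command by pointing it at the manifest. Roughly, assuming the manifest is written next to the unloaded parts as <prefix>manifest and using the same placeholders as above:

copy my_schema.my_table
from 's3://<s3_bucket_name>/<schema>/<table>.csvmanifest'
credentials 'aws_access_key_id=<aws_access_key_id>;aws_secret_access_key=<aws_secret_access_key>'
MANIFEST GZIP;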

Example 3: Upload files into S3 with Boto3

You need to have the AWS CLI configured to make this code work. The files are uploaded with whatever credentials you configured, so those credentials determine which account and environment the upload goes to.

import boto3
import os

def upload_files(local_path, s3_path, bucket_name):
    '''Search all the files in the specified directory and push them to S3'''
    s3 = boto3.resource('s3')

    for (root, dirs, files) in os.walk(local_path):
        for filename in files:
            print("File: {}".format(filename))
            s3_filename = s3_path + filename
            print('Uploading to %s...' % (s3_filename))
            # join root and filename so files inside subdirectories resolve to the correct local path
            s3.meta.client.upload_file(os.path.join(root, filename), bucket_name, s3_filename)
            print('Done')

upload_files(<local path e.g. /tmp/data/>, <S3_path e.g. /test/>,\
 <bucket name e.g. datalake.bucket.data>)

Example 4: Upload files into S3 by calling the AWS CLI from Python

All files in the specified local directory will be recursively copied to S3 by using the AWS CLI.

import os
local_dir = <local_dir>
s3_dir = <s3_bucket_name+folder_dir>
os.system('aws s3 cp {} {} --recursive'.format(local_dir, s3_dir))
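
One caveat: os.system does not raise an error when the copy fails. If you want the script to stop on failure, the same call can be made through subprocess instead (a small sketch with the same placeholder directories):

import subprocess

local_dir = '<local_dir>'
s3_dir = '<s3_bucket_name+folder_dir>'

# check_call raises CalledProcessError if the aws cli exits with a non-zero status
subprocess.check_call(['aws', 's3', 'cp', local_dir, s3_dir, '--recursive'])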

If you are interested in connecting to S3 and downloading files, check out this post: Comprehensive Guide to Download Files From S3 with Python
