How To Get Survey Response Data From Qualtrics With Python

In the previous post, we had a look at Python code examples of basic data engineering with AWS infrastructure. Using the Qualtrics API, I would like to present a coding example of ingesting API data into S3 and Redshift. The code can be scheduled to run hourly, daily or weekly on a server or with AWS Data Pipeline.

Qualtrics is online survey software that allows you to send surveys via email or SMS, receive responses and generate reports. The aim of the ingestion is to get the survey response data into Redshift.

API Reference

The Qualtrics API is a simple REST-based API. Once you generate an API token, you are pretty much ready to go. They have comprehensive API documentation. In a nutshell, you can use the requests module to make a POST request with the token in the header and then download the data as a csv file. Further API references are here.

I found out the hard way that the Qualtrics API can be unreliable. My code initially failed randomly because I didn't have a loop to keep retrying the request until it connects. In the code example, I set the maximum number of attempts to 200 (see the bulk_exports function). It usually succeeds within 20 attempts.
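The retry pattern itself is just a polling loop: keep requesting the download URL and sleep between attempts until a 200 comes back. Roughly, reusing the placeholder download_url and headers from the sketch above:

import time
import requests

# download_url and headers as defined in the previous sketch
for attempt in range(200):              # give up after 200 attempts
    download = requests.get(download_url, headers=headers, stream=True)
    if download.status_code == 200:     # the export file is ready
        break
    time.sleep(10)                      # wait before polling again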

Key Points

In this example, we are using truncate & load because the data comes in a single csv file containing all the responses; we cannot obtain the data incrementally. That is fine, though. We can leverage the power of the Redshift copy command from S3, which is extremely fast. Truncate & load should be fine unless you have massive volumes or other business requirements. If you want to do an incremental load, you can pick records from the exported file according to their last-updated time and insert only those, which can be done relatively easily.
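If you do go down the incremental route, a rough sketch with pandas could look like the following (the RecordedDate column name and the last_loaded_at bookmark are assumptions, not part of the code below; adjust them to your survey and however you track load times):

import pandas as pd

# Placeholder bookmark -- in practice you would read this from your own metadata table
last_loaded_at = pd.Timestamp("2017-06-01 00:00:00")

df = pd.read_csv("./Exported/my_survey.csv", skiprows=[0, 1], low_memory=False)
df["RecordedDate"] = pd.to_datetime(df["RecordedDate"])    # column name is an assumption
incremental = df[df["RecordedDate"] > last_loaded_at]      # keep only the new responses
incremental.to_csv("./2017-06-02/my_survey.csv", sep="|", index=False)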

The program exports the response data in csv format into a local directory, pushes it to a specified S3 bucket, and executes the copy command after truncating the target table. It can ingest responses from multiple surveys. The survey project names are passed as a single argument, concatenated with ','.
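For example, assuming the script below is saved as qualtrics_export.py (the file name and every value here are placeholders), the arguments are passed positionally in this order:

python qualtrics_export.py "Customer Survey,Product Feedback" 2017-06-01 my-bucket \
    mydb my-cluster.redshift.amazonaws.com 5439 myuser mypassword \
    AKIAXXXXXXXXXXXX wJalrXXXXXXXXXXXX analytics \
    https://yourdatacenter.qualtrics.com/API/v3 YOUR_API_TOKEN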

The get_project_id function takes the survey project names and returns the matching survey ids, which are in turn used to get the survey-specific response data.

The format_colnames function takes care of formatting the column names, as the exported file has multiple header rows. I am using the pandas package for the data manipulation.

Note that you also need AWS Access Key Id & Secret Access Key for the copy command.

OK, here comes the code.

Enjoy!

Code

import requests
import zipfile
import json
import urllib2
import sys
import os
import pandas as pd
import shutil
import psycopg2
import boto3
import time
'''
The encoding setup below is for Python 2.7.
Remove it for Python 3 and use encoding='utf-8' when you open a file.
The rest should work for both versions.
'''

reload(sys)
sys.setdefaultencoding('utf8')

# Arguments taken from the command line

project_names = sys.argv[1]

lst = project_names.split(',')

target_date = sys.argv[2]
bucket_name = sys.argv[3]
dbname = sys.argv[4]
host = sys.argv[5]
port = sys.argv[6]
user = sys.argv[7]
password = sys.argv[8]
aws_access_key_id = sys.argv[9]  
aws_secret_access_key = sys.argv[10]
target_schema = sys.argv[11]
base_url = sys.argv[12]
secret_token = sys.argv[13]


# (1) Getting a list of ID

def get_project_id(names):
    '''Takes a list of project names and returns a dict mapping table names to survey IDs'''

    report_dic = {}
    url = base_url + '/surveys/'
    header = {'X-API-TOKEN': secret_token}

    # (1) generating the request object
    req = urllib2.Request(url,None,header)

    # (2) Make request
    response = urllib2.urlopen(req)
    data = json.load(response)
    # (3) Find Id for each project
    for name in names:

        # This is necessary because project names sometimes contain 2 spaces instead of 1
        target_name = name.replace(" ", "").lower()

        # It is better to create a table name list separately
        table_key = name.replace(" ", "_").replace("-", "_").lower()

        # print target_name
        for i in data['result']['elements']:
            if i['name'].replace(" ", "").lower() == target_name:
                report_dic[table_key] = i['id']

    return report_dic

# (2) Get Metadata

def get_survey_metadata(report_dic, target_dir):
    '''Takes survey ID and create json file in a specified directory'''

    for k, v in report_dic.items():

        url = base_url + '/surveys/' + v
        header = {'X-API-TOKEN': secret_token}

        req = urllib2.Request(url,None,header)
        response = urllib2.urlopen(req)

        data = json.load(response)
        pretty = json.dumps(data, sort_keys=False, indent=4)
        with open('./' + target_dir + '/' + k + '_meta.json', 'w') as f:
            f.write(pretty)
        print('Metadata File for %s Generated!' % (k))

# (3) Exporting reports

fileFormat = "csv"
baseUrl = base_url + "/responseexports/"
headers = {"content-type": "application/json","x-api-token": secret_token}

def bulk_exports(report_dic):
    '''Takes a dict of survey IDs and creates a data export for each survey'''

    if os.path.exists('./Exported'):
        shutil.rmtree('./Exported')

    for key, val in report_dic.items():
        # Step 1: Creating Data Export
        print(key, val)

        downloadRequestUrl = baseUrl
        downloadRequestPayload = '{"format":"' + fileFormat + '","surveyId":"' + val + '"}'
        downloadRequestResponse = requests.request("POST", downloadRequestUrl, \
        data=downloadRequestPayload, headers=headers)
        progressId = downloadRequestResponse.json()["result"]["id"]

        # Step 2: Downloading the file (keep retrying until the export is ready)
        requestDownloadUrl = baseUrl + progressId + '/file'
        requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)
        for i in range(0, 200):
            print(requestDownload.status_code)
            if requestDownload.status_code == 200:
                # Step 3: Unzipping the file
                with open("RequestFile.zip", "wb") as f:
                    for chunk in requestDownload.iter_content(chunk_size=1024):
                        f.write(chunk)
                zipfile.ZipFile("RequestFile.zip").extractall('Exported')
                print('Completed Export for {}'.format(key))
                os.remove("./RequestFile.zip")
                break
            else:
                time.sleep(10)
                requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)
   
    for filename in os.listdir("Exported"):
        print(filename)
        os.rename('./Exported/'+filename, './Exported/'+filename.replace(" ", "_").replace("-", "_").lower())
        # os.rename('./'+filename, './'+filename.replace(" ", "_").replace("-", "_").lower())
   
# (4) Create the folder before moving to S3

def create_dir(target_date):
    direc = "./" + target_date

    if not os.path.exists(direc):
        os.makedirs(direc)
        print('New directory %s has been created' % (target_date))
    else:
        shutil.rmtree(direc)
        os.makedirs(direc)
        print('New directory %s has been created' % (target_date))

# (5) Reformat csv file and put into the right local folder created in (4)

def format_colnames(output_dir):
    '''Takes each exported file, renames its columns with the right format,
    and generates a csv file with the right column names'''


    for filename in os.listdir("./Exported"):
        # (1) Read csv file
        df = pd.read_csv("./Exported/" + filename, skiprows=[0,1], low_memory=False)

        columns = df.columns
        new_cols = []

        # (2) Reformat the column names
        for name in columns:
            new_name = name.replace('{', '').replace('}', '').split(':')[1].replace('\'', '').\
            replace('-', '_').replace(' ', '')
            new_cols.append(new_name)
       
        # print new_cols
        df.columns = new_cols

        # (3) Create CSV file into the output directory
        df.to_csv('./' + output_dir + '/' + filename, doublequote=True, sep='|', index=False)
        print('Reformatted and moved %s' % (filename))

# (6) Uploading to S3

def upload_files(local_path, s3_path, bucket_name):
    '''Search all the files from specified directory and push to S3'''
    s3 = boto3.resource('s3')

    for (root, dirs, files) in os.walk(local_path):
        for filename in files:
            print("File: {}".format(filename))
            s3_filename = s3_path + filename
            print('Uploading to %s...' % (s3_filename))
            s3.meta.client.upload_file(local_path + filename, bucket_name, s3_filename)
            print('Done')

# (7) Truncate & Load to Redshift

def truncate_load_tables(report_dict):
    con = psycopg2.connect(dbname=dbname, host=host, port=port, user=user, password=password)
    print("Connection to Redshift Successful!")
    cur = con.cursor()
    for k, v in report_dict.items():
        target_table = target_schema + '.' + k
        file_name = 's3://' + bucket_name + '/Qualtrics/data_export/' + k + '.csv'

        sql = """
        Truncate %s; Commit;
        copy %s from '%s' dateformat 'auto' credentials
        'aws_access_key_id=%s;aws_secret_access_key=%s' CSV QUOTE '"' DELIMITER '|'
        ACCEPTINVCHARS EMPTYASNULL COMPUPDATE OFF IGNOREHEADER 1;
        Commit;
        """ % (target_table, target_table, file_name, aws_access_key_id, aws_secret_access_key)

        print(sql)
        cur.execute(sql)
        print("Copy Command executed successfully for %s" % (target_table))
    con.close()

# Execution #

# (1) get the dictionary with report name and report id to export
reports = get_project_id(lst)

# (2) Create a directory for S3 transfer
create_dir(target_date)

# (3) Do Bulk Export
bulk_exports(reports)

# (4) Get metadata and prep for S3 transfer
get_survey_metadata(reports, target_date)

# (5) Transfer
format_colnames(target_date)

# (6) Move to S3
upload_files('./' + target_date + '/', 'Qualtrics/data_export/', bucket_name)

# (7) Load Table
truncate_load_tables(reports)