How To Get Twitter Data With Python

In this post, we will discuss how to use Python to grab publicly available Twitter post data (from any user you specify) and convert it into a tabular format, so that we can analyse the data in Excel or load it into a relational database. Python has a wrapper package for the Twitter API (python-twitter), which is easy to use and works fine. In this post, however, I use the generic requests package to make API calls to the timeline endpoint directly.

Twitter API

The Twitter API is REST-based and uses both OAuth 1.0 and OAuth 2.0 for authentication, depending on the type of application you are building. We use OAuth 1.0 here.
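As a quick sanity check that your credentials work, here is a minimal sketch (assuming the requests and requests_oauthlib packages, which the main script below also uses) that signs a request with OAuth 1.0 and calls the verify_credentials endpoint:

import requests
import requests_oauthlib

# The four placeholder credentials come from your Twitter app page
auth = requests_oauthlib.OAuth1('API_KEY', 'API_SECRET',
                                'ACCESS_TOKEN', 'ACCESS_TOKEN_SECRET')

# verify_credentials returns the authenticated user's profile on success
r = requests.get('https://api.twitter.com/1.1/account/verify_credentials.json', auth=auth)
print(r.status_code)  # 200 means the OAuth 1.0 signature was accepted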

To obtain the API credentials, sign in to your Twitter account and go to the Twitter Apps page. Then, click ‘Create New App’ and enter a Name, Description and Website (the Callback URL can be left empty). Once the app is created, you can see the Consumer Key (API Key) and Consumer Secret (API Secret). You also need to generate the access token and access token secret through the UI by clicking the button.

How the code works

The script takes 9 arguments: the four API credentials, the Twitter user (whose data you want to get), the number of records to ingest, the directory path (without a file name) where the raw JSON response files are written, and the dir_path/filename for each of the two csv files. The program creates one timeline table for the tweets and another table with the latest user information.

The code runs in both Python 2.7 and 3. The major challenge for backward compatibility is the way Python 2.7 handles Unicode, which is quite different from Python 3. Therefore, you need to uncomment or replace a few lines as indicated in the code.
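As an alternative to commenting lines in and out, one option (a sketch, using io.open, which accepts an encoding argument on both versions) is to keep a single code path for file writing:

import io

# io.open behaves like Python 3's built-in open on both versions,
# so the encoding argument works unchanged in Python 2.7 and 3
timeline = io.open('/tmp/twitter/timeline.csv', 'w', encoding='utf-8')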

Example call

python twitter_timeline_scraper.py <api_key> <api_secret> <access_token> <access_token_secret>
<screen_name (e.g. CocaCola)> <record_no (e.g. 3000)> <json_path (e.g. /tmp/twitter/raw/)>
<timeline_file_path (e.g. /tmp/twitter/timeline.csv)> <user_file_path (e.g. /tmp/twitter/user.csv)>

Key Points

For the user_timeline data, 200 is the maximum number of records you can retrieve per request. To obtain more than 200 records, we need to use the max_id parameter, which tells Twitter to return only tweets with an id less than or equal to the given value. Each iteration therefore passes the oldest id from the previous batch minus one.
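The pagination logic boils down to the loop below (a simplified sketch; fetch_page is a hypothetical helper standing in for the signed user_timeline request):

def fetch_all(screen_name, record_no):
    '''Page through the timeline 200 tweets at a time using max_id.'''
    pages, max_id, fetched = [], None, 0
    while fetched < record_no:
        page = fetch_page(screen_name, count=200, max_id=max_id)  # hypothetical helper
        if not page:
            break  # no older tweets left
        pages.append(page)
        # max_id is inclusive, so subtract 1 to avoid re-fetching the oldest tweet
        max_id = min(tweet["id"] for tweet in page) - 1
        fetched += len(page)
    return pages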

The possibly_sensitive field may be missing from a tweet. Therefore, catch the KeyError and assign an empty value when it is absent (dict.get with a default achieves the same thing).
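Both variants look like this:

# Option 1: catch the KeyError (the approach used in the code below)
try:
    possibly_sensitive = str(row["possibly_sensitive"])
except KeyError:
    possibly_sensitive = ''

# Option 2: dict.get with a default does the same in one line
possibly_sensitive = str(row.get("possibly_sensitive", ''))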

Enjoy!

Python Code

import requests_oauthlib
import requests
import os
import json
import sys

'''
# for Python 2.7
reload(sys)
sys.setdefaultencoding('utf8')
'''


class TwitterScraper:
    '''This class is used to scrape Twitter Timeline'''

    def __init__(self, api_key, api_secret, access_token, access_token_secret):

        self.api_key = api_key
        self.api_secret = api_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret

    @staticmethod
    def export_with_url(auth, url, json_path, json_file_name):
        '''Take a url string and auth object, return a json object and write a json file'''
        r = requests.get(url, auth=auth)
        if r.status_code == 200:
            print("Connection Successful. Status Code: {}".format(r.status_code))

            # Convert the response body to a json object
            data = json.loads(r.text)
            # Write the raw response to a json file and close the handle
            with open(os.path.join(json_path, json_file_name), "w") as f:
                f.write(json.dumps(data, indent=4))
            # Return the json object
            return data
        else:
            print("Connection Unsuccessful. Status Code: {}".format(r.status_code))
            sys.exit(1)

    @staticmethod
    def get_max_id(json_data):
        '''Return the oldest tweet id in the timeline data (used to derive the next max_id)'''
        ids = [record["id"] for record in json_data]
        oldest_id = min(ids)
        print("The oldest Id in the record set is: {}".format(oldest_id))
        return oldest_id

    def get_data(self, screen_name, record_no, json_path):
        '''Return a list of json objects depending on the number of records requested'''
        counter = 0
        max_id = None
        data_list = []
        auth = requests_oauthlib.OAuth1(self.api_key, self.api_secret,
                                        self.access_token, self.access_token_secret)

        base_url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'

        if int(record_no) <= 200:
            url = '{}?screen_name={}&count={}'.format(base_url, screen_name, record_no)
            data = self.export_with_url(auth, url, json_path, "timeline.json")
            data_list.append(data)
            return data_list

        else:
            while counter < int(record_no):
                file_name = "timeline_{}.json".format(counter)
                if max_id is None:
                    # First request: no max_id yet
                    url = '{}?screen_name={}&count=200'.format(base_url, screen_name)
                else:
                    # Subsequent requests: resume just below the oldest id fetched so far
                    url = '{}?screen_name={}&count=200&max_id={}'.format(base_url, screen_name, max_id)
                data = self.export_with_url(auth, url, json_path, file_name)
                data_list.append(data)
                max_id = self.get_max_id(data) - 1
                counter += 200
            return data_list


    def create_timeline_table(self, data_list, file_path):
        '''Generate timeline csv file'''

        # timeline = open(file_path, "w") # for Python 2.7
        timeline = open(file_path, 'w', encoding='utf-8')

        header = ('"contributors","truncated","text","is_quote_status","in_reply_to_status_id",'
                  '"id","favorite_count","source","retweeted","coordinates",'
                  '"in_reply_to_screen_name","in_reply_to_user_id","retweet_count","id_str","favorited",'
                  '"geo","in_reply_to_user_id_str","possibly_sensitive","lang","created_at",'
                  '"in_reply_to_status_id_str","place"')

        timeline.write(header + '\n')

        for data in data_list:
            for row in data:
                contributors = row["contributors"] or ''
                truncated = str(row["truncated"])
                # for Python 2.7
                # text = row["text"].encode('latin1', 'ignore').replace('\n', ' ')
                text = row["text"].replace('\n', ' ')
                is_quote_status = str(row["is_quote_status"])
                in_reply_to_status_id = row["in_reply_to_status_id"] or ''
                id = str(row["id"])
                favorite_count = str(row["favorite_count"])
                # for Python 2.7
                # source = row["source"].encode('latin1', 'ignore') or ''
                source = row["source"] or ''
                retweeted = str(row["retweeted"])
                coordinates = str(row["coordinates"] or '')
                in_reply_to_screen_name = row["in_reply_to_screen_name"] or ''
                # str(None) is truthy, so apply the default before converting
                in_reply_to_user_id = str(row["in_reply_to_user_id"] or '')
                retweet_count = str(row["retweet_count"])
                id_str = row["id_str"]
                favorited = str(row["favorited"])
                geo = row["geo"] or ''
                in_reply_to_user_id_str = row["in_reply_to_user_id_str"] or ''
                # possibly_sensitive is not present on every tweet
                try:
                    possibly_sensitive = str(row["possibly_sensitive"])
                except KeyError:
                    possibly_sensitive = ''
                lang = row["lang"]
                created_at = row["created_at"]
                in_reply_to_status_id_str = row["in_reply_to_status_id_str"] or ''
                place = row["place"] or ''

                row_string = ('"{0}","{1}","{2}","{3}","{4}","{5}","{6}","{7}","{8}","{9}",'
                              '"{10}","{11}","{12}","{13}","{14}","{15}","{16}","{17}","{18}",'
                              '"{19}","{20}","{21}"\n').format(
                    contributors, truncated, text, is_quote_status, in_reply_to_status_id,
                    id, favorite_count, source, retweeted, coordinates, in_reply_to_screen_name,
                    in_reply_to_user_id, retweet_count, id_str, favorited, geo,
                    in_reply_to_user_id_str, possibly_sensitive, lang, created_at,
                    in_reply_to_status_id_str, place)

                timeline.write(row_string)

        timeline.close()
        print("Generated timeline.csv")

    def create_user_table(self, data_list, file_path):
        '''Create user table from user info in the first timeline'''
        data = data_list[0]
        # user_csv = open(file_path, "w") # for Python 2.7
        user_csv = open(file_path, 'w', encoding='utf-8')
        user = data[0]["user"]
        user_string = ''
        follow_request_sent = str(user["follow_request_sent"])
        has_extended_profile = str(user["has_extended_profile"])
        profile_use_background_image = str(user["profile_use_background_image"])
        default_profile_image = str(user["default_profile_image"])
        id = str(user["id"])
        profile_background_image_url_https = user["profile_background_image_url_https"]
        verified = str(user["verified"])
        translator_type = user["translator_type"] or ''
        profile_text_color = user["profile_text_color"]
        profile_image_url_https = user["profile_image_url_https"]
        profile_sidebar_fill_color = user["profile_sidebar_fill_color"]
        followers_count = str(user["followers_count"])
        profile_sidebar_border_color = user["profile_sidebar_border_color"]
        id_str = user["id_str"]
        profile_background_color = user["profile_background_color"]
        listed_count = str(user["listed_count"])
        is_translation_enabled = str(user["is_translation_enabled"])
        utc_offset = str(user["utc_offset"])
        statuses_count = str(user["statuses_count"])
        description = user["description"]
        friends_count = str(user["friends_count"])
        location = user["location"]
        profile_link_color = user["profile_link_color"]
        profile_image_url = user["profile_image_url"]
        following = str(user["following"])
        geo_enabled = str(user["geo_enabled"])
        profile_banner_url = user["profile_banner_url"]
        profile_background_image_url = user["profile_background_image_url"]
        screen_name = user["screen_name"]
        lang = user["lang"]
        profile_background_tile = str(user["profile_background_tile"])
        favourites_count = str(user["favourites_count"])
        name = user["name"]
        notifications = str(user["notifications"])
        url = user["url"]
        created_at = user["created_at"]
        contributors_enabled = str(user["contributors_enabled"])
        time_zone = user["time_zone"]
        protected = str(user["protected"])
        default_profile = str(user["default_profile"])
        is_translator = str(user["is_translator"])

        user_string = '"{0}","{1}","{2}","{3}","{4}","{5}","{6}","{7}","{8}","{9}","{10}",\
        "{11}","{12}","{13}","{14}","{15}","{16}","{17}","{18}","{19}","{20}",\
        "{21}","{22}","{23}","{24}","{25}","{26}","{27}","{28}","{29}","{30}",\
        "{31}","{32}","{33}","{34}","{35}","{36}","{37}","{38}","{39}","{40}"\n'
\
        .format(follow_request_sent,has_extended_profile,profile_use_background_image,default_profile_image,\
        id,profile_background_image_url_https,verified,translator_type,profile_text_color,profile_image_url_https,\
        profile_sidebar_fill_color,followers_count,profile_sidebar_border_color,id_str,profile_background_color,\
        listed_count,is_translation_enabled,utc_offset,statuses_count,description,friends_count,location,\
        profile_link_color,profile_image_url,following,geo_enabled,profile_banner_url,profile_background_image_url,\
        screen_name,lang,profile_background_tile,favourites_count,name,notifications,url,created_at,\
        contributors_enabled,time_zone,protected,default_profile,is_translator)

        header = '"follow_request_sent","has_extended_profile","profile_use_background_image",\
        "default_profile_image","id","profile_background_image_url_https","verified","translator_type",\
        "profile_text_color","profile_image_url_https","profile_sidebar_fill_color","followers_count",\
        "profile_sidebar_border_color","id_str","profile_background_color","listed_count",\
        "is_translation_enabled","utc_offset","statuses_count","description","friends_count","location",\
        "profile_link_color","profile_image_url","following","geo_enabled","profile_banner_url",\
        "profile_background_image_url","screen_name","lang","profile_background_tile","favourites_count",\
        "name","notifications","url","created_at","contributors_enabled","time_zone","protected",\
        "default_profile","is_translator"'

        user_csv.write(header + '\n')

        user_csv.write(user_string)
        user_csv.close()
        print("Generated user.csv")

if __name__ == "__main__":

    # Get variable from arg input
    api_key = sys.argv[1]
    api_secret = sys.argv[2]
    access_token = sys.argv[3]
    access_token_secret = sys.argv[4]
    screen_name = sys.argv[5]
    record_no = sys.argv[6]
    json_path = sys.argv[7]
    timeline_file_path = sys.argv[8]
    user_file_path = sys.argv[9]

    twitter = TwitterScraper(api_key, api_secret, access_token, access_token_secret)
    data_list = twitter.get_data(screen_name, record_no, json_path)
    twitter.create_timeline_table(data_list, timeline_file_path)
    twitter.create_user_table(data_list, user_file_path)