How To Get Twitter Data With Python
- By : Mydatahack
- Category : Data Engineering, Data Ingestion
- Tags: Python API Data Ingestion
In this post, we will discuss how to use Python to grab publicly available Twitter post data (from any user you specify) and convert it into a tabular format so that we can analyse it in Excel or insert it into a relational database. Python has a package that wraps the Twitter API (python-twitter); it is easy to use and works fine. In this post, however, I use the generic requests package (with requests_oauthlib for authentication) to call the user timeline endpoint directly.
Twitter API
The Twitter API is a REST-based API that uses either OAuth 1.0a or OAuth 2.0 for authentication, depending on the application you are making. We use OAuth 1.0a here.
To obtain the API credentials, log in with your Twitter account and go to the Twitter Apps page. Click ‘Create New App’ and enter a Name, Description and Website; the Callback URL can be left empty. Once the app is created, you can see the Consumer Key (API Key) and Consumer Secret (API Secret). Then generate the access token and access token secret through the UI by clicking the button.
How the code works
The script takes nine command-line arguments: the four API credentials (API key, API secret, access token and access token secret), the screen name of the Twitter user whose data you want, the number of records to ingest, the directory path (without a file name) where the JSON response files are written, and the full paths (directory plus file name) of the two CSV output files. It creates one timeline table of tweets and a second table with the latest user information.
The code runs in both Python 2.7 and 3. The main compatibility challenge is Unicode handling, which differs considerably between Python 2.7 and Python 3. The listing is set up for Python 3; to run it under Python 2.7, uncomment or replace a few lines as indicated in the code.
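As a possible alternative to swapping lines between versions, io.open accepts the encoding argument on both Python 2.7 and Python 3. The sketch below is not part of the script; write_lines and demo_path are hypothetical names used only for illustration, and under Python 2.7 the strings passed in must be unicode objects.

```python
import io
import os
import tempfile


def write_lines(file_path, lines):
    """Write text lines as UTF-8; io.open takes encoding= on Python 2.7 and 3."""
    with io.open(file_path, "w", encoding="utf-8") as f:
        for line in lines:
            f.write(line + u"\n")


# Write a tiny file containing non-ASCII text and read it back
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv")
write_lines(demo_path, [u'"text"', u'"caf\u00e9 \u2615"'])

with io.open(demo_path, encoding="utf-8") as f:
    content = f.read()
```

With this approach the open calls in the script would not need a separate Python 2.7 variant, although the encode calls marked in the listing would still have to be reviewed.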
Example call
python <script_name>.py <api_key> <api_secret> <access_token> <access_token_secret>
<screen_name (e.g. CocaCola)> <record_no (e.g. 3000)> <json_path (e.g. /tmp/twitter/raw/)>
<timeline_file_path (e.g. /tmp/twitter/timeline.csv)> <user_file_path (e.g. /tmp/twitter/user.csv)>
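The script reads these values directly from sys.argv. As a hedged sketch of an alternative, the same nine positional arguments could be declared with argparse, which adds a usage message and type checking. build_parser is a hypothetical helper, not part of the original code, and the argument values in the demo are placeholders.

```python
import argparse


def build_parser():
    """Declare the nine positional arguments in the order the script reads them."""
    parser = argparse.ArgumentParser(description="Export a Twitter user timeline to CSV")
    parser.add_argument("api_key")
    parser.add_argument("api_secret")
    parser.add_argument("access_token")
    parser.add_argument("access_token_secret")
    parser.add_argument("screen_name", help="user to fetch, e.g. CocaCola")
    parser.add_argument("record_no", type=int, help="number of tweets, e.g. 3000")
    parser.add_argument("json_path", help="directory for raw JSON, e.g. /tmp/twitter/raw/")
    parser.add_argument("timeline_file_path", help="e.g. /tmp/twitter/timeline.csv")
    parser.add_argument("user_file_path", help="e.g. /tmp/twitter/user.csv")
    return parser


# Demo with placeholder credentials; a real run would call parse_args() with no list
example = build_parser().parse_args([
    "KEY", "SECRET", "TOKEN", "TOKEN_SECRET",
    "CocaCola", "3000", "/tmp/twitter/raw/",
    "/tmp/twitter/timeline.csv", "/tmp/twitter/user.csv",
])
print(example.screen_name, example.record_no)
```

Declaring record_no with type=int means the value arrives as a number, so the int() conversions in get_data would become unnecessary.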
Key Points
For user_timeline data, you can retrieve at most 200 records per request. To obtain more than 200 records, set the max_id parameter on each subsequent request to one less than the oldest tweet ID received so far, so that each iteration continues where the previous one ended.
The possibly_sensitive field may be missing from a tweet. Catch the KeyError exception and assign an empty value when it is missing.
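The max_id loop can be sketched without touching the network. In the example below, fetch_page is a hypothetical callable standing in for the API request (it is not a function from the script), and FAKE_TIMELINE is made-up data that lets the loop run offline.

```python
def fetch_timeline(fetch_page, record_no, page_size=200):
    """Collect up to record_no tweets, walking backwards through the timeline.

    fetch_page is a hypothetical callable (count, max_id) -> list of tweet
    dicts, newest first; in the real script it would wrap the API request.
    """
    tweets = []
    max_id = None
    while len(tweets) < record_no:
        count = min(page_size, record_no - len(tweets))
        page = fetch_page(count, max_id)
        if not page:  # the timeline has no older tweets
            break
        tweets.extend(page)
        # max_id is inclusive, so step one below the oldest id seen so far
        max_id = min(tweet["id"] for tweet in page) - 1
    return tweets


# Stand-in for the API: 450 fake tweets with ids 450..1, newest first
FAKE_TIMELINE = [{"id": i} for i in range(450, 0, -1)]


def fake_fetch_page(count, max_id):
    """Return the newest `count` fake tweets whose id is <= max_id."""
    eligible = [t for t in FAKE_TIMELINE if max_id is None or t["id"] <= max_id]
    return eligible[:count]


result = fetch_timeline(fake_fetch_page, 500)
print(len(result), result[0]["id"], result[-1]["id"])  # prints: 450 450 1
```

Asking for 500 records from a 450-tweet timeline stops cleanly once a request returns nothing, and no tweet is fetched twice because each request starts one ID below the previous page.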
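An alternative to catching KeyError is dict.get, which returns None for an absent key. The sketch below (Python 3) combines it with the standard csv module, which also escapes double quotes inside tweet text. FIELDS is an illustrative subset of the columns and tweets_to_csv is a hypothetical helper, not a function from the script.

```python
import csv
import io

FIELDS = ["id", "text", "possibly_sensitive", "lang"]  # illustrative subset


def tweets_to_csv(tweets, fields=FIELDS):
    """Render tweets as CSV text; absent or null fields become empty strings."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow(fields)
    for tweet in tweets:
        row = []
        for field in fields:
            value = tweet.get(field)  # None when the key is missing
            row.append("" if value is None else value)
        writer.writerow(row)
    return buffer.getvalue()


sample = [
    {"id": 2, "text": 'He said "hi"', "possibly_sensitive": False, "lang": "en"},
    {"id": 1, "text": "no sensitive flag here", "lang": "en"},
]
print(tweets_to_csv(sample))
```

The second sample tweet has no possibly_sensitive key, so its column is written as an empty quoted value rather than raising an error.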
Enjoy!
Python Code
import requests_oauthlib
import requests
import os
import json
import sys

'''
# for Python 2.7
reload(sys)
sys.setdefaultencoding('utf8')
'''


class TwitterScraper:
    '''This class is used to scrape Twitter Timeline'''

    def __init__(self, api_key, api_secret, access_token, access_token_secret):
        self.api_key = api_key
        self.api_secret = api_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret

    @staticmethod
    def export_with_url(auth, url, json_path, json_file_name):
        '''Take url string and auth object and return json object & generate json files'''
        r = requests.get(url, auth=auth)
        if r.status_code == 200:
            print("Connection Successful. Status Code: {}".format(r.status_code))
            # Convert to json object
            data = json.loads(r.text)
            # Write Json file (the with block closes the file afterwards)
            with open(os.path.join(json_path, json_file_name), "w") as f:
                f.write(json.dumps(data, indent=4))
            # Return json object
            return data
        else:
            print("Connection Unsuccessful. Status Code: {}".format(r.status_code))
            sys.exit(1)

    @staticmethod
    def get_max_id(json_data):
        '''Return the oldest (smallest) tweet id from twitter timeline data'''
        max_id = min(tweet["id"] for tweet in json_data)
        print("The oldest Id in the record set is: {}".format(str(max_id)))
        return max_id

    def get_data(self, screen_name, record_no, json_path):
        '''Return a list of json object depending on the no of records'''
        counter = 0
        max_id = None
        data_list = []
        auth = requests_oauthlib.OAuth1(self.api_key, self.api_secret,
                                        self.access_token, self.access_token_secret)
        base_url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
        if int(record_no) <= 200:
            url = base_url + '?screen_name={}&count={}'.format(screen_name, record_no)
            data = self.export_with_url(auth, url, json_path, "timeline.json")
            data_list.append(data)
            return data_list
        else:
            while counter < int(record_no):
                file_name = "timeline_{}.json".format(counter)
                if max_id is None:
                    # First request: start from the newest tweet
                    url = base_url + '?screen_name={}&count=200'.format(screen_name)
                else:
                    # Later requests: continue below the oldest id seen so far
                    url = base_url + '?screen_name={}&count=200&max_id={}'\
                        .format(screen_name, max_id)
                data = self.export_with_url(auth, url, json_path, file_name)
                if not data:
                    # No older tweets left; stop instead of failing in get_max_id
                    break
                data_list.append(data)
                max_id = self.get_max_id(data) - 1
                counter += 200
            return data_list

    def create_timeline_table(self, data_list, file_path):
        '''Generate timeline csv file'''
        # timeline = open(file_path, "w") # for Python 2.7
        timeline = open(file_path, 'w', encoding='utf-8')
        # Adjacent string literals are joined, so no indentation leaks into the header
        header = ('"contributors","truncated","text","is_quote_status","in_reply_to_status_id",'
                  '"id","favorite_count","source","retweeted","coordinates",'
                  '"in_reply_to_screen_name","in_reply_to_user_id","retweet_count","id_str","favorited",'
                  '"geo","in_reply_to_user_id_str","possibly_sensitive","lang","created_at",'
                  '"in_reply_to_status_id_str","place"')
        timeline.write(header + '\n')
        for data in data_list:
            for i in range(len(data)):
                row_string = ""
                row = data[i]
                contributors = row["contributors"] or ''
                truncated = str(row["truncated"])
                # for Python 2.7
                # text = row["text"].encode('latin1', 'ignore').replace('\n', ' ')
                # Double any embedded quotes so the CSV field stays intact
                text = row["text"].replace('\n', ' ').replace('"', '""')
                is_quote_status = str(row["is_quote_status"])
                in_reply_to_status_id = row["in_reply_to_status_id"] or ''
                id = str(row["id"])
                favorite_count = str(row["favorite_count"])
                # for Python 2.7
                # source = row["source"].encode('latin1', 'ignore') or ''
                # (encode() returns bytes in Python 3, which would be written as b'...')
                source = (row["source"] or '').replace('"', '""')
                retweeted = str(row["retweeted"])
                coordinates = str(row["coordinates"])
                in_reply_to_screen_name = row["in_reply_to_screen_name"] or ''
                in_reply_to_user_id = str(row["in_reply_to_user_id"]) or ''
                retweet_count = str(row["retweet_count"])
                id_str = row["id_str"]
                favorited = str(row["favorited"])
                geo = row["geo"] or ''
                in_reply_to_user_id_str = row["in_reply_to_user_id_str"] or ''
                try:
                    possibly_sensitive = str(row["possibly_sensitive"])
                except KeyError:
                    possibly_sensitive = ''
                lang = row["lang"]
                created_at = row["created_at"]
                in_reply_to_status_id_str = row["in_reply_to_status_id_str"] or ''
                place = row["place"] or ''
                # Every placeholder is quoted (the original left {10} unquoted)
                row_string = (
                    '"{0}","{1}","{2}","{3}","{4}","{5}","{6}","{7}","{8}","{9}",'
                    '"{10}","{11}","{12}","{13}","{14}","{15}","{16}","{17}","{18}","{19}",'
                    '"{20}","{21}"\n'
                ).format(contributors, truncated, text, is_quote_status, in_reply_to_status_id,
                         id, favorite_count, source, retweeted, coordinates,
                         in_reply_to_screen_name, in_reply_to_user_id, retweet_count,
                         id_str, favorited, geo, in_reply_to_user_id_str, possibly_sensitive,
                         lang, created_at, in_reply_to_status_id_str, place)
                timeline.write(row_string)
        timeline.close()
        print("Generated timeline.csv")

    def create_user_table(self, data_list, file_path):
        '''Create user table from user info in the first timeline'''
        data = data_list[0]
        # user_csv = open(file_path, "w") # for Python 2.7
        user_csv = open(file_path, 'w', encoding='utf-8')
        user = data[0]["user"]
        user_string = ''
        follow_request_sent = str(user["follow_request_sent"])
        has_extended_profile = str(user["has_extended_profile"])
        profile_use_background_image = str(user["profile_use_background_image"])
        default_profile_image = str(user["default_profile_image"])
        id = str(user["id"])
        profile_background_image_url_https = user["profile_background_image_url_https"]
        verified = str(user["verified"])
        translator_type = user["translator_type"] or ''
        profile_text_color = user["profile_text_color"]
        profile_image_url_https = user["profile_image_url_https"]
        profile_sidebar_fill_color = user["profile_sidebar_fill_color"]
        followers_count = str(user["followers_count"])
        profile_sidebar_border_color = user["profile_sidebar_border_color"]
        id_str = user["id_str"]
        profile_background_color = user["profile_background_color"]
        listed_count = str(user["listed_count"])
        is_translation_enabled = str(user["is_translation_enabled"])
        utc_offset = str(user["utc_offset"])
        statuses_count = str(user["statuses_count"])
        # Double any embedded quotes so the CSV field stays intact
        description = (user["description"] or '').replace('"', '""')
        friends_count = str(user["friends_count"])
        location = user["location"]
        profile_link_color = user["profile_link_color"]
        profile_image_url = user["profile_image_url"]
        following = str(user["following"])
        geo_enabled = str(user["geo_enabled"])
        profile_banner_url = user["profile_banner_url"]
        profile_background_image_url = user["profile_background_image_url"]
        screen_name = user["screen_name"]
        lang = user["lang"]
        profile_background_tile = str(user["profile_background_tile"])
        favourites_count = str(user["favourites_count"])
        name = user["name"]
        notifications = str(user["notifications"])
        url = user["url"]
        created_at = user["created_at"]
        contributors_enabled = str(user["contributors_enabled"])
        time_zone = user["time_zone"]
        protected = str(user["protected"])
        default_profile = str(user["default_profile"])
        is_translator = str(user["is_translator"])
        user_string = (
            '"{0}","{1}","{2}","{3}","{4}","{5}","{6}","{7}","{8}","{9}","{10}",'
            '"{11}","{12}","{13}","{14}","{15}","{16}","{17}","{18}","{19}","{20}",'
            '"{21}","{22}","{23}","{24}","{25}","{26}","{27}","{28}","{29}","{30}",'
            '"{31}","{32}","{33}","{34}","{35}","{36}","{37}","{38}","{39}","{40}"\n'
        ).format(follow_request_sent, has_extended_profile, profile_use_background_image,
                 default_profile_image, id, profile_background_image_url_https, verified,
                 translator_type, profile_text_color, profile_image_url_https,
                 profile_sidebar_fill_color, followers_count, profile_sidebar_border_color,
                 id_str, profile_background_color, listed_count, is_translation_enabled,
                 utc_offset, statuses_count, description, friends_count, location,
                 profile_link_color, profile_image_url, following, geo_enabled,
                 profile_banner_url, profile_background_image_url, screen_name, lang,
                 profile_background_tile, favourites_count, name, notifications, url,
                 created_at, contributors_enabled, time_zone, protected, default_profile,
                 is_translator)
        header = ('"follow_request_sent","has_extended_profile","profile_use_background_image",'
                  '"default_profile_image","id","profile_background_image_url_https","verified",'
                  '"translator_type","profile_text_color","profile_image_url_https",'
                  '"profile_sidebar_fill_color","followers_count","profile_sidebar_border_color",'
                  '"id_str","profile_background_color","listed_count","is_translation_enabled",'
                  '"utc_offset","statuses_count","description","friends_count","location",'
                  '"profile_link_color","profile_image_url","following","geo_enabled",'
                  '"profile_banner_url","profile_background_image_url","screen_name","lang",'
                  '"profile_background_tile","favourites_count","name","notifications","url",'
                  '"created_at","contributors_enabled","time_zone","protected",'
                  '"default_profile","is_translator"')
        user_csv.write(header + '\n')
        # print user_string
        user_csv.write(user_string)
        user_csv.close()
        print("Generated user.csv")


if __name__ == "__main__":
    # Get variable from arg input
    api_key = sys.argv[1]
    api_secret = sys.argv[2]
    access_token = sys.argv[3]
    access_token_secret = sys.argv[4]
    screen_name = sys.argv[5]
    record_no = sys.argv[6]
    json_path = sys.argv[7]
    timeline_file_path = sys.argv[8]
    user_file_path = sys.argv[9]

    twitter = TwitterScraper(api_key, api_secret, access_token, access_token_secret)
    data_list = twitter.get_data(screen_name, record_no, json_path)
    twitter.create_timeline_table(data_list, timeline_file_path)
    twitter.create_user_table(data_list, user_file_path)