How To Get Data From MongoDB With Python
- By : Mydatahack
- Category : Data Engineering, Data Ingestion
- Tags: MongoDB, Postgres, Python
MongoDB is one of the most popular NoSQL databases, used as a backend database for web and mobile applications. Data is stored in MongoDB as BSON, which looks like JSON. Once you understand the way MongoDB stores data, all you need to do is connect to the database, query the data and convert it into a structured file in the same way as we do with JSON files.
Here are two useful MongoDB tutorials for beginners: Learn MongoDB from Tutorials Point and Getting Started with MongoDB from the official website. The pymongo package makes it really easy for us to use Python with MongoDB. I recommend the PyMongo documentation for getting started. If you are having trouble with connections, such as authentication, Connect to MongoDB via the Python Driver from the official website has detailed examples and explanations to help you troubleshoot your code.
In this post, I will use the official MongoDB example dataset and show you how to grab data from MongoDB, convert it into structured tables and load them into a Postgres database.
- Setting up MongoDB
First of all, let’s install MongoDB, create the test database and a user credential with the right privileges, and upload an example dataset, restaurants, into the test database.
Steps
(1) Download the community server from the MongoDB Download Center and install it. The key point for a Windows installation is to create a data directory to set up the environment.
(2) Once the installation is completed, start the database. On Windows, I just use the mongod command to start the server, as in the sketch below.
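A minimal sketch for Windows, assuming the default C:\data\db data directory (adjust the path if you created the data directory somewhere else):
md \data\db
mongod --dbpath C:\data\db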
(3) There are a few good user interfaces available out there. My favorite is Robo 3T. MongoDB Compass comes with the MongoDB installation and is good enough for a starter. By default, you will have admin and local databases for a Windows installation. Open your command line interface and bring up the MongoDB command line client by typing mongo.
(4) You can use various commands to do administrative tasks and run queries from the command line interface. Let’s create a new user called admin with the password Password1.
db.createUser( { user: "admin",
                 pwd: "Password1",
                 customData: { employeeId: 12345 },
                 roles: [ { role: "clusterAdmin", db: "admin" },
                          { role: "readAnyDatabase", db: "admin" },
                          "readWrite"] },
               { w: "majority" , wtimeout: 5000 } )
(5) Create a new database called test. The ‘use’ command switches to a different database; if the database does not exist, it creates it. You can also use show dbs to check all the existing databases and show collections to check all the collections in the database you are currently in.
use test
show dbs
show collections
(6) You can now log in to the command line interface with the newly created credential, for example:
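A minimal sketch, assuming the user was created while on the admin database (adjust --authenticationDatabase otherwise):
mongo -u admin -p Password1 --authenticationDatabase admin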
(7) Download the restaurant data as a JSON file and then upload it to the test database with the mongoimport command, as sketched below. The dataset can be downloaded here.
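A minimal sketch, assuming the file was saved as primer-dataset.json in the current directory (the --drop flag clears the collection first, so the import is repeatable):
mongoimport --db test --collection restaurants --drop --file primer-dataset.json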
(8) Once it’s imported, try to query some data (query example below). If it works, we are now ready to ingest the restaurants collection with Python.
db.restaurants.find({"restaurant_id":"30191841"}).pretty()
- Quick Starter Guide to pymongo
With pymongo, we can connect to MongoDB and query the records. Here is a quick code example that connects to MongoDB, queries the first 10 records and pretty-prints the results to the console.
from pymongo import MongoClient
import pprint

# Connect with the admin credential created earlier
client = MongoClient('mongodb://admin:Password1@localhost:27017/test')
# Choose the test database and the restaurants collection
db = client.test
restaurants = db.restaurants
# Note: count() is deprecated in newer PyMongo; use count_documents({}) instead
print('Total Record for the collection: ' + str(restaurants.count()))
# Query the first 10 documents and pretty-print them
for record in restaurants.find().limit(10):
    pprint.pprint(record)
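The filtered query we ran earlier in the mongo shell can be issued through pymongo in the same way. A minimal sketch, reusing the client and collection objects set up above:
for record in restaurants.find({'restaurant_id': '30191841'}):
    pprint.pprint(record)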
- Data Model
Here is an example of a document (the equivalent of a row record) from the restaurants collection (the equivalent of a table).
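An abridged example, reconstructed from the official dataset (the grades array is truncated here):
{
    "_id": ObjectId("..."),
    "address": {
        "building": "1007",
        "coord": [ -73.856077, 40.848447 ],
        "street": "Morris Park Ave",
        "zipcode": "10462"
    },
    "borough": "Bronx",
    "cuisine": "Bakery",
    "grades": [
        { "date": ISODate("2014-03-03T00:00:00Z"), "grade": "A", "score": 2 },
        { "date": ISODate("2013-09-11T00:00:00Z"), "grade": "A", "score": 6 }
    ],
    "name": "Morris Park Bake Shop",
    "restaurant_id": "30075445"
}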
As you can see, the collection is denormalised as the grades fields are nested. To convert it into relational database tables, it is better to normalise it. My suggestion is to create a main restaurants table and a child grades table with id as the foreign key.
You can create the tables in Postgres as below. In the database, I set up a schema called mongodb first (CREATE SCHEMA mongodb;).
CREATE TABLE mongodb.restaurants (
    id VARCHAR(100) PRIMARY KEY,
    address_building VARCHAR(100),
    address_coord1 VARCHAR(100),
    address_coord2 VARCHAR(100),
    address_street VARCHAR(255),
    address_zipcode VARCHAR(30),
    address_borough VARCHAR(50),
    cuisine VARCHAR(255),
    name VARCHAR(255),
    restaurant_id VARCHAR(100)
);
CREATE TABLE mongodb.grades (
    id VARCHAR(100) REFERENCES mongodb.restaurants (id),
    date TIMESTAMP,
    grade VARCHAR(100),
    score VARCHAR(100)
);
- Python Code
It’s time for coding!
(1) You need to import the pymongo and psycopg2 packages.
import pymongo
from pymongo import MongoClient
import psycopg2
(2) Grab the query results and convert them into a list that contains one list per row record. Return the record list and the header list. You need to handle the index out of bounds error for coordinates, as some documents have incomplete coordinates.
def create_restaurants(results):
    '''
    Take the query outcome and convert it to a list of records.
    Also define the headers. Return both.
    '''
    records = []
    # Define headers once, outside the loop, so they exist even for empty results
    headers = ['id', 'address_building', 'address_coord1', 'address_coord2',
               'address_street', 'address_zipcode', 'address_borough',
               'cuisine', 'name', 'restaurant_id']
    for record in results:
        tmp = []
        # split in case the id is rendered in the ObjectId(...) form
        tmp.append(str(record['_id']).split("(")[0])
        address = record['address']
        tmp.append(address['building'])
        # Some documents have incomplete coordinates
        try:
            tmp.append(str(address['coord'][0]))
        except IndexError:
            tmp.append('NA')
        try:
            tmp.append(str(address['coord'][1]))
        except IndexError:
            tmp.append('NA')
        tmp.append(address['street'])
        tmp.append(address['zipcode'])
        tmp.append(record['borough'])
        tmp.append(record['cuisine'])
        tmp.append(record['name'])
        tmp.append(record['restaurant_id'])
        records.append(tmp)
    print('Prepped restaurant table.')
    return records, headers

def create_grades(results):
    '''
    Take the query outcome and convert it to a list of records,
    one per nested grade. Also define the headers. Return both.
    '''
    records = []
    headers = ['id', 'date', 'grade', 'score']
    for record in results:
        for grade in record['grades']:
            tmp = []
            tmp.append(str(record['_id']).split("(")[0])
            # Strip T/Z markers in case the date is rendered in ISO format
            tmp.append(str(grade['date']).split("(")[0].replace('T', ' ').replace('Z', ' '))
            tmp.append(grade['grade'])
            tmp.append(str(grade['score']))
            records.append(tmp)
    return records, headers
(3) Create a csv file for each table with double quotes around values. Double quotes are important because some records contain commas. encoding='utf-8' is for Python 3. For Python 2, you can set the default encoding as below.
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
def create_table(records, headers, file_path):
    "Take a list of records and headers and generate a csv file"
    f = open(file_path, 'w', encoding='utf-8')
    row_len = len(headers)
    f.write(format_list(headers, row_len, ',', '"'))
    for record in records:
        f.write(format_list(record, row_len, ',', '"'))
    f.close()
    print('CSV file successfully created: {}'.format(file_path))

def format_list(row, length, delimiter, quote):
    # Wrap each value in quotes; end the row with a newline instead of a delimiter
    counter = 1
    string = ''
    for record in row:
        if counter == length:
            string += quote + record + quote + '\n'
        else:
            string += quote + record + quote + delimiter
        counter += 1
    return string
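As an aside, the standard csv module can do the same quoting for us and also escapes embedded double quotes. A minimal sketch (create_table_csv is a hypothetical drop-in alternative, not part of the original script):
import csv

def create_table_csv(records, headers, file_path):
    # QUOTE_ALL wraps every value in double quotes, like format_list above
    with open(file_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_ALL)
        writer.writerow(headers)
        writer.writerows(records)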
(4) Load the csv files into Postgres. I am using the copy_expert function because it gives better control. For example, copy_from() will not ignore double quotes when you load data. You can check the documentation here for further details. I have Postgres installed on my local computer. I created a database called mydatahack and a credential with the username mydatahack and the password Password1. The user was then added to the postgres user group, the default role that comes with Postgres when you install it.
def pg_load_table(file_path, table_name):
    try:
        conn = psycopg2.connect("dbname='mydatahack' user='mydatahack' host='localhost' password='Password1'")
        print("Connecting to Database")
        cur = conn.cursor()
        f = open(file_path, "r", encoding='utf-8')
        cur.execute("Truncate {} Cascade;".format(table_name))
        print("Truncated {}".format(table_name))
        cur.copy_expert("copy {} from STDIN CSV HEADER".format(table_name), f)
        cur.execute("commit;")
        print("Loaded data into {}".format(table_name))
        conn.close()
        print("DB connection closed.")
    except Exception as e:
        print("Error: {}".format(str(e)))
(5) Let’s put them all together and execute it.
def main():
    # (1) Connect to the MongoDB instance
    client = MongoClient('mongodb://admin:Password1@localhost:27017/test')
    # (2) Choose the database and create the db object
    db = client.test
    # (3) Choose the collection and create the Collection object
    restaurants = db.restaurants
    print('Total Record for the collection: ' + str(restaurants.count()))
    # (4) Retrieve all data from the restaurants collection
    results = restaurants.find()
    # (4-1) Transform data into structured tables
    rest_tup = create_restaurants(results)
    rest_records = rest_tup[0]
    rest_headers = rest_tup[1]
    create_table(rest_records, rest_headers, './restaurants.csv')
    # (5) Retrieve all data again for grades
    results = restaurants.find()
    # (5-1) Transform data for grades
    grades_tup = create_grades(results)
    grades_records = grades_tup[0]
    grades_headers = grades_tup[1]
    create_table(grades_records, grades_headers, './grades.csv')
    # (6) Load data into Postgres
    pg_load_table('./restaurants.csv', 'mongodb.restaurants')
    pg_load_table('./grades.csv', 'mongodb.grades')

if __name__ == "__main__":
    main()
Once the data is loaded into Postgres, you can use it to analyse the data. For example, calculating the average grade per restaurant and ranking them with SQL would be fun, as in the sketch below.
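A minimal sketch, using the numeric score as a proxy for the grade (scores were loaded as VARCHAR, so they need a cast; NULLIF guards against missing values that were written out as the string 'None'):
SELECT r.name,
       AVG(NULLIF(g.score, 'None')::numeric) AS avg_score
FROM mongodb.restaurants r
JOIN mongodb.grades g ON g.id = r.id
GROUP BY r.name
ORDER BY avg_score;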
Epic!