How To Get Data From MongoDB With Python

How to get data from MongoDB with Python

MongoDB is one of the most popular no SQL databases used as a backend database for web and mobile applications. Data is stored in MongoDB as BSON, which looks like JSON files. Once you understand the way MongoDB stores data, all you need to do is to connect to the database, query the data and convert it into a structured file with the same way as we do with JSON files.

Here are two useful MongoDB tutorials for beginners: Learn Mongo DB from Tutorial Point and  Getting Started with MongoDB from the official website. The pymongo package makes it really easy for us to use Python with MongoDB. I recommend PyMongo documentation for getting stated. If you are having trouble with connection like authentication, Conncet to MongoDB via the Python Driver from the official website has a detailed examples and explanations that help you to troubleshoot your code.

In this post, I will use the official MongoDB example dataset and show you how to grab data from MongoDB, convert it into structured tables and load them to Postgres database.

  1. Setting up MongoDB

First of all, let’s install MongoDB, create the test database and user credential with the right privilege and upload an example data set, restaurants into the test.

Steps

(1) Donwload the community server from MongoDB Download Center and install it. The key point for Windows installation is to create a data directory to set up the environment.

mkdir c:\data\db

(2) Once the installation is completed, start the database. In Windows, I just use the mongod command to start the server.

mongod

(3) There are a few good user interface available out there. My favorite is Robo 3T. MongoDB Compass comes with MonogDB installation and is good enough for a starter. By default, you will have admin and local databases for Windows installation. Open your command line interface and bring up the mongo db command line interface client by typing mongo.

(4) You can use various commands to do administrative tasks and run queries from the command line interface. Let’s create a new user called admin with password as Password1.

db.createUser( { user: "admin",
                 pwd: "Password1",
                 customData: { employeeId: 12345 },
                 roles: [ { role: "clusterAdmin", db: "admin" },
                          { role: "readAnyDatabase", db: "admin" },
                          "readWrite"] },
               { w: "majority" , wtimeout: 5000 } )

(5) Create a new database called test. The ‘use’ command is used to switch to different databases. If no database exists, it creates the database. You can also use show dbs to check all the existing databases and show collections to check all the collections existing in the current database you are in.

use test
show dbs
show collections

(6) You can now use the command line interface with the newly created credential.

mongo --host localhost --port 27017 -u admin -p Password1

(7) Download the restaurant data as a json file and then upload it to the test database with the mongoimport command. The dataset can be downloaded here.

mongoimport --db test --collection restaurants --drop --file C:\Users\test\Downloads\primer-dataset.json

(8) Once it’s imported, try to query some data (query examples below). If it works, we are now ready to ingest the restaurant collection with Python.

db.restaurants.find().limit(10).pretty()
db.restaurants.find({"restaurant_id":"30191841"}).pretty()
  1. Quick Starter Guide to pymongo

With pymongo, we can connect to MongoDB and query the records. Here is the quick code example for connecting to MongoDB, querying the first 10 records and printing the result to the console in the pretty format.

import pymongo
from pymongo import MongoClient
import pprint

client = MongoClient('mongodb://admin:Password1@localhost:27017/test')
db = client.test
restaurants = db.restaurants
print('Total Record for the collection: ' + str(restaurants.count()))
for record in restaurants.find().limit(10):
     pprint.pprint(record)
  1. Data Model

Here is an example of a document (meaning a row record) from the restaurant collection (meaning table).

As you can see, the collection is denormalised as the grade fileds are nested. To convert it into a relational database table, it is better to normalise it. My suggestion is to create the main restaurant table and child grades table with id as the foreign key.

You can create table in Postgres as below. In the database, I set up a schema called mongodb.

CREATE TABLE mongodb.restaurants (
    id VARCHAR(100) PRIMARY KEY,
    address_building VARCHAR(100),
    address_coord1 VARCHAR(100),
    address_coord2 VARCHAR(100),
    address_street VARCHAR(255),
    address_zipcode VARCHAR(30),
    address_borough VARCHAR(50),
    cuisine VARCHAR(255),
    name VARCHAR(255),
    restaurant_id VARCHAR(100)
);

CREATE TABLE mongodb.grades (
    id VARCHAR(100) REFERENCES mongodb.restaurants (id),
    DATE TIMESTAMP,
    grade VARCHAR(100),
    score VARCHAR(100)
);
  1. Python Code

It’s time for coding!

(1) You need to import pymongo and psycopg2 packages.

1
2
3
import pymongo
from pymongo import MongoClient
import psycopg2

(2) Grab the query results and convert it into a list that contains a list of row record. Return the record list and the header list. You need to handle the index out of bound error for corrdinates.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def create_restaurants(results):
    '''
    Take the query outcome and convert to a list and return it.
    It also defines headers and return it.
    '''

    records = []
    for record in results:
        tmp = []
        headers = ['id','address_building','address_coord1','address_coord2',\
        'address_street','address_zipcode','address_borough','cuisine','name',\
        'restaurant_id']
        tmp.append(str(record['_id']).split("(")[0])
        address = record['address']
        tmp.append(address['building'])
        try:
            tmp.append(str(address['coord'][0]))
        except IndexError:
            tmp.append('NA')
        try:
            tmp.append(str(address['coord'][1]))
        except IndexError:
            tmp.append('NA')
        tmp.append(address['street'])
        tmp.append(address['zipcode'])
        tmp.append(record['borough'])
        tmp.append(record['cuisine'])
        tmp.append(record['name'])
        tmp.append(record['restaurant_id'])
        records.append(tmp)
    print('Preped restaurant table.')
    return records, headers

def create_grades(results):
    '''
    Take the query outcome and convert to a list and return it.
    It also defines headers and return it.
    '''

    records = []
    headers = ['id','date','grade','score']
    for record in results:
        for grade in record['grades']:
            tmp = []
            tmp.append(str(record['_id']).split("(")[0])
            tmp.append(str(grade['date']).split("(")[0].replace('T', ' ').replace('Z', ' '))
            tmp.append(grade['grade'])
            tmp.append(str(grade['score']))
            records.append(tmp)
    return records, headers

(3) Create a csv file for each table with double quotes. Double quotes are important because some records contain comma. encoding=’utf-8′ is for Python 3. For Python 2, you can set default encoding as below.

import sys
reload(sys)
sys.setdefaultencoding(“utf-8”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def create_table(records, headers, file_path):
    "Take a list of records and headers and generate csv"
    f = open(file_path, 'w', encoding='utf-8')
    row_len = len(headers)
    f.write(format_list(headers, row_len, ',', '"'))
    for record in records:
        f.write(format_list(record, row_len, ',', '"'))
    f.close()
    print('CSV file successfully created: {}'.format(file_path))

def format_list(list, length, delimiter, quote):
    counter = 1
    string = ''
    for record in list:
        if counter == length:
            string += quote + record + quote + '\n'
        else:
            string += quote + record + quote + delimiter
        counter += 1
    return string

(4) Load csv file to postgres. I am using the copy_expert function because it has a better control. For example, using copy_from() will not ignore double quotes when you load data. You can check documentations here for further details. I have Postgres installed on my local computer. I created a database called mydatahack and credentials with username as mydatahack and password as Password1. The user was then added to the user group postgres which is the default credential comes with postgres when you install it.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def pg_load_table(file_path, table_name):
    try:
        conn = psycopg2.connect("dbname='mydatahack' user='mydatahack' host='localhost' password='Password1'")
        print("Connecting to Database")
        cur = conn.cursor()
        f = open(file_path, "r", encoding='utf-8')
        cur.execute("Truncate {} Cascade;".format(table_name))
        print("Truncated {}".format(table_name))
        cur.copy_expert("copy {} from STDIN CSV HEADER".format(table_name), f)
        cur.execute("commit;")
        print("Loaded data into {}".format(table_name))
        conn.close()
        print("DB connection closed.")
    except Exception as e:
        print("Error: {}".format(str(e)))

(5) Let’s put them all together and execute it.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def main():
    # (1) Connect to MongoDB instance
    client = MongoClient('mongodb://admin:Password1@localhost:27017/test')
    # (2) Choose database and create the db object
    db = client.test
    # (3) Choose the collection and create Collection Object
    restaurants = db.restaurants
    print('Total Record for the collection: ' + str(restaurants.count()))
    # (4) Retrieve all data from the restaurants collection
    results = restaurants.find()
    # (4-1) Transform data into a structured tables
    rest_tup = create_restaurants(results)
    rest_records = rest_tup[0]
    rest_headers = rest_tup[1]
    create_table(rest_records, rest_headers, './restaurants.csv')
    # (5) Retrive all data again for grades
    results = restaurants.find()
    # (5-1) Transform data for grades
    grades_tup = create_grades(results)
    grades_records = grades_tup[0]
    grades_headers = grades_tup[1]
    create_table(grades_records, grades_headers, './grades.csv')
    # (6) Load data into Postgres
    pg_load_table('./restaurants.csv', 'mongodb.restaurants')
    pg_load_table('./grades.csv', 'mongodb.grades')

if __name__ == "__main__":
    main()

Once the data is loaded into the postgres, you can use it to analyse the data. For example, calculating the average grade per restaurant and rank them with SQL will be fun.

Epic!

Data Engineering
Sending XML Payload and Converting XML Response to JSON with Python

If you need to interact with a REST endpoint that takes a XML string as a payload and returns another XML string as a response, this is the quick guide if you want to use Python. If you want to do it with Node.js, you can check out the post …

Data Engineering
Sending XML Payload and Converting XML Response to JSON with Node.js

Here is the quick Node.js example of interacting with a rest API endpoint that takes XML string as a payload and return with XML string as response. Once we get the response, we will convert it to a JSON object. For this example, we will use the old-school QAS (Quick …

Data Engineering
Downloading All Public GitHub Gist Files

I used to use plug-ins to render code blocks for this blog. Yesterday, I decided to move all the code into GitHub Gist and inject them from there. Using a WordPress plugin to render code blocks can be problematic when update happens. Plugins might not be up to date. It …