Converting JSON to CSV and Loading it to Postgres with Node.js

To convert JSON to CSV, I love using json2csv. It does all the hard work of flattening the JSON structure into a flat file. Nested elements can be referenced with dot notation (like Transaction.ItemId), and when a record contains an array element, json2csv can unwind it and build a denormalised table for you. Let's see how it works.
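
As a quick illustration of those two ideas, here is a minimal sketch using json2csv's synchronous parse API on a single made-up record (the record, field names and values are only illustrative; the rest of this post uses the streaming Transform API instead):

const { parse } = require('json2csv')

// A record with a nested object (Payment) and an array (Transaction)
const record = {
  Id: 100,
  Payment: { Type: 'Credit-Card' },
  Transaction: [
    { ItemId: 'a100', price: 200 },
    { ItemId: 'a110', price: 200 }
  ]
}

// Dot notation reaches into Payment; unwind expands Transaction into rows
const csv = parse([record], {
  fields: ['Id', 'Transaction.ItemId', 'Transaction.price', 'Payment.Type'],
  unwind: 'Transaction'
})

// Prints a header row plus one row per Transaction item,
// with Id and Payment.Type repeated on each row
console.log(csv)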

Input Data

Here is the input data, transaction.json. I think it is a good representation of a typical JSON file you are likely to ingest. It has a nested object in Payment and an array in Transaction, and it follows a schemaless pattern where elements are simply omitted when they do not exist for a record.

[
  {
    "Id": 100,
    "Name": "John",
    "TransactionId": "tran1",
    "Transaction": [
      {
        "ItemId": "a100",
        "price": 200
      },
      {
        "ItemId": "a110",
        "price": 200
      }
    ],
    "Subscriber": true,
    "Payment": {
      "Type": "Credit-Card",
      "Total": 400,
      "Success": true
    },
    "Note": "1st Complete Record"
  },
  {
    "Id": 101,
    "Name": "Tom",
    "TransactionId": "tran2",
    "Transaction": [
      {
        "ItemId": "a100",
        "price": 200
      },
      {
        "ItemId": "a110",
        "price": 200
      }
    ],
    "Subscriber": true,
    "Payment": {
      "Type": "Debit-Card",
      "Total": 400,
      "Success": true
    },
    "Note": null
  },
  {
    "Id": 102,
    "Name": "Margaret",
    "TransactionId": "tran3",
    "Transaction": [
      {
        "ItemId": "a100",
        "price": 200
      },
      {
        "ItemId": "a110",
        "price": 200
      }
    ],
    "Subscriber": true,
    "Payment": {
      "Type": "Credit-Card",
      "Total": 400,
      "Success": true
    }
  },
  {
    "Id": 103,
    "Name": "Dylan",
    "TransactionId": "tran4",
    "Transaction": [
      {
        "ItemId": "a100",
        "price": 200
      },
      {
        "ItemId": "a110",
        "price": 200
      }
    ],
    "Subscriber": true,
    "Payment": null,
    "Note": "Payment is Null"
  },
  {
    "Id": 104,
    "Name": "Oliver",
    "TransactionId": "tran5",
    "Transaction": [
      {
        "ItemId": "a100",
        "price": 200
      },
      {
        "ItemId": "a110",
        "price": 200
      }
    ],
    "Subscriber": true,
    "Payment": {
      "Total": 400,
      "Success": true
    },
    "Note": "Payment Type is missing"
  },
  {
    "Id": 105,
    "Name": "Sarah",
    "TransactionId": "tran6",
    "Transaction": [
      {
        "ItemId": "a100",
        "price": 200
      },
      {
        "ItemId": "a123",
        "price": 1000
      },
      {
        "ItemId": "a134",
        "price": 400
      }
    ],
    "Subscriber": true,
    "Note": "Payment is missing"
  }
]
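
For reference, once Transaction is unwound, each record becomes two rows (three for Sarah), with the top-level and Payment values repeated on every row and blanks wherever an element is missing. The first few rows should look roughly like this (exact quoting depends on the json2csv version):

"Id","Name","TransactionId","Transaction.ItemId","Transaction.price","Subscriber","Payment.Type","Payment.Total","Payment.Success","Note"
100,"John","tran1","a100",200,true,"Credit-Card",400,true,"1st Complete Record"
100,"John","tran1","a110",200,true,"Credit-Card",400,true,"1st Complete Record"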

Setup

As usual, let's create a project folder and install the required modules.

mkdir node-csv-load
cd node-csv-load
npm init
npm i pg -ES
npm i pg-copy-streams -ES
npm i json2csv -ES
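
The load script below pulls its connection parameters from a config.json in the project root (it reads host, port, user, pw and db). Here is a minimal sketch with placeholder values; swap in your own Postgres details:

{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "pw": "your-password",
  "db": "your-database"
}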

We also need to create a table that contains all the elements in the source file.

CREATE TABLE usermanaged.trans (
    Id VARCHAR(20) -- not a primary key: unwinding Transaction repeats the same Id across rows
    ,Name VARCHAR(255)
    ,TransactionId VARCHAR(20)
    ,Transaction_ItemId VARCHAR(20)
    ,Transaction_price INTEGER
    ,Subscriber bool
    ,Payment_Type VARCHAR(100)
    ,Payment_Total INTEGER
    ,Payment_Success bool
    ,Note text
);
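
If the usermanaged schema does not already exist in your database, create it first:

CREATE SCHEMA IF NOT EXISTS usermanaged;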

Code

For bulk loading data into Postgres, see the post here. The key to converting JSON to CSV is to define the fields and decide which element to unwind. As Transaction is an array, we will unwind it, which automatically creates the denormalised table.

There are a few different ways to convert JSON to CSV, as shown in the json2csv documentation. In this example, we are going to use a readable stream and pipe it through the conversion transform. This method is memory efficient, especially when you have a large JSON file.

After json2csv emits the 'end' event, we call the query execution function. This ensures that the queries are executed only after the CSV file has been written.

// Loading required modules
const fs = require('fs')
const path = require('path')
const Json2csvTransform = require('json2csv').Transform
const {Client} = require('pg')
const copyFrom = require('pg-copy-streams').from
const config = require('./config.json')

// Defining file path
const inputFile = path.join(__dirname, '/data/transaction.json')
const outputFile = path.join(__dirname, '/data/transaction.csv')

// Defining fields
const fields = ['Id', 'Name', 'TransactionId', 'Transaction.ItemId',
'Transaction.price', 'Subscriber', 'Payment.Type', 'Payment.Total',
 'Payment.Success', 'Note']

// target table
var table = 'usermanaged.trans'

// Getting connection parameters from config.json
const host = config.host
const user = config.user
const pw = config.pw
const db = config.db
const port = config.port
const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`

const executeQuery = (targetTable) => {
    console.log('Starting executeQuery function')
    // Connecting to Database
    const client = new Client({
        connectionString: conString,
    })
    client.connect()

    const execute = (target, callback) => {
        client.query(`Truncate ${target}`, (err) => {
                if (err) {
                client.end()
                callback(err)
                // return console.log(err.stack)
                } else {
                console.log(`Truncated ${target}`)
                callback(null, target)
                }
            })
    }
    execute(targetTable, (err) =>{
        if (err) return console.log(`Error in Truncate Table: ${err}`)
        var stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN CSV HEADER`))
        var fileStream = fs.createReadStream(outputFile)
        
        fileStream.on('error', (error) =>{
            console.log(`Error in creating read stream ${error}`)
        })
        stream.on('error', (error) => {
            console.log(`Error in creating stream ${error}`)
        })
        stream.on('end', () => {
            console.log(`Completed loading data into ${targetTable}`)
            client.end()
        })
        fileStream.pipe(stream);
    })
}

// Create a readable stream
const input = fs.createReadStream(inputFile, { encoding: 'utf8' });
const output = fs.createWriteStream(outputFile, { encoding: 'utf8' });
const json2csv = new Json2csvTransform({fields, unwind:'Transaction'});
 
const processor = input.pipe(json2csv).pipe(output);
 
// You can also listen for events on the conversion and see how the header or the lines are coming out.
json2csv
  .on('header', header => console.log(header))
  .on('line', line => console.log(line))
  .on('error', err => console.log(err))
  .on('end', () =>{
      console.log(`Writing csv file, ${outputFile}, has been completed`)
      // After csv file is created, execute the query to load the data
      executeQuery(table)
  }) 
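
Assuming the script is saved as index.js (the post does not name the file, so this is just an example name), run it with:

node index.js

Then a quick query in psql confirms the unwound rows landed in the table:

SELECT Id, Transaction_ItemId, Payment_Type, Note FROM usermanaged.trans ORDER BY Id;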

Fun!
