Converting JSON to CSV and Loading it to Postgres with Node.js
- By: Mydatahack
- Category: Data Engineering, Data Ingestion
- Tags: Bulk Load, JSON to CSV, Node.js, Postgres
To convert JSON to CSV, I love using json2csv. It does all the hard work of figuring out the JSON structure and flattening it into a CSV file. You can reference nested elements with dot notation (like Transaction.ItemId). When the JSON contains an array element, json2csv can unwind it, expanding each array entry into its own row and creating a denormalised table for you. Let's see how it works.
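As a quick illustration of those two features, here is a minimal sketch using json2csv v4's synchronous parse API (the streaming approach used later in this post scales better for large files):

const { parse } = require('json2csv')

// A tiny record with a nested object and an array
const data = [{
  Name: 'John',
  Payment: { Type: 'Credit-Card' },
  Transaction: [{ ItemId: 'a100' }, { ItemId: 'a110' }]
}]

// Dot notation reaches the nested elements; unwind expands the array into rows
const csv = parse(data, {
  fields: ['Name', 'Payment.Type', 'Transaction.ItemId'],
  unwind: 'Transaction'
})

console.log(csv)
// Output (roughly, quoting may vary by version):
// "Name","Payment.Type","Transaction.ItemId"
// "John","Credit-Card","a100"
// "John","Credit-Card","a110"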
Input Data
Here is the input data, transaction.json. It is a good representation of the kind of JSON file you are likely to ingest: it has nested elements in Payment, an array element in Transaction, and, in typical schemaless fashion, elements that are simply missing when they do not exist in the record.
[
  {
    "Id": 100,
    "Name": "John",
    "TransactionId": "tran1",
    "Transaction": [
      { "ItemId": "a100", "price": 200 },
      { "ItemId": "a110", "price": 200 }
    ],
    "Subscriber": true,
    "Payment": {
      "Type": "Credit-Card",
      "Total": 400,
      "Success": true
    },
    "Note": "1st Complete Record"
  },
  {
    "Id": 101,
    "Name": "Tom",
    "TransactionId": "tran2",
    "Transaction": [
      { "ItemId": "a100", "price": 200 },
      { "ItemId": "a110", "price": 200 }
    ],
    "Subscriber": true,
    "Payment": {
      "Type": "Debit-Card",
      "Total": 400,
      "Success": true
    },
    "Note": null
  },
  {
    "Id": 102,
    "Name": "Margaret",
    "TransactionId": "tran3",
    "Transaction": [
      { "ItemId": "a100", "price": 200 },
      { "ItemId": "a110", "price": 200 }
    ],
    "Subscriber": true,
    "Payment": {
      "Type": "Credit-Card",
      "Total": 400,
      "Success": true
    }
  },
  {
    "Id": 103,
    "Name": "Dylan",
    "TransactionId": "tran4",
    "Transaction": [
      { "ItemId": "a100", "price": 200 },
      { "ItemId": "a110", "price": 200 }
    ],
    "Subscriber": true,
    "Payment": null,
    "Note": "Payment is Null"
  },
  {
    "Id": 104,
    "Name": "Oliver",
    "TransactionId": "tran5",
    "Transaction": [
      { "ItemId": "a100", "price": 200 },
      { "ItemId": "a110", "price": 200 }
    ],
    "Subscriber": true,
    "Payment": {
      "Total": 400,
      "Success": true
    },
    "Note": "Payment Type is missing"
  },
  {
    "Id": 105,
    "Name": "Sarah",
    "TransactionId": "tran6",
    "Transaction": [
      { "ItemId": "a100", "price": 200 },
      { "ItemId": "a123", "price": 1000 },
      { "ItemId": "a134", "price": 400 }
    ],
    "Subscriber": true,
    "Note": "Payment is missing"
  }
]
Setup
As usual, let's create a project folder and install the required modules.

mkdir node-csv-load
cd node-csv-load
npm init
npm i pg -ES
npm i pg-copy-streams -ES
npm i json2csv -ES
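The script in the Code section below reads its connection parameters from a config.json file in the project root. Here is a sketch with placeholder values (the key names match what the code expects):

{
  "host": "localhost",
  "user": "postgres",
  "pw": "your-password",
  "db": "your-database",
  "port": 5432
}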
We also need to create a table that contains all the elements in the source file. Two things to note: COPY with the HEADER option simply skips the header line and maps columns by position, so the column names do not have to match the CSV header; and because unwinding repeats the parent record for each Transaction item, Id is not unique and cannot be a primary key.
CREATE TABLE usermanaged.trans
(
    -- Id repeats after unwinding the Transaction array, so no primary key here
    Id VARCHAR(20)
    ,Name VARCHAR(255)
    ,TransactionId VARCHAR(20)
    ,Transaction_ItemId VARCHAR(20)
    ,Transaction_price INTEGER
    ,Subscriber BOOL
    ,Payment_Type VARCHAR(100)
    ,Payment_Total INTEGER
    ,Payment_Success BOOL
    ,Note TEXT
);
Code
For bulk loading data into Postgres, see the post here. The key to converting JSON to CSV is to define the fields and decide which element to unwind. Since Transaction holds an array, we will unwind on it, which automatically produces a denormalised table; the first record, for example, comes out as the two rows shown below.
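For illustration, this is roughly what the unwound output looks like for the first record (one row per Transaction item; exact quoting depends on the json2csv version):

"Id","Name","TransactionId","Transaction.ItemId","Transaction.price","Subscriber","Payment.Type","Payment.Total","Payment.Success","Note"
100,"John","tran1","a100",200,true,"Credit-Card",400,true,"1st Complete Record"
100,"John","tran1","a110",200,true,"Credit-Card",400,true,"1st Complete Record"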
There are a few different ways to convert JSON to CSV, as described in the json2csv documentation. In this example, we create a readable stream and pipe it through the conversion transform. This method is memory-efficient, which matters especially when you have a large JSON file.
Once json2csv emits the 'end' event, we call the query execution function. This ensures the queries run only after the CSV file has been completely written.
// Loading required modules
const fs = require('fs')
const path = require('path')
const Json2csvTransform = require('json2csv').Transform
const { Client } = require('pg')
const copyFrom = require('pg-copy-streams').from
const config = require('./config.json')

// Defining file paths
const inputFile = path.join(__dirname, '/data/transaction.json')
const outputFile = path.join(__dirname, '/data/transaction.csv')

// Defining fields, using dot notation for the nested elements
const fields = ['Id', 'Name', 'TransactionId', 'Transaction.ItemId', 'Transaction.price',
  'Subscriber', 'Payment.Type', 'Payment.Total', 'Payment.Success', 'Note']

// Target table
const table = 'usermanaged.trans'

// Getting connection parameters from config.json
const host = config.host
const user = config.user
const pw = config.pw
const db = config.db
const port = config.port
const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`

const executeQuery = (targetTable) => {
  console.log('Starting executeQuery function')

  // Connecting to the database
  const client = new Client({ connectionString: conString })
  client.connect()

  // Truncate the target table before loading
  const execute = (target, callback) => {
    client.query(`Truncate ${target}`, (err) => {
      if (err) {
        client.end()
        callback(err)
      } else {
        console.log(`Truncated ${target}`)
        callback(null, target)
      }
    })
  }

  execute(targetTable, (err) => {
    if (err) return console.log(`Error in Truncate Table: ${err}`)

    // Stream the csv file into the target table with COPY
    const stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN CSV HEADER`))
    const fileStream = fs.createReadStream(outputFile)

    fileStream.on('error', (error) => {
      console.log(`Error in creating read stream ${error}`)
    })
    stream.on('error', (error) => {
      console.log(`Error in creating stream ${error}`)
    })
    stream.on('end', () => {
      console.log(`Completed loading data into ${targetTable}`)
      client.end()
    })
    fileStream.pipe(stream)
  })
}

// Create readable and writable streams and pipe the conversion function
const input = fs.createReadStream(inputFile, { encoding: 'utf8' })
const output = fs.createWriteStream(outputFile, { encoding: 'utf8' })
const json2csv = new Json2csvTransform({ fields, unwind: 'Transaction' })
input.pipe(json2csv).pipe(output)

// You can also listen for events on the conversion and see how the header and lines come out
json2csv
  .on('header', header => console.log(header))
  .on('line', line => console.log(line))
  .on('error', err => console.log(err))
  .on('end', () => {
    console.log(`Writing csv file, ${outputFile}, has been completed`)
    // After the csv file is created, execute the queries to load the data
    executeQuery(table)
  })
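Assuming the script above is saved as index.js (the file name is my choice), running it prints the header and each unwound line, followed by the load messages, roughly like this:

node index.js
"Id","Name","TransactionId","Transaction.ItemId","Transaction.price","Subscriber","Payment.Type","Payment.Total","Payment.Success","Note"
100,"John","tran1","a100",200,true,"Credit-Card",400,true,"1st Complete Record"
100,"John","tran1","a110",200,true,"Credit-Card",400,true,"1st Complete Record"
...
Writing csv file, /path/to/node-csv-load/data/transaction.csv, has been completed
Starting executeQuery function
Truncated usermanaged.trans
Completed loading data into usermanaged.trans

To verify the load, select count(*) from usermanaged.trans should return 13 rows, since each record expands into one row per Transaction item.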
Fun!