Converting CSV to JSON and Loading it to Postgres with Node.js

To convert CSV to JSON, Node has an awesome module, csvtojson. It takes a CSV file and converts it to JSON asynchronously. Once we convert the CSV to JSON, let’s load it into a Postgres table with the jsonb data type. Postgres supports JSON data and you can query it (see the previous blog about ingesting JSON into Postgres here). We are going to use pg-copy-streams to bulk load the JSON file into Postgres (see Bulk Loading Postgres with Node.js).
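
Before wiring everything together, here is a minimal sketch of the csvtojson API on its own (the input path and the idea of just printing the first row are placeholders, not part of the final script):

const csv = require('csvtojson')

// Convert a CSV file and receive the result as an array of JSON objects
csv()
    .fromFile('./data/example.csv')
    .then((rows) => {
        // each CSV row becomes one JSON object
        console.log(JSON.stringify(rows[0]))
    })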

Setup

Let’s create a project folder and install all the required modules.

mkdir node-json-load
cd node-json-load
npm init
npm i pg -ES
npm i pg-copy-streams -ES
npm i csvtojson -ES

We also need to create a table in Postgres with the jsonb data type.

CREATE TABLE usermanaged.customer (
    data jsonb
);
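
Once the data is loaded, the jsonb column can be queried with Postgres’ JSON operators. For example (customer_id is an assumed key from the CSV, adjust it to your own columns):

SELECT data->>'customer_id' AS customer_id
FROM usermanaged.customer
LIMIT 10;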

Code

We are getting the connection details from config.json. See Bulk Loading Postgres with Node.js for further information on how to load data into Postgres.
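
The code reads host, user, pw, db, and port from config.json, so a minimal config file would look something like this (all values below are placeholders):

{
    "host": "localhost",
    "port": 5432,
    "db": "mydb",
    "user": "myuser",
    "pw": "mypassword"
}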

The key is to call executeQuery() after writeFileSync(). Because writeFileSync() is synchronous, the query runs only after the JSON file has been completely written. The JSON conversion itself happens asynchronously; once it finishes and fires the ‘done’ event, the callback writes the converted results to a file and then starts the load.

// Import required modules
const csv = require('csvtojson')
const path = require('path')
const fs = require('fs')
const {Client} = require('pg')
const copyFrom = require('pg-copy-streams').from
const config = require('./config.json')

// Setting file path
const inputFile = path.join(__dirname, '/data/customer.csv')
const outputFile = path.join(__dirname, '/data/customer.json')

// target table
const table = 'usermanaged.customer'

// Getting connection parameters from config.json
const host = config.host
const user = config.user
const pw = config.pw
const db = config.db
const port = config.port

const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`

const executeQuery = (targetTable) => {
    console.log('Starting executeQuery function')
    // Connecting to Database
    const client = new Client({
        connectionString: conString,
    })
    client.connect()

    const execute = (target, callback) => {
        client.query(`Truncate ${target}`, (err) => {
                if (err) {
                client.end()
                callback(err)
                // return console.log(err.stack)
                } else {
                console.log(`Truncated ${target}`)
                callback(null, target)
                }
            })
    }
    execute(targetTable, (err) => {
        if (err) return console.log(`Error in Truncate Table: ${err}`)
        const stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN`))
        const fileStream = fs.createReadStream(outputFile)
        
        fileStream.on('error', (error) =>{
            console.log(`Error in creating read stream ${error}`)
        })
        stream.on('error', (error) => {
            console.log(`Error in creating stream ${error}`)
        })
        stream.on('end', () => {
            console.log(`Completed loading data into ${targetTable}`)
            client.end()
        })
        fileStream.pipe(stream);
    })
}

const main = (inputPath, outputPath) => {
    console.log(`Converting ${inputPath} to JSON.`)
    const convCsv = (inputP, outputP, callback) => {
        let buff = '' // accumulates the converted JSON lines
        csv()
        .fromFile(inputP)
        .on('data', (data) => {
                buff += data.toString('utf8')
        })
        .on('done', (error) => {
            // if error happens, callback with error
            if (error) return callback(error)
            // if no error, callback with the converted data
            console.log('Finished conversion.')
            callback(null, buff)
        })
    }
    convCsv(inputPath, outputPath, (error, data) =>{
        // if error happens, gives error message and code stops here.
        if (error) return console.error(`Error in csv conversion: ${error}`)
        // if no error, write file
        fs.writeFileSync(outputPath, data)
        console.log(`File created as ${outputPath}`)
        executeQuery(table)
    })
}

main(inputFile, outputFile)
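
Assuming you saved the script as index.js (the file name is up to you) and your psql connection defaults are set, you can run it and spot-check the load like this:

node index.js
psql -c "SELECT count(*) FROM usermanaged.customer;"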

Fun!
