Bulk Loading Postgres with Node.js

The fastest way to bulk load data into Postgres is to use COPY, a SQL command that loads data into a table from a flat file. To connect to Postgres with Node.js, we can use the node-postgres module (pg). To execute COPY from a readable file stream, we can use the pg-copy-streams module.

We will first look at how to load the table, then put together the code that does a truncate & load.

Setup

Let’s set things up by creating a project and installing the required modules.

mkdir node-pg-test
cd node-pg-test
npm init
npm i pg -ES
npm i pg-copy-streams -ES

config.json

For the connection details, let’s use a config.json file and import them from there.

{
    "host": "your host",
    "user": "your username",
    "pw": "your password",
    "db": "your database name",
    "port": "port, 5432 is default"
}

Import modules and define global variables

We import the required modules, then define the input file path and target table name. The connection details come from the config.json file located in the same folder.

// Import required modules
const fs = require('fs')
const path = require('path')
const { Client } = require('pg')
const copyFrom = require('pg-copy-streams').from
const config = require('./config.json')

// Input file & target table
const inputFile = path.join(__dirname, '/data/customer.csv')
const targetTable = 'usermanaged.customers'

// Getting connection parameters from config.json
const host = config.host
const user = config.user
const pw = config.pw
const db = config.db
const port = config.port
const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`
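
As a side note, node-postgres also accepts the individual connection parameters as a plain object, so building a connection string is optional. A minimal sketch, reusing the fields from config.json above:

// Alternative: pass the parameters directly instead of a connection string
const client = new Client({
    host: config.host,
    user: config.user,
    password: config.pw,
    database: config.db,
    port: Number(config.port), // port as a number
})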

Load Table

Let’s create a readable stream from the file and pipe it into the COPY stream.

// Connecting to Database
const client = new Client({
    connectionString: conString,
})
client.connect()

// Execute Copy Function
const stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN CSV HEADER`))
const fileStream = fs.createReadStream(inputFile)

fileStream.on('error', (error) => {
    console.log(`Error in reading file: ${error}`)
})
stream.on('error', (error) => {
    console.log(`Error in copy command: ${error}`)
})
stream.on('end', () => {
    console.log(`Completed loading data into ${targetTable}`)
    client.end()
})
fileStream.pipe(stream)
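
If you are on Node.js 10 or later, another option is to wire the two streams together with stream.pipeline, which forwards errors from either stream into a single callback. A small sketch, assuming the same client, inputFile, and targetTable as above:

const { pipeline } = require('stream')

const copyStream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN CSV HEADER`))

// pipeline pipes the file into the COPY stream and calls the callback
// once everything has finished or as soon as either stream errors
pipeline(fs.createReadStream(inputFile), copyStream, (err) => {
    if (err) {
        console.log(`Error in copy command: ${err}`)
    } else {
        console.log(`Completed loading data into ${targetTable}`)
    }
    client.end()
})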

Truncate Table

Before putting together the classic truncate & load ingestion pattern, let’s review the code for truncating the table.

// Connecting to Database
const client = new Client({
    connectionString: conString,
})
client.connect()

// Execute Truncate Table
client.query(`TRUNCATE ${targetTable}`, (err) => {
    if (err) {
        client.end()
        return console.log(`Error in truncate table: ${err}`)
    } else {
        console.log(`Truncated ${targetTable}`)
    }
})
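
As a side note, client.query also returns a Promise when no callback is passed, so the same truncate could be sketched like this with the same client and targetTable:

// Promise-based variant of the truncate above
client.query(`TRUNCATE ${targetTable}`)
    .then(() => console.log(`Truncated ${targetTable}`))
    .catch((err) => console.log(`Error in truncate table: ${err}`))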

Truncate & Load

Let’s put it all together to do a truncate & load. Remember that the functions we are using are asynchronous. To ensure the code runs sequentially, we use a callback: the copy function only executes after the truncate statement has completed successfully.

// Connecting to Database
const client = new Client({
    connectionString: conString,
})
client.connect()

const executeQuery = (targetTable) => {
    const execute = (target, callback) => {
        client.query(`TRUNCATE ${target}`, (err) => {
            if (err) {
                client.end()
                callback(err)
            } else {
                console.log(`Truncated ${target}`)
                callback(null, target)
            }
        })
    }
    execute(targetTable, (err) => {
        if (err) return console.log(`Error in Truncate Table: ${err}`)
        const stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN CSV HEADER`))
        const fileStream = fs.createReadStream(inputFile)

        fileStream.on('error', (error) => {
            console.log(`Error in reading file: ${error}`)
        })
        stream.on('error', (error) => {
            console.log(`Error in copy command: ${error}`)
        })
        stream.on('end', () => {
            console.log(`Completed loading data into ${targetTable}`)
            client.end()
        })
        fileStream.pipe(stream)
    })
}
// Execute the function
executeQuery(targetTable)
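
If you prefer async/await over callbacks, the same truncate & load sequence could be sketched roughly like this. It assumes Node.js 15+ for stream/promises and reuses the client, inputFile, and targetTable defined earlier:

const { pipeline } = require('stream/promises')

const truncateAndLoad = async (target) => {
    try {
        // client.query returns a Promise when no callback is given
        await client.query(`TRUNCATE ${target}`)
        console.log(`Truncated ${target}`)

        // Stream the CSV file into the COPY command and wait for it to finish
        const stream = client.query(copyFrom(`COPY ${target} FROM STDIN CSV HEADER`))
        await pipeline(fs.createReadStream(inputFile), stream)
        console.log(`Completed loading data into ${target}`)
    } catch (err) {
        console.log(`Error in truncate & load: ${err}`)
    } finally {
        await client.end()
    }
}

truncateAndLoad(targetTable)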