Bulk Loading Postgres with Node.js
- By : Mydatahack
- Category : Data Engineering, Data Ingestion
- Tags: Bulk Load, Copy, Data Ingestion, Node.js, Postgres
The fastest way to bulk load data into Postgres is to call Copy, which is a SQL command to load data into a table from a flat file. To connect to Postgres with Node.js, we can use the node-postgres module (pg). To use the copy function, we can use the pg-copy-streams module, which enables you to execute the copy function from a file readable stream.
We will first check out how to load the table and then create the code that does truncate & load.
Setup
Let’s set it up by creating a project and installing required modules.
cd node-pg-test
npm init
npm i pg -ES
npm i pg-copy-streams -ES
config.json
For the connection details, let’s use config.json and import the connection from the config file.
"host": "your host",
"user": "your username",
"pw": "your password",
"db": "your database name",
"port": "port, 5432 is default"
}
Import modules and define global variables
We import the required modules. Then, we define the input file path and table name. We can import the connection details from config.json file located in the same folder.
// Import required modules const fs = require('fs') const path = require('path') const { Pool, Client} = require('pg') const copyFrom = require('pg-copy-streams').from const config = require('./config.json') // inputfile & target table var inputFile = path.join(__dirname, '/data/customer.csv') var table = 'usermanaged.customers' // Getting connectin parameters from config.json const host = config.host const user = config.user const pw = config.pw const db = config.db const port = config.port const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`
Load Table
Let’s create the readable stream and pipe the copy function.
// Connecting to Database const client = new Client({ connectionString: conString, }) client.connect() // Execute Copy Function var stream = client.query(copyFrom(`COPY ${targetTable} FROM CSV HEADER STDIN`)) var fileStream = fs.createReadStream(inputFile) fileStream.on('error', (error) =>{ console.log(`Error in reading file: ${error}`) }) stream.on('error', (error) => { console.log(`Error in copy command: ${error}`) }) stream.on('end', () => { console.log(`Completed loading data into ${targetTable}`) client.end() }) fileStream.pipe(stream);
Truncate Table
Before doing the classic truncate and load ingestion pattern, let’s review the code for truncating table.
// Connecting to Database const client = new Client({ connectionString: conString, }) client.connect() // Execute Truncate Table client.query(`Truncate ${targetTable}`, (err) => { if (err) { client.end() // return console.log(err.stack) return console.log(`Error in truncate table ${err}`) process.exit(1) } else { console.log(`Truncated ${targetTable}`) } })
Truncate & Load
Let’s put them all together to do truncate & load. Remember that the functions we are using are asynchronous. To ensure sequential execution of the code, we are using callback. With callback, we can make sure that the copy function only gets executed after the truncate statement was successfully executed.
// Connecting to Database const client = new Client({ connectionString: conString, }) client.connect() const executeQuery = (targetTable) => { const execute = (target, callback) => { client.query(`Truncate ${target}`, (err) => { if (err) { client.end() callback(err) // return console.log(err.stack) } else { console.log(`Truncated ${target}`) callback(null, target) } }) } execute(targetTable, (err) =>{ if (err) return console.log(`Error in Truncate Table: ${err}`) var stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN`)) var fileStream = fs.createReadStream(inputFile) fileStream.on('error', (error) =>{ console.log(`Error in creating read stream ${error}`) }) stream.on('error', (error) => { console.log(`Error in creating stream ${error}`) }) stream.on('end', () => { console.log(`Completed loading data into ${targetTable}`) client.end() }) fileStream.pipe(stream); }) } // Execute the function executeQuery(table)