Converting CSV to JSON and Loading it to Postgres with Node.js
- By : Mydatahack
- Category : Data Engineering, Data Ingestion
- Tags: Bulk Load, CSV to Json, Node.js, Postgres
To convert csv to json, Node has an awesome module, csvtojson. It takes a csv file and converts it to json asynchronously. Once we convert csv to json, let’s load it into a Postgres table with the jsonb data type. Postgres supports JSON data and you can query it (see the previous blog about ingesting json into Postgres here). We are going to use pg-copy-streams to bulk load the json file into Postgres (see Bulk Loading Postgres with Node.js).
Setup
Let’s create a project folder and install all the required modules.
cd node-json-load
npm init
npm i pg -ES
npm i pg-copy-streams -ES
npm i csvtojson -ES
We also need to create a table in Postgres with the jsonb data type.
CREATE TABLE usermanaged.customer (
    DATA jsonb
);
Code
We are getting the connection details from config.json. See Bulk Loading Postgres with Node.js for further information on how to load data into Postgres.
The key is to call executeQuery() after calling writeFileSync(). Because writeFileSync() is a synchronous function, the query gets executed only after the json file has been created. The json file creation happens inside a callback function because the JSON conversion happens asynchronously. After the conversion is completed with the ‘done’ event, we can start writing the converted results into a file.
// Import required modules
const csv = require('csvtojson')
const path = require('path')
const fs = require('fs')
const {Client} = require('pg')
const copyFrom = require('pg-copy-streams').from
const config = require('./config.json')

// Setting file path
const inputFile = path.join(__dirname, '/data/customer.csv')
const outputFile = path.join(__dirname, '/data/customer.json')

// Target table
const table = 'usermanaged.customer'

// Getting connection parameters from config.json
const host = config.host
const user = config.user
const pw = config.pw
const db = config.db
const port = config.port
const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`

/**
 * Truncates the target table, then streams a file into it with
 * `COPY ... FROM STDIN`.
 *
 * The database connection is closed on every exit path after a successful
 * connect (truncate failure, file-read failure, COPY failure, and normal
 * completion) so that an open socket never keeps the process alive.
 *
 * @param {string} targetTable - Table name interpolated into the TRUNCATE and
 *   COPY statements. It is not escaped, so it must come from trusted code.
 * @param {string} [filePath=outputFile] - File to stream into the table.
 *   Defaults to the module-level output file.
 */
const executeQuery = (targetTable, filePath = outputFile) => {
  console.log('Starting executeQuery function')

  // Connecting to Database
  const client = new Client({
    connectionString: conString,
  })

  // Several error paths can fire for a single failure (for example a file
  // error followed by a COPY error), so make closing idempotent.
  let isClosed = false
  const closeClient = () => {
    if (isClosed) return
    isClosed = true
    client.end()
  }

  // Empties the target table and reports the outcome through the callback.
  const truncateTable = (target, callback) => {
    client.query(`Truncate ${target}`, (err) => {
      if (err) {
        closeClient()
        return callback(err)
      }
      console.log(`Truncated ${target}`)
      callback(null, target)
    })
  }

  // Pipes the file into the COPY stream and closes the client when done.
  const copyFileIntoTable = () => {
    const stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN`))
    const fileStream = fs.createReadStream(filePath)

    fileStream.on('error', (error) => {
      console.log(`Error in creating read stream ${error}`)
      // pipe() does not end the destination on a source error, so the COPY
      // would stay open forever unless the connection is closed here.
      closeClient()
    })
    stream.on('error', (error) => {
      console.log(`Error in creating stream ${error}`)
      closeClient()
    })
    // NOTE(review): newer pg-copy-streams releases appear to signal
    // completion with 'finish' rather than 'end' - confirm against the
    // installed version before relying on this handler.
    stream.on('end', () => {
      console.log(`Completed loading data into ${targetTable}`)
      closeClient()
    })

    fileStream.pipe(stream)
  }

  // Wait for the connection before issuing queries so that a connection
  // failure is reported here instead of as an unhandled promise rejection.
  client.connect((connectError) => {
    if (connectError) {
      return console.log(`Error in connecting to database: ${connectError}`)
    }
    truncateTable(targetTable, (err) => {
      if (err) return console.log(`Error in Truncate Table: ${err}`)
      copyFileIntoTable()
    })
  })
}

/**
 * Converts a CSV file to JSON, writes the result to disk and then loads the
 * written file into the module-level target table.
 *
 * @param {string} inputPath - Path of the CSV file to convert.
 * @param {string} outputPath - Path the converted JSON is written to; the
 *   same path is then passed on to the database load.
 */
const main = (inputPath, outputPath) => {
  console.log(`Converting ${inputPath} to JSON.`)

  // Collects everything the converter emits into one string and hands it to
  // the callback once the 'done' event fires.
  // NOTE(review): presumably each 'data' chunk is one JSON document per CSV
  // row, which is the layout COPY needs - verify against csvtojson.
  const convCsv = (inputP, callback) => {
    // Local accumulator; the original assigned an undeclared global here.
    let buff = ''
    csv()
      .fromFile(inputP)
      .on('data', (data) => {
        buff += data.toString('utf8')
      })
      .on('done', (error) => {
        // if error happens, callback with error
        if (error) return callback(error)
        // if no error, callback with the converted data
        console.log('Finished conversion.')
        callback(null, buff)
      })
  }

  convCsv(inputPath, (error, data) => {
    // if error happens, gives error message and code stops here.
    if (error) return console.error(`Error in csv conversion: ${error}`)
    // writeFileSync is synchronous, so the file is fully written before the
    // load starts reading it.
    fs.writeFileSync(outputPath, data)
    console.log(`File created as ${outputPath}`)
    // Load the file that was just written, not a hard-coded path.
    executeQuery(table, outputPath)
  })
}

main(inputFile, outputFile)
Fun!