REST API Data Ingestion with Node.js

The classic REST API data ingestion pattern is to (1) call the API endpoint, (2) receive the data, (3) transform it into a structured table and (4) load it into a database. Let’s have a go at it with Node.js. We will use JSONPlaceholder, which offers several REST API endpoints for testing and experimenting, and ingest its photos dataset into a Postgres database.

To make REST API calls to HTTPS endpoints, we can use the https module. It is built into Node.js, so there is nothing extra to install.

We are using node-postgres to connect to Postgres, pg-copy-streams to bulk load the data and json2csv to convert JSON to CSV.
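To make the JSON-to-CSV step concrete, here is a minimal, dependency-free sketch of what that conversion involves. It is only a stand-in for illustration and not how json2csv is implemented. The helper names escapeCsv and toCsv and the sample record are assumptions, and json2csv's actual output (for example, how it quotes values) may differ.

```javascript
// Hypothetical helper, not part of json2csv: quote a value only when CSV requires it
const escapeCsv = (value) => {
    const text = value === null || value === undefined ? '' : String(value)
    // Values containing quotes, commas or line breaks must be wrapped in double quotes,
    // with embedded double quotes doubled
    return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text
}

// Hypothetical helper: build a header row plus one line per record
const toCsv = (rows, fields) => {
    const header = fields.map(escapeCsv).join(',')
    const lines = rows.map((row) => fields.map((f) => escapeCsv(row[f])).join(','))
    return [header, ...lines].join('\n')
}

// Sample record shaped like a photo object (values are made up for illustration)
const sample = [{
    albumId: 1,
    id: 1,
    title: 'a "quoted", title',
    url: 'https://example.com/1',
    thumbnailUrl: 'https://example.com/t/1'
}]
const csvText = toCsv(sample, ['albumId', 'id', 'title', 'url', 'thumbnailUrl'])
console.log(csvText)
```

The header row matters here because the load step uses COPY with the HEADER option, which skips the first line of the file.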

Apart from making GET requests with https, all the data ingestion techniques in this post have been covered in previous posts. Check them out for further details.

OK, here comes the code.

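The GET request is asynchronous: https.get returns immediately and the callback receives the response as a stream. As chunks arrive through the ‘data’ event, we keep appending them to the data variable. Once the ‘end’ event is emitted, the whole body is available, so we convert the JSON into CSV, write it to a file and then load that file into Postgres, in that order. Because the file is written synchronously, the load never starts before the CSV is complete.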
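The buffering step can also be isolated into a small Promise-based helper, which makes the status check and error handling easier to see. This is a sketch and not the code used in this post: getJson is a hypothetical name, and the get parameter exists only so the function can be exercised without a network connection.

```javascript
const https = require('https')

// Hypothetical helper: resolves with the parsed JSON body of a GET request.
// `get` defaults to https.get; passing a different function is an assumption
// made purely for testing without a network.
const getJson = (url, get = https.get) =>
    new Promise((resolve, reject) => {
        const req = get(url, (res) => {
            if (res.statusCode !== 200) {
                res.resume() // discard the body so the connection is released
                return reject(new Error(`Request failed with status ${res.statusCode}`))
            }
            res.setEncoding('utf8')
            let body = ''
            // Append each incoming chunk until the response is complete
            res.on('data', (chunk) => { body += chunk })
            res.on('end', () => {
                try {
                    resolve(JSON.parse(body))
                } catch (err) {
                    reject(err)
                }
            })
        })
        // Network-level failures (DNS, refused connection) are reported on the request
        req.on('error', reject)
    })

// Example usage (requires network access):
// getJson('https://jsonplaceholder.typicode.com/photos')
//     .then((rows) => console.log(`Received ${rows.length} records`))
//     .catch((err) => console.error(err))
```

Rejecting on a non-200 status keeps an error page from being passed to JSON.parse and reported as a confusing syntax error.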

// Import required modules
const fs = require('fs')
const path = require('path')
const https = require('https')
const Json2csvParser = require('json2csv').Parser
const {Client} = require('pg')
const copyFrom = require('pg-copy-streams').from
const config = require('./config.json')

// File output path & api endpoint
const outputFile = path.join(__dirname, '/data/photos.csv')
const url = 'https://jsonplaceholder.typicode.com/photos' 

// Define the fields (CSV columns) to extract from each record
const fields = ['albumId', 'id', 'title', 'url', 'thumbnailUrl']

// Target table
const table = 'usermanaged.photos'

// Getting connection parameters from config.json
const host = config.host
const user = config.user
const pw = config.pw
const db = config.db
const port = config.port
const conString = `postgres://${user}:${pw}@${host}:${port}/${db}`

// Load data function
const executeQuery = (targetTable) => {
    console.log('Starting executeQuery function')
    // Connecting to Database
    const client = new Client({
        connectionString: conString,
    })
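    client.connect((err) => {
        // Without a callback, a failed connection would surface as an unhandled rejection
        if (err) console.log(`Error in connecting to database: ${err}`)
    })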

    // Empty the target table before loading, then hand control back via the callback
    const execute = (target, callback) => {
        client.query(`TRUNCATE ${target}`, (err) => {
            if (err) {
                client.end()
                callback(err)
            } else {
                console.log(`Truncated ${target}`)
                callback(null, target)
            }
        })
    }
    execute(targetTable, (err) => {
        if (err) return console.log(`Error in Truncate Table: ${err}`)
        // Open a COPY ... FROM STDIN stream and feed it the CSV file
        const stream = client.query(copyFrom(`COPY ${targetTable} FROM STDIN CSV HEADER`))
        const fileStream = fs.createReadStream(outputFile)

        fileStream.on('error', (error) => {
            console.log(`Error in reading file ${error}`)
            client.end()
        })
        stream.on('error', (error) => {
            console.log(`Error in copy command ${error}`)
            client.end()
        })
        // pg-copy-streams 2.x and later signal completion with 'finish';
        // versions before 2.0 emitted 'end' instead
        stream.on('finish', () => {
            console.log(`Completed loading data into ${targetTable}`)
            client.end()
        })
        fileStream.pipe(stream)
    })
}

// Main Logic Execution
https.get(url, (res) => {
    res.setEncoding('utf8')
    let data = ''
    res.on('data', (chunk) => {
        data += chunk
    })
    res.on('end', () => {
        console.log('Starting Json to Csv Conversion...')
        try {
            // Converting Json response to CSV
            const parser = new Json2csvParser({fields})
            const csv = parser.parse(JSON.parse(data))
            // Create a csv file
            fs.writeFileSync(outputFile, csv)
            console.log(`Csv file has been created as ${outputFile}`)
            // Load data into PG table
            executeQuery(table)
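        } catch (err) {
            console.error(err)
        }
    })
}).on('error', (err) => {
    // Network-level failures (DNS, refused connection) are reported here
    console.error(`Error in GET request: ${err}`)
})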
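
One thing to watch in the script above is the connection string: it is built by plain interpolation, so a password containing characters such as @, : or / would produce an invalid URI. A possible workaround, sketched below, is to percent-encode the credentials first. buildConnectionString is a hypothetical helper and the config values are placeholders. It assumes the connection-string parser in node-postgres decodes percent-encoded credentials.

```javascript
// Hypothetical helper: percent-encode user and password before building the URI
const buildConnectionString = ({ host, user, pw, db, port }) =>
    `postgres://${encodeURIComponent(user)}:${encodeURIComponent(pw)}@${host}:${port}/${db}`

// Placeholder values for illustration only; the keys mirror those read from config.json
const exampleConfig = {
    host: 'localhost',
    user: 'etl_user',
    pw: 'p@ss:word/1',
    db: 'mydb',
    port: 5432
}

console.log(buildConnectionString(exampleConfig))
// prints postgres://etl_user:p%40ss%3Aword%2F1@localhost:5432/mydb
```

Another option is to skip the URI altogether and pass host, user, password, database and port as separate properties to the Client constructor.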