How to Bulk Load Data with JDBC and Python
- By : Mydatahack
- Category : Data Engineering, Data Ingestion
- Tags: Bulk Load, JayDeBeApi, JDBC, Python
Let’s do data bulk load by using JDBC and Python. The aim of this post is pretty much the same as the previous one with ODBC. We are going to export a table into a csv file and import the exported file into a table by using JDBC drivers and Python. To interact with JDBC drivers, you need to install the JayDeBeApi module.
For some reason, I could not get JayBeDeApi to work with Python 2.7. When I install it, I kept getting the class not found error (such as Error: java.lang.RuntimeException: Class com.mysql.jdbc.Driver not found) although I had the correct class name and jar file path. When I switched to Python 3 with JayBeDeApi3 installation, the problem disappeared. So, code here only works on Python 3. For Python 3, you need to install JayBeDeApi3 as below.
Let’s first import the required modules. Although you installed as JayBeDeApi3, the name of the module in the code is jaydebeapi.
1 2 3 | import jaydebeapi as jdbc import pandas as pd import sys |
Exporting CSV file from Table
The function takes a select statement and connection parameters. You have to make sure to have the correct class name (case sensitive!) and the path to the JDBC jar file. The rest is pretty straight forward. We are using pandas function to convert the query results into a data frame and creating a csv file from it.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | def table_to_csv(sql, file_path, jdbc_class,\ jdbc_path, url, user, pw): '''This function create a csv file from sql with the specified JDBC driver.''' try: conn = jdbc.connect(jdbc_class, [url, user, pw], jdbc_path) print('Connecting to {}'.format(url)) # Get data into pandas dataframe df = pd.read_sql(sql, conn) # Write to csv file df.to_csv(file_path, encoding='utf-8', header = True,\ doublequote = True, sep=',', index=False) print("CSV File has been created") conn.close() except Exception as e: print("Error: {}".format(str(e))) sys.exit(1) |
Here is the execution example with MySQL JDBC.
1 2 3 4 5 6 7 8 9 10 11 | # Create CSV file from MySQL Table sql = 'Select * From world.city' file_path = '/tmp/city.csv' mysql_class = 'com.mysql.jdbc.Driver' mysql_jdbc_path = '/tmp/mysql-connector-java-5.1.46.jar' mysql_url = 'jdbc:mysql://localhost:3306/world' mysql_user = 'username' mysql_pw = 'password' table_to_csv(sql, file_path, mysql_class, mysql_jdbc_path,\ mysql_url, mysql_user, mysql_pw) |
Importing CSV file to Table
The function takes a flat file upload statement and connection parameters. If you do not set autocommit, you need to execute the commit statement. Either way works fine.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | def load_csv(load_sql, jdbc_class, jdbc_path,\ url, user, pw): '''This function upload csv into a table with the specified JDBC driver.''' try: conn = jdbc.connect(jdbc_class, [url, user, pw], jdbc_path) print('Connecting to {}'.format(url)) # Create cursor and execute sql cur = conn.cursor() conn.jconn.setAutoCommit(True) cur.execute(load_sql) print('Successfully executed {}'.format(load_sql)) conn.close() except Exception as e: print("Error: {}".format(str(e))) sys.exit(1) |
Here are the execution examples with MySQL and Postgres. Each database has SQL syntax for this and you need to pass the statement to the function. MySQL uses the LOAD DATA INFILE command while Postgres uses the copy command.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | # (1) Load CSV file to MySQL Table load_sql = "LOAD DATA LOCAL INFILE '/tmp/city.csv' INTO TABLE usermanaged.city\ FIELDS TERMINATED BY ',' ENCLOSED BY '\"' IGNORE 1 LINES;" mysql_class = 'com.mysql.jdbc.Driver' mysql_jdbc_path = '/tmp/mysql-connector-java-5.1.46.jar' mysql_url = 'jdbc:mysql://localhost:3306/world' mysql_user = 'username' mysql_pw = 'password' load_csv(load_sql, mysql_class, mysql_jdbc_path,\ mysql_url, mysql_user, mysql_pw) # (2) Load CSV file to Postgres Table pg_class = 'org.postgresql.Driver' pg_jdbc_path = '/tmp/postgresql-42.2.2.jar' pg_url = 'jdbc:postgresql://localhost:5432/mydatahack' pg_user = 'username' pg_pw = 'password' load_sql = "COPY usermanaged.city FROM '/tmp/city.csv' CSV HEADER" load_csv(load_sql, pg_class, pg_jdbc_path,\ pg_url, pg_user, pg_pw) |