Source code for datconv.outconn.postgresql.jinsert

# -*- coding: utf-8 -*-
"""This module implements Datconv Output Connector which directly inserts data to PostreSQL database.
This connector should be used with Writer: :ref:`writers_dcjson`.
It requires Python package ``psycopg2`` to be installed.
"""

# Standard Python Libs

# Libs installed using pip
import psycopg2

# Datconv packages
from .. import OBJECT
from . import *


Log = None
"""Log varaible is automatically set by main datconv script using logging.getLogger method.
Use it for logging messages in need.
"""
[docs]class DCConnector: """Please see constructor description for more details.""" def __init__(self, connstring, table, schema = 'public', \ dump_sql = False, \ autocommit = False, \ check_keywords = True, lowercase = 0, cast = None, \ on_conflict = None, options = []): """Parameters are usually passed from YAML file as subkeys of OutConnector:CArg key. :param connstring: connection string to database. :param table: table name name where to insert records. :param schema: table schema name where to insert records. :param dump_sql: if true, insert statements are being saved to file specified as ``connstring`` and not inserted to database (option to be used for debugging). :param autocommit: parameter passed to connection, if true every insert is automatically commited (slows down insert operations radically), if false chenges are commited at the end - i.e. if any insert fail everything is rolled back and no records are added. :param check_keywords: if true, prevents conflicts with SQL keywords. :param lowercase: if >1, all JSON keys will be converted to lower-case; if =1, only first level keys; if =0, no conversion happen. :param cast: array of arrays of the form: [['rec', 'value'], str], what means that record: {"rec": {"value": 5025}} will be writen as {"rec": {"value": "5025"}} - i.e. it is ensured that "value" will allways be string. First position determines address of data to be converted, last position specifies the type: str, bool, short, int, long, float or array. Where short stands for 16 bit integer, int - 32 bit integer and long - 64 bit integer. Field names shold be given after all other configured transformations (lowercase, no_underscore, check_keywords). :param on_conflict: specify what to do when record with given primary key exist in the table; one of strings 'ignore', 'update' or None (raise error in such situation). :param options: array or additional options added to INSERT caluse (see Posgresql documentation). For more detailed descriptions see :ref:`conf_template.yaml <outconn_postgresql_conf_template>` file in this module folder. """ assert Log is not None import datconv.outconn.postgresql datconv.outconn.postgresql.PckLog = Log self._connstring = connstring self._tablename = table self._tableschema = schema self._autocommit = autocommit self._dump = dump_sql if dump_sql: self._conn = open(connstring, "w") else: self._conn = psycopg2.connect(connstring) if autocommit: self._conn.autocommit = True self._cur = self._conn.cursor() self._chkkwd = check_keywords if check_keywords: if self._tablename.upper() in Keywords: self._tablename = self._tablename + '_' self._lowercase = lowercase if self._lowercase > 0: self._tablename = self._tablename.lower() self._PREFIX = 'INSERT INTO "%s"."%s"(' % (self._tableschema, self._tablename) self._cast = cast self._conflict_opt = 0 self._conflict_str = '' self._conflict_keys = None if on_conflict is not None: if on_conflict.lower() == 'ignore': self._conflict_opt = 1 self._conflict_str = '\nON CONFLICT DO NOTHING' elif on_conflict.lower() == 'update': if dump_sql: constr = '???' self._conflict_keys = [] else: tablename = self._tablename.split('.') if len(tablename) > 1: schemaname = '.'.join(tablename[:-1]) tablename = tablename[-1] else: schemaname = 'public' tablename = tablename[0] self._cur.execute('SELECT column_name FROM information_schema.key_column_usage WHERE table_name = %s AND table_schema = %s', (tablename, schemaname)) keys = self._cur.fetchall() if keys: self._conflict_keys = [c[0] for c in keys] constr = ','.join(self._conflict_keys) else: raise Exception('Table %s does not exists or has no primary key' % tablename) self._conflict_opt = 2 self._conflict_str = '\nON CONFLICT (%s) DO UPDATE SET' % constr self._options = '\n'.join(options) self._total_no = 0 self._inserted_no = 0 def supportedInterfases(self): return OBJECT def tryObject(self, obj): self._total_no = 0 self._inserted_no = 0 return isinstance(obj, dict) def pushObject(self, obj): self._total_no += 1 if self._lowercase > 0: lowerKeys(obj, self._lowercase > 1) if self._chkkwd: checkKeywords(obj) if self._cast: castValues(obj, self._cast) self._pushRecord(obj) def onFinish(self, bSuccess): if self._dump: if Log: if bSuccess: Log.info('%d records saved to %s' % (self._inserted_no, self._connstring)) else: Log.error('Program did not finished properly: output saved to %s may be inconsistent' % self._connstring) else: if bSuccess: self._conn.commit() if Log: Log.info('%d out of %d records inserted to table %s' % (self._inserted_no, self._total_no, self._tablename)) else: self._conn.rollback() if Log: if self._autocommit: Log.error('Program did not finished properly: only %d out of %d records were saved' % (self._inserted_no, self._total_no)) else: Log.error('Program did not finished properly: no records were saved') self._conn.close() def _pushRecord(self, obj): fields = '' params = '' values = [] conflict_str = self._conflict_str for k, v in obj.items(): fields += '"' + str(k) + '",' if self._dump: params += obj2str(v) + ',' else: params += '%s,' values.append(obj2db(v)) if self._conflict_opt == 2: if str(k) not in self._conflict_keys: conflict_str += ' "%s"=EXCLUDED."%s",' % (str(k), str(k)) if self._conflict_opt == 2: conflict_str = conflict_str[:-1] if self._dump: sql = self._PREFIX + fields[:-1] + \ ') VALUES (' + params[:-1] + ')' + \ conflict_str + \ self._options + ';\n' self._conn.write(sql) self._inserted_no += 1 else: sql = self._PREFIX + fields[:-1] + \ ') VALUES (' + params[:-1] + ')' + \ conflict_str + \ self._options try: self._cur.execute(sql, tuple(values)) except: Log.error('Failed query ' + sql) raise self._inserted_no += self._cur.rowcount