# Source code for datconv.readers.dcijson_keys

# -*- coding: utf-8 -*-
"""This module implements Datconv Reader which reads data from JSON file."""

# Standard Python Libs
import logging
from collections import deque

# Libs installed using pip
from lxml import etree  # http://lxml.de/tutorial.html

# Datconv generic modules
from datconv.filters import WRITE, REPEAT, BREAK

####################################################################
Log = None
"""Log variable is automatically set by main datconv script using logging.getLogger method.
Use it for logging messages in need.
"""

class FilterBreak(Exception):
    """Raised to abort Reader.Process when a break is issued from the Filter class."""
class ToLimitBreak(Exception):
    """Raised to abort Reader.Process when the configured record limit is reached."""
####################################################################
class DCReader:
    """This Datconv JSON Reader class uses the ijson sax-type parser to read and interpret a JSON file.

    It assumes that the input file contains an array of json objects and that data records are
    values of some key(s) inside those objects.

    Example (headkeys = [], reckeys = [], footkeys = []):

    Input:

    .. code-block:: json

        [
            {
                "PadnDrawNbrs": {
                    "cdc": 5019,
                    "product": "addn"
                }
            },
            {
                "SiteData": {
                    "siteId": 38
                },
                "rec0Control": {
                    "curDraw": 5
                }
            }
        ]

    Output (:ref:`writers_dcxml`):

    .. code-block:: xml

        <Datconv>
        <PadnDrawNbrs>
          <cdc>5019</cdc>
          <product>addn</product>
        </PadnDrawNbrs>
        <SiteData>
          <siteId>38</siteId>
        </SiteData>
        <rec0Control>
          <curDraw>5</curDraw>
        </rec0Control>
        </Datconv>
    """

    def __init__(self, headkeys=None, reckeys=None, footkeys=None, log_prog_step=0, backend=None):
        """Parameters are usually passed from YAML file as subkeys of ``Reader:CArg`` key.

        :param headkeys: list of key names that will be passed to Writer as header.
        :param reckeys: list of key names that will be treated as records. If empty, all highest
            level keys that are not headers or footers are passed to Writer as records.
        :param footkeys: list of key names that will be passed to Writer as footer.
        :param log_prog_step: log an info message after this number of records, or do not log
            progress messages if this key is 0 or the logging level is set to a value higher than INFO.
        :param backend: backend used by the ijson package to parse the json file, possible values:

            | ``yajl2_cffi`` - requires ``yajl2`` C library and ``cffi`` Python package to be installed in the system;
            | ``yajl2`` - requires ``yajl2`` C library to be installed in the system;
            | None - uses default, Python only backend.

        For more detailed descriptions see :ref:`readers_conf_template`.
        """
        assert Log is not None
        self._wri = self._flt = None
        # Fixed mutable-default-argument pitfall: lists are created per instance.
        self._hkeys = [] if headkeys is None else headkeys
        self._rkeys = [] if reckeys is None else reckeys
        self._fkeys = [] if footkeys is None else footkeys
        self._lp_step = log_prog_step
        self._backend = backend

    # OBLIGATORY
    def setWriter(self, writer):
        self._wri = writer

    # OBLIGATORY
    def setFilter(self, flt):
        self._flt = flt

    def _import_backend(self):
        """Import and return the ijson module variant selected by the ``backend`` option."""
        if self._backend == 'yajl2_cffi':
            import ijson.backends.yajl2_cffi as ijson
        elif self._backend == 'yajl2':
            import ijson.backends.yajl2 as ijson
        else:
            import ijson
        return ijson

    def _reset_state(self, rfrom, rto):
        """Initialise per-run parser state: record scope, counters, mode and buffers."""
        self._rfrom = rfrom
        self._rto = rto
        self._recno = 1
        self._lp_rec = 0
        if self._lp_step > 0 and Log.isEnabledFor(logging.INFO):
            self._lp_rec = self._lp_step
        # Parser mode:
        # 0 - outside element (e.g. between records)
        # 1 - inside header or footer
        # 2 - inside record while reading
        # 3 - inside record while skipping
        self._mode = 0
        # Used for header and footer
        self._header = []
        self._curh = None
        self._dech = deque()
        self._h_written = False
        # Used for read records
        self._rectag = None
        self._curtag = None
        # Used for skipped records
        self._s_nestlev = 0

    def _flush_header(self):
        """Pass the accumulated header to Filter/Writer exactly once.

        After flushing, ``self._header`` is cleared and reused to accumulate the footer.
        """
        if not self._h_written:
            if self._flt is not None:
                if hasattr(self._flt, 'setHeader'):
                    self._flt.setHeader(self._header)
            self._wri.writeHeader(self._header)
            self._h_written = True
            self._header = []

    def _write_footer(self):
        """Flush a pending header (if any) and pass the accumulated footer to Filter/Writer."""
        self._flush_header()
        # OBLIGATORY
        if self._flt is not None:
            if hasattr(self._flt, 'setFooter'):
                self._flt.setFooter(self._header)
        self._wri.writeFooter(self._header)

    def _emit_record(self, rec):
        """Run ``rec`` through the Filter (if any), write it, then handle limit and progress logging.

        :raises FilterBreak: when the Filter returns a result with the BREAK bit set.
        :raises ToLimitBreak: when the configured ``rto`` record limit has been exceeded.
        """
        if self._flt is not None:
            while True:
                # OBLIGATORY
                res = self._flt.filterRecord(rec)
                if res & WRITE:
                    self._wri.writeRecord(rec)
                if res & REPEAT:
                    continue
                if res & BREAK:
                    Log.info('Filter caused Process to stop on record %d' % (self._recno - 1))
                    raise FilterBreak
                break
        else:
            # OBLIGATORY
            self._wri.writeRecord(rec)
        if self._rto > 0 and self._recno > self._rto:
            raise ToLimitBreak
        if self._recno - 1 == self._lp_rec:
            Log.info('Processed %d records' % (self._recno - 1))
            self._lp_rec = self._lp_rec + self._lp_step

    def _emit_record_iter(self, rec):
        """Generator variant of :meth:`_emit_record` used by :meth:`Iterate`; yields Writer results.

        :raises FilterBreak: when the Filter returns a result with the BREAK bit set.
        :raises ToLimitBreak: when the configured ``rto`` record limit has been exceeded.
        """
        if self._flt is not None:
            while True:
                # OBLIGATORY
                res = self._flt.filterRecord(rec)
                if res & WRITE:
                    yield self._wri.writeRecord(rec)
                if res & REPEAT:
                    continue
                if res & BREAK:
                    Log.info('Filter caused Process to stop on record %d' % (self._recno - 1))
                    raise FilterBreak
                break
        else:
            # OBLIGATORY
            yield self._wri.writeRecord(rec)
        if self._rto > 0 and self._recno > self._rto:
            raise ToLimitBreak
        if self._recno - 1 == self._lp_rec:
            Log.info('Processed %d records' % (self._recno - 1))
            self._lp_rec = self._lp_rec + self._lp_step

    def Process(self, inpath, outpath=None, rfrom=1, rto=0):
        """Parameters are usually passed from YAML file as subkeys of ``Reader:PArg`` key.

        :param inpath: Path to input file.
        :param outpath: Path to output file passed to Writer (fall-back if output connector is not defined).
        :param rfrom-rto: specifies scope of records to be processed.

        For more detailed descriptions see :ref:`readers_conf_template`.
        """
        ijson = self._import_backend()
        self._reset_state(rfrom, rto)
        _curkey = None
        _deck = deque()
        try:
            with open(inpath, 'rb') as fd:  # binary mode required by C-based backends
                _not_first_event = False
                for event, value in ijson.basic_parse(fd):
                    if event == 'map_key':
                        _curkey = value
                    elif event == 'start_map':
                        _deck.append(_curkey)
                        if _curkey is not None:
                            self._OnObjStart(_curkey)
                    elif event == 'end_map':
                        _curkey = _deck.pop()
                        if _curkey is not None:
                            self._OnObjEnd(_curkey)
                    elif event == 'start_array':
                        # Anonymous nested array (not the top-level one): use pseudo-key 'arr'.
                        if _not_first_event and _curkey is None:
                            _deck.append('arr')
                            self._OnObjStart('arr')
                    elif event == 'end_array':
                        if len(_deck) > 0:
                            key = _deck.pop()
                            if key == 'arr':
                                self._OnObjEnd('arr')
                            else:
                                _deck.append(key)
                    else:
                        # Scalar value event (string/number/boolean/null).
                        key = _curkey if _curkey is not None else event
                        self._OnData(key, value)
                    _not_first_event = True
        except FilterBreak:
            pass
        except ToLimitBreak:
            pass
        finally:
            self._write_footer()

    def Iterate(self, inpath, outpath=None, rfrom=1, rto=0):
        """Generator counterpart of :meth:`Process`; yields whatever the Writer returns per record.

        Parameters have the same meaning as in :meth:`Process`.
        """
        ijson = self._import_backend()
        self._reset_state(rfrom, rto)
        _curkey = None
        _deck = deque()
        try:
            with open(inpath, 'rb') as fd:  # binary mode required by C-based backends
                _not_first_event = False
                for event, value in ijson.basic_parse(fd):
                    if event == 'map_key':
                        _curkey = value
                    elif event == 'start_map':
                        _deck.append(_curkey)
                        if _curkey is not None:
                            self._OnObjStart(_curkey)
                    elif event == 'end_map':
                        _curkey = _deck.pop()
                        if _curkey is not None:
                            for obj in self._OnObjEnd_Iter(_curkey):
                                yield obj
                    elif event == 'start_array':
                        # Anonymous nested array (not the top-level one): use pseudo-key 'arr'.
                        if _not_first_event and _curkey is None:
                            _deck.append('arr')
                            self._OnObjStart('arr')
                    elif event == 'end_array':
                        if len(_deck) > 0:
                            key = _deck.pop()
                            if key == 'arr':
                                for obj in self._OnObjEnd_Iter('arr'):
                                    yield obj
                            else:
                                _deck.append(key)
                    else:
                        # Scalar value event (string/number/boolean/null).
                        key = _curkey if _curkey is not None else event
                        for obj in self._OnData_Iter(key, value):
                            yield obj
                    _not_first_event = True
        except FilterBreak:
            pass
        except ToLimitBreak:
            pass
        finally:
            self._write_footer()

    def _OnObjStart(self, key):
        """Handle the start of a named JSON object (or the pseudo-key ``'arr'``)."""
        if self._mode == 0:
            if not self._h_written and key in self._hkeys:
                self._curh = {}
                self._curh['_tag_'] = key
                self._mode = 1
            elif key in self._fkeys:
                self._flush_header()
                self._curh = {}
                self._curh['_tag_'] = key
                self._mode = 1
            elif key in self._rkeys or len(self._rkeys) == 0:
                self._flush_header()
                if self._recno < self._rfrom:
                    # Record below requested range: skip its whole subtree.
                    self._mode = 3
                    return
                self._curtag = self._rectag = etree.Element(key)
                self._mode = 2
        elif self._mode == 1:
            # Nested object inside header/footer: push current dict, start a child dict.
            self._dech.append(self._curh)
            self._curh = {}
        elif self._mode == 2:
            assert self._curtag is not None
            self._curtag = etree.SubElement(self._curtag, key)
        elif self._mode == 3:
            self._s_nestlev = self._s_nestlev + 1

    def _OnData(self, key, value):
        """Handle a scalar value event (writes single-value records immediately)."""
        if self._mode == 0:
            if not self._h_written and key in self._hkeys:
                self._header.append({'_tag_': key, 'val': value})
            elif key in self._fkeys:
                self._flush_header()
                self._header.append({'_tag_': key, 'val': value})
            elif key in self._rkeys or len(self._rkeys) == 0:
                self._flush_header()
                self._recno = self._recno + 1
                if self._recno <= self._rfrom:
                    return
                rec = etree.Element(key)
                rec.text = str(value)
                self._emit_record(rec)
        elif self._mode == 1:
            self._curh[key] = value
        elif self._mode == 2:
            assert self._curtag is not None
            tag = etree.SubElement(self._curtag, key)
            tag.text = str(value)
        elif self._mode == 3:
            pass

    def _OnData_Iter(self, key, value):
        """Generator variant of :meth:`_OnData` used by :meth:`Iterate`."""
        if self._mode == 0:
            if not self._h_written and key in self._hkeys:
                self._header.append({'_tag_': key, 'val': value})
            elif key in self._fkeys:
                self._flush_header()
                self._header.append({'_tag_': key, 'val': value})
            elif key in self._rkeys or len(self._rkeys) == 0:
                self._flush_header()
                self._recno = self._recno + 1
                if self._recno <= self._rfrom:
                    return
                rec = etree.Element(key)
                rec.text = str(value)
                for obj in self._emit_record_iter(rec):
                    yield obj
        elif self._mode == 1:
            self._curh[key] = value
        elif self._mode == 2:
            assert self._curtag is not None
            tag = etree.SubElement(self._curtag, key)
            tag.text = str(value)
        elif self._mode == 3:
            pass

    def _OnObjEnd(self, key):
        """Handle the end of a named JSON object; closes headers/footers or emits a finished record."""
        if self._mode == 0:
            pass
        elif self._mode == 1:
            if len(self._dech) > 0:
                # Close nested header/footer object: attach it to its parent dict.
                h = self._curh
                self._curh = self._dech.pop()
                self._curh[key] = h
            else:
                self._header.append(self._curh)
                self._curh = None
                self._mode = 0
        elif self._mode == 2:
            assert self._curtag is not None
            if self._curtag == self._rectag:
                # Whole record element is complete: emit it.
                self._recno = self._recno + 1
                self._emit_record(self._curtag)
                self._mode = 0
            else:
                self._curtag = self._curtag.getparent()
        elif self._mode == 3:
            if self._s_nestlev > 0:
                self._s_nestlev = self._s_nestlev - 1
            else:
                self._recno = self._recno + 1
                self._mode = 0

    def _OnObjEnd_Iter(self, key):
        """Generator variant of :meth:`_OnObjEnd` used by :meth:`Iterate`."""
        if self._mode == 0:
            pass
        elif self._mode == 1:
            if len(self._dech) > 0:
                # Close nested header/footer object: attach it to its parent dict.
                h = self._curh
                self._curh = self._dech.pop()
                self._curh[key] = h
            else:
                self._header.append(self._curh)
                self._curh = None
                self._mode = 0
        elif self._mode == 2:
            assert self._curtag is not None
            if self._curtag == self._rectag:
                # Whole record element is complete: emit it.
                self._recno = self._recno + 1
                for obj in self._emit_record_iter(self._curtag):
                    yield obj
                self._mode = 0
            else:
                self._curtag = self._curtag.getparent()
        elif self._mode == 3:
            if self._s_nestlev > 0:
                self._s_nestlev = self._s_nestlev - 1
            else:
                self._recno = self._recno + 1
                self._mode = 0