Source code for datconv.readers.dcijson

# -*- coding: utf-8 -*-
"""This module implements Datconv Reader which reads data from JSON file."""

# Standard Python Libs
import logging
from collections import deque

# Libs installed using pip
from lxml import etree  # http://lxml.de/tutorial.html

# Datconv generic modules
from datconv.filters import WRITE, REPEAT, BREAK

####################################################################
Log = None
"""Log variable is automatically set by main datconv script using logging.getLogger method.
Use it for logging messages in need.
"""

[docs]class FilterBreak(Exception): """Exception class to support Reader.process break isued from Filter class.""" pass
[docs]class ToLimitBreak(Exception): """Exception class to support Reader.process break caused by reaching configured record limit.""" pass
####################################################################
[docs]class DCReader: """This Datconv JSON Reader class uses ijson sax-type parser to read and interpret JSON file. It assumes that input file contain array of json objects and every such object is passed as record to Writer. This Reader passes always empty header and footer to Writer.\n Example:\n Input: .. code-block:: json [ { "PadnDrawNbrs": { "cdc": 5019, "product": "addn" } }, { "SiteData": { "siteId": 38 }, "rec0Control": { "curDraw": 5 } } ] Output (:ref:`writers_dcxml`): .. code-block:: xml <Datconv> <rec> <PadnDrawNbrs> <cdc>5019</cdc> <product>addn</product> </PadnDrawNbrs> </rec> <rec> <SiteData> <siteId>38</siteId> </SiteData> <rec0Control> <curDraw>5</curDraw> </rec0Control> </rec> </Datconv> """ def __init__(self, rec_tag = 'rec', log_prog_step = 0, backend = None): """Parameters are usually passed from YAML file as subkeys of ``Reader:CArg`` key. :param rec_tag: name or tag to be placed as record marker. :param log_prog_step: log info message after this number of records or does not log progress messages if this key is 0 or logging level is set to value higher than INFO. :param backend: backend used by ijson package to parse json file, possible values:\n ``yajl2_cffi`` - requires ``yajl2`` C library and ``cffi`` Python package to be installed in the system;\n ``yajl2`` - requires ``yajl2`` C library to be installed in the system;\n None - uses default, Python only backend. For more detailed descriptions see :ref:`readers_conf_template`. """ assert Log is not None self._wri = self._flt = None self._rec_tag = rec_tag self._lp_step = log_prog_step self._backend = backend # OBLIGATORY def setWriter(self, writer): self._wri = writer # OBLIGATORY def setFilter(self, flt): self._flt = flt
[docs] def Process(self, inpath, outpath = None, rfrom = 1, rto = 0): """Parameters are usually passed from YAML file as subkeys of ``Reader:PArg`` key. :param inpath: Path to input file. :param outpath: Path to output file passed to Writer (fall-back if output connector is not defined). :param rfrom-rto: specifies scope of records to be processed. For more detailed descriptions see :ref:`readers_conf_template`. """ if self._backend == 'yajl2_cffi': import ijson.backends.yajl2_cffi as ijson elif self._backend == 'yajl2': import ijson.backends.yajl2 as ijson else: import ijson self._rfrom = rfrom self._rto = rto self._recno = 1 self._lp_rec = 0 if self._lp_step > 0 and Log.isEnabledFor(logging.INFO): self._lp_rec = self._lp_step # Common or this function local variables # 0 - outside element (e.g. between records) # 2 - inside record while reading # 3 - inside record while skipping self._mode = 0 _curkey = self._rec_tag _deck = deque() # Used for read records self._rectag = None self._curtag = None # Used for skipped records self._s_nestlev = 0 try: # OBLIGATORY header = [] if self._flt is not None: if hasattr(self._flt, 'setHeader'): self._flt.setHeader(header) self._wri.writeHeader(header) with open(inpath, 'rb') as fd: #binary mode required by C-based backends _not_first_event = False _parser = ijson.basic_parse(fd) for event, value in _parser: if event == 'map_key': _curkey = value elif event == 'start_map': _deck.append(_curkey) if _curkey is not None: self._OnObjStart(_curkey) elif event == 'end_map': _curkey = _deck.pop() if _curkey is not None: self._OnObjEnd(_curkey) elif event == 'start_array': if _not_first_event and _curkey is None: _deck.append('arr') self._OnObjStart('arr') elif event == 'end_array': if len(_deck) > 0: key = _deck.pop() if key == 'arr': self._OnObjEnd('arr') else: _deck.append(key) else: key = _curkey if _curkey is not None else event self._OnData(key, value) _not_first_event = True except FilterBreak: pass except ToLimitBreak: pass finally: # OBLIGATORY footer = [] if self._flt is not None: if hasattr(self._flt, 'setFooter'): self._flt.setFooter(footer) self._wri.writeFooter(footer)
## Note: ## I did not found (unfortunately) any other way to implement iteration then code duplication ## Expressions like ## if <iteration>: ## yield obj ## does not work because Python compiler treats every method that contain ## yield key-word in special way, and if condition (even if disables actual instruction ## execusion) does not change this. ## Also caling yield indirectly (from called sub-methon) does not work. ## yield must be present diretly in callable passed to iteration loop. def Iterate(self, inpath, outpath = None, rfrom = 1, rto = 0): if self._backend == 'yajl2_cffi': import ijson.backends.yajl2_cffi as ijson elif self._backend == 'yajl2': import ijson.backends.yajl2 as ijson else: import ijson self._rfrom = rfrom self._rto = rto self._recno = 1 self._lp_rec = 0 if self._lp_step > 0 and Log.isEnabledFor(logging.INFO): self._lp_rec = self._lp_step # Common or this function local variables # 0 - outside element (e.g. between records) # 2 - inside record while reading # 3 - inside record while skipping self._mode = 0 _curkey = self._rec_tag _deck = deque() # Used for read records self._rectag = None self._curtag = None # Used for skipped records self._s_nestlev = 0 try: # OBLIGATORY header = [] if self._flt is not None: if hasattr(self._flt, 'setHeader'): self._flt.setHeader(header) self._wri.writeHeader(header) with open(inpath, 'rb') as fd: #binary mode required by C-based backends _not_first_event = False _parser = ijson.basic_parse(fd) for event, value in _parser: if event == 'map_key': _curkey = value elif event == 'start_map': _deck.append(_curkey) if _curkey is not None: self._OnObjStart(_curkey) elif event == 'end_map': _curkey = _deck.pop() if _curkey is not None: for obj in self._OnObjEnd_Iter(_curkey): yield obj elif event == 'start_array': if _not_first_event and _curkey is None: _deck.append('arr') self._OnObjStart('arr') elif event == 'end_array': if len(_deck) > 0: key = _deck.pop() if key == 'arr': for obj in self._OnObjEnd_Iter('arr'): yield obj else: _deck.append(key) else: key = _curkey if _curkey is not None else event for obj in self._OnData_Iter(key, value): yield obj _not_first_event = True except FilterBreak: pass except ToLimitBreak: pass finally: # OBLIGATORY footer = [] if self._flt is not None: if hasattr(self._flt, 'setFooter'): self._flt.setFooter(footer) self._wri.writeFooter(footer) def _OnObjStart(self, key): if self._mode == 0: if self._recno < self._rfrom: self._mode = 3 return self._curtag = self._rectag = etree.Element(key) self._mode = 2 elif self._mode == 2: assert self._curtag is not None self._curtag = etree.SubElement(self._curtag, key) elif self._mode == 3: self._s_nestlev = self._s_nestlev + 1 def _OnData(self, key, value): if self._mode == 0: self._recno = self._recno + 1 if self._recno <= self._rfrom: return rec = etree.Element(key) rec.text = str(value) if self._flt is not None: while True: # OBLIGATORY res = self._flt.filterRecord(rec) if res & WRITE: self._wri.writeRecord(rec) if res & REPEAT: continue if res & BREAK: Log.info('Filter caused Process to stop on record %d' % (self._recno - 1)) raise FilterBreak break else: # OBLIGATORY self._wri.writeRecord(rec) if self._rto > 0 and self._recno > self._rto: raise ToLimitBreak if self._recno - 1 == self._lp_rec: Log.info('Processed %d records' % (self._recno - 1)) self._lp_rec = self._lp_rec + self._lp_step elif self._mode == 2: assert self._curtag is not None tag = etree.SubElement(self._curtag, key) tag.text = str(value) elif self._mode == 3: pass def _OnData_Iter(self, key, value): if self._mode == 0: self._recno = self._recno + 1 if self._recno <= self._rfrom: return rec = etree.Element(key) rec.text = str(value) if self._flt is not None: while True: # OBLIGATORY res = self._flt.filterRecord(rec) if res & WRITE: yield self._wri.writeRecord(rec) if res & REPEAT: continue if res & BREAK: Log.info('Filter caused Process to stop on record %d' % (self._recno - 1)) raise FilterBreak break else: # OBLIGATORY yield self._wri.writeRecord(rec) if self._rto > 0 and self._recno > self._rto: raise ToLimitBreak if self._recno - 1 == self._lp_rec: Log.info('Processed %d records' % (self._recno - 1)) self._lp_rec = self._lp_rec + self._lp_step elif self._mode == 2: assert self._curtag is not None tag = etree.SubElement(self._curtag, key) tag.text = str(value) elif self._mode == 3: pass def _OnObjEnd(self, key): if self._mode == 0: pass elif self._mode == 2: assert self._curtag is not None if self._curtag == self._rectag: self._recno = self._recno + 1 rec = self._curtag if self._flt is not None: while True: # OBLIGATORY res = self._flt.filterRecord(rec) if res & WRITE: self._wri.writeRecord(rec) if res & REPEAT: continue if res & BREAK: Log.info('Filter caused Process to stop on record %d' % (self._recno - 1)) raise FilterBreak break else: # OBLIGATORY self._wri.writeRecord(rec) if self._rto > 0 and self._recno > self._rto: raise ToLimitBreak if self._recno - 1 == self._lp_rec: Log.info('Processed %d records' % (self._recno - 1)) self._lp_rec = self._lp_rec + self._lp_step self._mode = 0 else: self._curtag = self._curtag.getparent() elif self._mode == 3: if self._s_nestlev > 0: self._s_nestlev = self._s_nestlev - 1 else: self._recno = self._recno + 1 self._mode = 0 def _OnObjEnd_Iter(self, key): if self._mode == 0: pass elif self._mode == 2: assert self._curtag is not None if self._curtag == self._rectag: self._recno = self._recno + 1 rec = self._curtag if self._flt is not None: while True: # FIXME: This will not work correctly in iterator mode # OBLIGATORY res = self._flt.filterRecord(rec) if res & WRITE: yield self._wri.writeRecord(rec) if res & REPEAT: continue if res & BREAK: Log.info('Filter caused Process to stop on record %d' % (self._recno - 1)) raise FilterBreak break else: # OBLIGATORY yield self._wri.writeRecord(rec) if self._rto > 0 and self._recno > self._rto: raise ToLimitBreak if self._recno - 1 == self._lp_rec: Log.info('Processed %d records' % (self._recno - 1)) self._lp_rec = self._lp_rec + self._lp_step self._mode = 0 else: self._curtag = self._curtag.getparent() elif self._mode == 3: if self._s_nestlev > 0: self._s_nestlev = self._s_nestlev - 1 else: self._recno = self._recno + 1 self._mode = 0