Source code for datconv.readers.dcijson_events

# -*- coding: utf-8 -*-
"""This module implements Datconv Reader which reads data from JSON file."""

# Standard Python Libs
import logging

# Libs installed using pip
from lxml import etree  # http://lxml.de/tutorial.html

# Datconv generic modules
from datconv.filters import WRITE, REPEAT, BREAK

####################################################################
Log = None
"""Log variable is automatically set by main datconv script using logging.getLogger method.
Use it for logging messages in need.
"""

[docs]class FilterBreak(Exception): """Exception class to support Reader.process break isued from Filter class.""" pass
[docs]class ToLimitBreak(Exception): """Exception class to support Reader.process break caused by reaching configured record limit.""" pass
####################################################################
[docs]class DCReader: """This Datconv Reader class is utility class to help discover structure of JSON data file. It returns events generated by Python ``ijson`` JSON files parser.\n Example records returned by this reader (mode == 3):\n Input: .. code-block:: json { "PadnDrawNbrs": { "cdc": 5019, "product": "addn" } } Output (:ref:`writers_dccsv`): .. code-block:: none prefix , event , value item , start_map , None item , map_key , PadnDrawNbrs item.PadnDrawNbrs , start_map , None item.PadnDrawNbrs , map_key , cdc item.PadnDrawNbrs.cdc , number , 5019 item.PadnDrawNbrs , map_key , product item.PadnDrawNbrs.product , string , addn item.PadnDrawNbrs , end_map , None item , end_map , None Usage instructions of ``ijson`` package: * `<https://pypi.python.org/pypi/ijson/>`_ * `<http://softwaremaniacs.org/blog/2010/09/18/ijson/en/>`_ * `<http://explique.me/Ijson/>`_ """ def __init__(self, mode = 3, rec_tag = 'rec', log_prog_step = 0, backend = None): """Parameters are usually passed from YAML file as subkeys of ``Reader:CArg`` key. :param mode: returns: 1-only unique prefixes; 2-unique (prefix,event) pairs; 3-all events (including data). :param rec_tag: name or tag to be placed as record marker. :param log_prog_step: log info message after this number of records or does not log progress messages if this key is 0 or logging level is set to value higher than INFO. :param backend: backend used by ijson package to parse json file, possible values:\n ``yajl2_cffi`` - requires ``yajl2`` C library and ``cffi`` Python package to be installed in the system;\n ``yajl2`` - requires ``yajl2`` C library to be installed in the system;\n None - uses default, Python only backend. For more detailed descriptions see :ref:`readers_conf_template`. """ assert Log is not None self._wri = self._flt = None self._mode = mode self._rec_tag = rec_tag self._lp_step = log_prog_step self._backend = backend # OBLIGATORY def setWriter(self, writer): self._wri = writer # OBLIGATORY def setFilter(self, flt): self._flt = flt
[docs] def Process(self, inpath, outpath = None, rfrom = 1, rto = 0): """Parameters are usually passed from YAML file as subkeys of ``Reader:PArg`` key. :param inpath: Path to input file. :param outpath: Path to output file passed to Writer (fall-back if output connector is not defined). :param rfrom-rto: specifies scope of records to be processed. For more detailed descriptions see :ref:`readers_conf_template`. """ if self._backend == 'yajl2_cffi': import ijson.backends.yajl2_cffi as ijson elif self._backend == 'yajl2': import ijson.backends.yajl2 as ijson else: import ijson _recno = 1 _lp_rec = 0 if self._mode in [1, 2]: _unique = set() elif self._mode == 3: _unique = None else: Log.error('Invalid value of key mode (=%d); allowed values [1,2,3]' % self._mode) return if self._lp_step > 0 and Log.isEnabledFor(logging.INFO): _lp_rec = self._lp_step try: header = [{'_tag_':'ijson_events', '_bra_':True}] if self._flt is not None: if hasattr(self._flt, 'setHeader'): self._flt.setHeader(header) self._wri.writeHeader(header) with open(inpath, 'r') as fd: parser = ijson.parse(fd) for prefix, event, value in parser: if rto > 0 and _recno > rto: raise ToLimitBreak if prefix in ['item', ''] and not event in ['start_array', 'start_map', 'map_key']: _recno = _recno + 1 if _recno == _lp_rec: Log.info('Processed %d records' % _recno) _lp_rec = _lp_rec + self._lp_step if _recno < rfrom: return if self._mode == 1: if prefix in _unique: continue _unique.add(prefix) rec = etree.Element(self._rec_tag) p_xml = etree.SubElement(rec, 'prefix') p_xml.text = str(prefix) elif self._mode == 2: if (prefix, event) in _unique: continue _unique.add((prefix, event)) rec = etree.Element(self._rec_tag) p_xml = etree.SubElement(rec, 'prefix') p_xml.text = str(prefix) e_xml = etree.SubElement(rec, 'event') e_xml.text = str(event) elif self._mode == 3: rec = etree.Element(self._rec_tag) p_xml = etree.SubElement(rec, 'prefix') p_xml.text = str(prefix) e_xml = etree.SubElement(rec, 'event') e_xml.text = str(event) v_xml = etree.SubElement(rec, 'value') v_xml.text = str(value) if self._flt is not None: while True: # OBLIGATORY res = self._flt.filterRecord(rec) if res & WRITE: self._wri.writeRecord(rec) if res & REPEAT: continue if res & BREAK: Log.info('Filter caused Process to stop on record %d' % _recno) raise FilterBreak break else: # OBLIGATORY self._wri.writeRecord(rec) except FilterBreak: pass except ToLimitBreak: pass finally: # OBLIGATORY footer = [] if self._flt is not None: if hasattr(self._flt, 'setFooter'): self._flt.setFooter(footer) self._wri.writeFooter(footer)
def Iterate(self, inpath, outpath = None, rfrom = 1, rto = 0): if self._backend == 'yajl2_cffi': import ijson.backends.yajl2_cffi as ijson elif self._backend == 'yajl2': import ijson.backends.yajl2 as ijson else: import ijson _recno = 1 _lp_rec = 0 if self._mode in [1, 2]: _unique = set() elif self._mode == 3: _unique = None else: Log.error('Invalid value of key mode (=%d); allowed values [1,2,3]' % self._mode) return if self._lp_step > 0 and Log.isEnabledFor(logging.INFO): _lp_rec = self._lp_step try: header = [{'_tag_':'ijson_events', '_bra_':True}] if self._flt is not None: if hasattr(self._flt, 'setHeader'): self._flt.setHeader(header) self._wri.writeHeader(header) with open(inpath, 'r') as fd: parser = ijson.parse(fd) for prefix, event, value in parser: if rto > 0 and _recno > rto: raise ToLimitBreak if prefix in ['item', ''] and not event in ['start_array', 'start_map', 'map_key']: _recno = _recno + 1 if _recno == _lp_rec: Log.info('Processed %d records' % _recno) _lp_rec = _lp_rec + self._lp_step if _recno < rfrom: return if self._mode == 1: if prefix in _unique: continue _unique.add(prefix) rec = etree.Element(self._rec_tag) p_xml = etree.SubElement(rec, 'prefix') p_xml.text = str(prefix) elif self._mode == 2: if (prefix, event) in _unique: continue _unique.add((prefix, event)) rec = etree.Element(self._rec_tag) p_xml = etree.SubElement(rec, 'prefix') p_xml.text = str(prefix) e_xml = etree.SubElement(rec, 'event') e_xml.text = str(event) elif self._mode == 3: rec = etree.Element(self._rec_tag) p_xml = etree.SubElement(rec, 'prefix') p_xml.text = str(prefix) e_xml = etree.SubElement(rec, 'event') e_xml.text = str(event) v_xml = etree.SubElement(rec, 'value') v_xml.text = str(value) if self._flt is not None: while True: # OBLIGATORY res = self._flt.filterRecord(rec) if res & WRITE: yield self._wri.writeRecord(rec) if res & REPEAT: continue if res & BREAK: Log.info('Filter caused Process to stop on record %d' % _recno) raise FilterBreak break else: # OBLIGATORY yield self._wri.writeRecord(rec) except FilterBreak: pass except ToLimitBreak: pass finally: # OBLIGATORY footer = [] if self._flt is not None: if hasattr(self._flt, 'setFooter'): self._flt.setFooter(footer) self._wri.writeFooter(footer)