# -*- coding: utf-8 -*-
# Requires Python 3 (uses the newline argument of open and str.maketrans)
"""This module implements Datconv Reader which reads data from CSV file."""
# Standard Python Libs
import logging
import csv
# Libs installed using pip
from lxml import etree
# Datconv generic modules
from datconv.filters import WRITE, REPEAT, BREAK
####################################################################
Log = None
"""Log varaible is automatically set by main datconv script using logging.getLogger method.
Use it for logging messages in need.
"""
class DCReader:
    """Datconv Reader that reads data from a CSV file."""
    def __init__(self, columns = 'item', strip = False, csv_opt = None):
        """Parameters are usually passed from a YAML file as subkeys of the ``Reader:CArg`` key.

        :param columns: this parameter may be one of 3 possible types:\n
               if it is a positive number, it specifies the line number in the input file that stores the column names;\n
               if it is a list, it directly specifies the column names of the input file;\n
               if it is a string, it is used as a column name prefix, i.e. columns will be named <prefix>1, <prefix>2, ...
        :param strip: if True, white space is stripped from values.
        :param csv_opt: dictionary with csv reader options; see the `documentation <https://docs.python.org/3/library/csv.html>`_ of the csv standard Python library. If None, the Reader tries to recognize the format using the ``csv.Sniffer`` class.

        For a more detailed description see the :ref:`conf_template.yaml <readers_conf_template>` file in this module's folder.
        """
        assert Log is not None
        self._wri = self._flt = None
        if isinstance(columns, int):
            self._colrow = columns
        else:
            self._colrow = 0
        self._column = {}
        if isinstance(columns, list):
            colnr = 1
            for col in columns:
                self._column[colnr] = col
                colnr = colnr + 1
        if isinstance(columns, str):
            self._colpref = columns
        else:
            self._colpref = 'item'
        self._strip = strip
        self._csv = csv_opt
    def setWriter(self, writer):
        self._wri = writer
    def setFilter(self, flt):
        self._flt = flt
    def Process(self, inpath, outpath = None, rfrom = 1, rto = 0):
        """Parameters are usually passed from a YAML file as subkeys of the ``Reader:PArg`` key.

        :param inpath: path to the input file.
        :param outpath: path to the output file passed to the Writer (fall-back used if an output connector is not defined).
        :param rfrom: number of the first record to be processed.
        :param rto: number of the last record to be processed; 0 means process up to the end of the input.

        For a more detailed description see :ref:`readers_conf_template`.
        """
        # OBLIGATORY
        header = []
        if self._flt is not None:
            if hasattr(self._flt, 'setHeader'):
                self._flt.setHeader(header)
        # OBLIGATORY
        self._wri.writeHeader(header)
        with open(inpath, newline='') as csvfile:
            if self._csv:
                _rea = csv.reader(csvfile, **self._csv)
            else:
                # No explicit options given: let csv.Sniffer guess the dialect from a sample
                dialect = csv.Sniffer().sniff(csvfile.read(1024))
                csvfile.seek(0)
                _rea = csv.reader(csvfile, dialect)
            rowno = 0
            recno = 0
            for line in _rea:
                rowno = rowno + 1
                if rowno < self._colrow:
                    continue
                colno = 1
                if rowno == self._colrow:
                    # This row holds the column names; normalize them into valid XML tag names
                    for item in line:
                        self._column[colno] = _NormalizeTag(item)
                        colno = colno + 1
                    continue
                recno = recno + 1
                if recno < rfrom:
                    continue
                if rto > 0 and recno > rto:
                    break
                rec = etree.Element('rec')
                for item in line:
                    if colno in self._column:
                        key = self._column[colno]
                    else:
                        # Column without a known name: generate and remember one from the prefix
                        key = self._colpref + str(colno)
                        self._column[colno] = key
                    if self._strip:
                        etree.SubElement(rec, key).text = item.strip()
                    else:
                        etree.SubElement(rec, key).text = item
                    colno = colno + 1
                if self._flt is not None:
                    filter_break = False
                    while True:
                        res = self._flt.filterRecord(rec)
                        if res & WRITE:
                            self._wri.writeRecord(rec)
                        if res & REPEAT:
                            continue
                        if res & BREAK:
                            filter_break = True
                        break
                    if filter_break:
                        Log.info('Filter caused Process to stop on record %d' % recno)
                        break
                else:
                    self._wri.writeRecord(rec)
        # OBLIGATORY
        footer = []
        if self._flt is not None:
            if hasattr(self._flt, 'setFooter'):
                self._flt.setFooter(footer)
        self._wri.writeFooter(footer)
    def Iterate(self, inpath, outpath = None, rfrom = 1, rto = 0):
        """Generator version of :meth:`Process`: yields the value returned by the Writer's ``writeRecord`` call for every processed record. Parameters have the same meaning as in :meth:`Process`."""
        # OBLIGATORY
        header = []
        if self._flt is not None:
            if hasattr(self._flt, 'setHeader'):
                self._flt.setHeader(header)
        # OBLIGATORY
        self._wri.writeHeader(header)
        with open(inpath, newline='') as csvfile:
            if self._csv:
                _rea = csv.reader(csvfile, **self._csv)
            else:
                # No explicit options given: let csv.Sniffer guess the dialect from a sample
                dialect = csv.Sniffer().sniff(csvfile.read(1024))
                csvfile.seek(0)
                _rea = csv.reader(csvfile, dialect)
            rowno = 0
            recno = 0
            for line in _rea:
                rowno = rowno + 1
                if rowno < self._colrow:
                    continue
                colno = 1
                if rowno == self._colrow:
                    # This row holds the column names; normalize them into valid XML tag names
                    for item in line:
                        self._column[colno] = _NormalizeTag(item)
                        colno = colno + 1
                    continue
                recno = recno + 1
                if recno < rfrom:
                    continue
                if rto > 0 and recno > rto:
                    break
                rec = etree.Element('rec')
                for item in line:
                    if colno in self._column:
                        key = self._column[colno]
                    else:
                        # Column without a known name: generate and remember one from the prefix
                        key = self._colpref + str(colno)
                        self._column[colno] = key
                    if self._strip:
                        etree.SubElement(rec, key).text = item.strip()
                    else:
                        etree.SubElement(rec, key).text = item
                    colno = colno + 1
                if self._flt is not None:
                    filter_break = False
                    while True:
                        res = self._flt.filterRecord(rec)
                        if res & WRITE:
                            yield self._wri.writeRecord(rec)
                        if res & REPEAT:
                            continue
                        if res & BREAK:
                            filter_break = True
                        break
                    if filter_break:
                        Log.info('Filter caused Iterate to stop on record %d' % recno)
                        break
                else:
                    yield self._wri.writeRecord(rec)
        # OBLIGATORY
        footer = []
        if self._flt is not None:
            if hasattr(self._flt, 'setFooter'):
                self._flt.setFooter(footer)
        self._wri.writeFooter(footer)
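# A filter passed via setFilter must provide filterRecord(rec) returning a
# bitmask of the WRITE / REPEAT / BREAK flags tested in the loops above.
# A minimal sketch (hypothetical, for illustration only):
#
#     class KeepNonEmpty:
#         def filterRecord(self, rec):
#             # Write the record only when its first field carries some text
#             if len(rec) and rec[0].text:
#                 return WRITE
#             return 0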
def _NormalizeTag(tagname):
    """Strip surrounding white space and remove characters that are not allowed in XML element names."""
    return tagname.strip().translate(str.maketrans('', '', '<> /\\?:;[]{}~`!@#$%^&*()+=|"\''))
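# A minimal, self-contained usage sketch. The PrintWriter stub and the sample
# data below are hypothetical, invented for illustration; real runs use a Writer
# from datconv.writers and configuration read from a YAML file.
if __name__ == '__main__':
    import logging
    import os
    import sys
    import tempfile

    logging.basicConfig(level = logging.INFO)
    Log = logging.getLogger('datconv')

    class PrintWriter:
        """Stub Writer implementing the three calls DCReader makes."""
        def writeHeader(self, header):
            pass
        def writeRecord(self, rec):
            sys.stdout.write(etree.tostring(rec, pretty_print = True).decode())
        def writeFooter(self, footer):
            pass

    # Create a throw-away CSV file with a column-name row and two records
    with tempfile.NamedTemporaryFile('w', suffix = '.csv', delete = False) as f:
        f.write('id,name\n1,foo\n2,bar\n')
        path = f.name
    try:
        rea = DCReader(columns = 1, csv_opt = {'delimiter': ','})
        rea.setWriter(PrintWriter())
        rea.Process(path)
    finally:
        os.remove(path)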