# Source code for kenozooid.uddf

#
# Kenozooid - dive planning and analysis toolbox.
#
# Copyright (C) 2009-2019 by Artur Wroblewski <wrobell@riseup.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
The `kenozooid.uddf` module provides support for parsing, searching and
manipulation of data stored in UDDF files.

The functions implemented in this module can be divided into the following
categories

- XML nodes functions
- generic XML data searching and manipulation functions
- functions for searching and manipulation of diving specific data

Searching functions use XPath expressions (queries) to find data. Each tag
name in a query should be prefixed with the 'uddf:' string to indicate the
UDDF namespace, i.e. 'uddf:diver', 'uddf:waypoint' - an appropriate namespace
mapping for this prefix is defined for each XPath call.

The result of parsing or searching data is usually an iterator of XML nodes
or data records (named tuples in Python terms).

Module `lxml` is used for XML parsing and querying with XPath. The full
capabilities of the underlying `libxml2` library are used by design. The
ElementTree XML data model is used for XML nodes.
"""

from collections import namedtuple, OrderedDict, Counter
from lxml import etree as et
from functools import partial
from datetime import datetime
from dateutil.parser import parse as dparse
from io import FileIO
from operator import itemgetter
from uuid import uuid4 as uuid
from copy import deepcopy
from dirty.xml import xml
from dirty import RawString
import base64
import bz2
import itertools
import hashlib
import logging
import os
import os.path
import pkg_resources

import kenozooid
import kenozooid.util as kt

# Module level logger.
log = logging.getLogger('kenozooid.uddf')

#
# Default UDDF namespace mapping.
#
_NSMAP = {'uddf': 'http://www.streit.cc/uddf/3.2/'}

# Node id formatter
FORMAT_ID = 'id-{}'

# Format a number with one decimal place.
FMT_F = '{0:.1f}'.format

# Format a number with two decimal places.
FMT_F2 = '{0:.2f}'.format


def FMT_I(v):
    """
    Format a number as an integer string after rounding it.
    """
    return str(int(round(v)))


def FMT_DT(dt):
    """
    Format date and time as ISO 8601 like string (with UTC offset if
    available).
    """
    return '{:%Y-%m-%dT%H:%M:%S%z}'.format(dt)

#
# Parsing and searching.
#

# Constructor of XPath expression objects, which binds the `uddf` prefix to
# the UDDF namespace defined in `_NSMAP`. The documentation string is
# attached manually as `partial` objects do not carry one.
XPath = partial(et.XPath, namespaces=_NSMAP)
XPath.__doc__ = """
    XPath query constructor for UDDF data.

    Use `uddf` prefix to create a query, i.e.::

        XPath('uddf:informationbeforedive/uddf:datetime/text()')

    .. seealso: lxml.etree.XPath
"""

# XPath queries for default dive data
#
# The order of the queries has to match the order of default fields and
# parsers defined in `dive_data` function. The last query is `None`, which
# makes `_field` function pass the dive node itself to the parser of the
# last field.
XP_DEFAULT_DIVE_DATA = (
    XPath('uddf:informationbeforedive/uddf:divenumber/text()'),
    XPath('uddf:informationbeforedive/uddf:datetime/text()'),
    XPath('uddf:informationafterdive/uddf:greatestdepth/text()'),
    XPath('uddf:informationafterdive/uddf:diveduration/text()'),
    XPath('uddf:informationafterdive/uddf:lowesttemperature/text()'),
    XPath('uddf:informationafterdive/uddf:averagedepth/text()'),
    XPath('uddf:samples/uddf:waypoint/uddf:divemode[1]/@type'),
    None,
)

# XPath queries for default dive profile sample data
#
# The queries are relative to a waypoint node (see `XP_WAYPOINT`) and their
# order has to match the order of default fields and parsers defined in
# `dive_profile` function.
XP_DEFAULT_PROFILE_DATA =  (
    XPath('uddf:depth/text()'),
    XPath('uddf:divetime/text()'),
    XPath('uddf:temperature/text()'),
    XPath('uddf:setpo2/text()'),
    XPath('uddf:setpo2/@setby'),
    XPath('uddf:decostop/@duration'),
    XPath('uddf:decostop/@decodepth'),
    XPath('uddf:alarm/text()'),
    XPath('uddf:switchmix/@ref'),
)

# XPath queries for default gas mix data
#
# The queries are relative to a gas mix node (see `XP_MIX`) and their order
# has to match the order of default fields and parsers defined in
# `gas_data` function.
XP_DEFAULT_GAS_DATA =  (
    XPath('@id'),
    XPath('uddf:name/text()'),
    XPath('uddf:o2/text()'),
    XPath('uddf:he/text()'),
)

# XPath query to locate dive profile sample; it is relative to a dive node
XP_WAYPOINT = XPath('./uddf:samples/uddf:waypoint')
# XPath query to locate gas mix; it is absolute, therefore it can be
# executed starting from any node of a document
XP_MIX = XPath('/uddf:uddf/uddf:gasdefinitions/uddf:mix')

# XPath queries for default dive computer dump data
#
# The order of the queries has to match the order of default fields and
# parsers defined in `dump_data` function.
XP_DEFAULT_DUMP_DATA = (
    XPath('uddf:link/@ref'),
    # //uddf:divecomputerdump[position()] gives current()
    XPath('../../uddf:diver/uddf:owner//uddf:divecomputer[' \
            '@id = //uddf:divecomputerdump[position()]/uddf:link/@ref' \
        ']/uddf:model/text()'),
    XPath('uddf:datetime/text()'),
    XPath('uddf:dcdump/text()'),
)

# XPath queries for default buddy data
#
# The queries are relative to a buddy node and their order has to match
# the order of default fields defined in `buddy_data` function.
XP_DEFAULT_BUDDY_DATA = (
    XPath('@id'),
    XPath('uddf:personal/uddf:firstname/text()'),
    XPath('uddf:personal/uddf:middlename/text()'),
    XPath('uddf:personal/uddf:lastname/text()'),
    XPath('uddf:personal/uddf:membership/@organisation'),
    XPath('uddf:personal/uddf:membership/@memberid'),
)

# XPath queries for default dive site data
#
# The queries are relative to a dive site node and their order has to match
# the order of default fields and parsers defined in `site_data` function.
XP_DEFAULT_SITE_DATA = (
    XPath('@id'),
    XPath('uddf:name/text()'),
    XPath('uddf:geography/uddf:location/text()'),
    XPath('uddf:geography/uddf:longitude/text()'),
    XPath('uddf:geography/uddf:latitude/text()'),
)

# XPath query to find a buddy
#
# The `$buddy` variable is matched against buddy id, member id and
# organisation (exact match) or first name and last name (substring match).
XP_FIND_BUDDY = XPath('/uddf:uddf/uddf:diver/uddf:buddy[' \
    '@id = $buddy' \
    ' or uddf:personal/uddf:membership/@memberid = $buddy' \
    ' or uddf:personal/uddf:membership/@organisation = $buddy' \
    ' or contains(uddf:personal/uddf:firstname/text(), $buddy)' \
    ' or contains(uddf:personal/uddf:lastname/text(), $buddy)' \
    ']')

# XPath query to find a dive site
#
# The `$site` variable is matched against dive site id (exact match) or
# dive site name and location (substring match).
XP_FIND_SITE = XPath('/uddf:uddf/uddf:divesite/uddf:site[' \
    '@id = $site' \
    ' or contains(uddf:name/text(), $site)' \
    ' or contains(uddf:geography/uddf:location/text(), $site)' \
    ']')

# XPath query to find dives
#
# The `$nodes` variable restricts dives by their position and the `$dives`
# variable by their dive number. Both are number ranges handled by the
# `in-range` XPath function (see `in_range`), which accepts every dive if
# a range is empty.
XP_FIND_DIVES = XPath('/uddf:uddf/uddf:profiledata' \
    '/uddf:repetitiongroup/uddf:dive[in-range(position(), $nodes)' \
    ' and in-range(uddf:informationbeforedive/uddf:divenumber/text(), $dives)]')

# XPath query to find dive gases
#
# Finds gas mixes referenced by gas switches of dives restricted with the
# `$nodes` number range.
XP_FIND_DIVE_GASES = XPath('/uddf:uddf/uddf:gasdefinitions' \
    '/uddf:mix[@id=/uddf:uddf/uddf:profiledata/uddf:repetitiongroup' \
    '/uddf:dive[in-range(position(), $nodes)]' \
    '/uddf:samples/uddf:waypoint/uddf:switchmix/@ref]')


class RangeError(ValueError):
    """
    Error raised when textual representation of a number range is invalid.

    It is a `ValueError`, so it can be handled as any other value error.

    .. seealso::
        parse_range
    """


def parse(f, ver_check=True):
    """
    Parse XML file and return document object.

    File to parse can be anything supported by ``lxml`` library.

    If file to parse is file name and ends with '.bz2' (or '.bz2.bak'),
    then it is treated as file compressed with bzip2.

    :Parameters:
     f
        File to parse.
     ver_check
        Check version of UDDF file.

    :Raises ValueError:
        When version check is requested and the UDDF file version is
        missing, malformed or other than 3.2.
    """
    if isinstance(f, str) and f.endswith(('.bz2', '.bz2.bak')):
        log.debug('detected compressed file')
        # the document is parsed completely by `et.parse`, so the
        # compressed file can be closed as soon as parsing is done
        with bz2.BZ2File(f) as fz:
            doc = et.parse(fz)
    else:
        doc = et.parse(f)

    if ver_check:
        version = doc.getroot().get('version')
        if version is None:
            raise ValueError('UDDF file version is not specified.')
        parts = version.split('.')
        if len(parts) < 2:
            raise ValueError('Invalid UDDF file version {}.'.format(version))
        v1, v2, *_ = parts
        if (v1, v2) != ('3', '2'):
            raise ValueError(
                'UDDF file version {}.{} is not supported.'
                ' Please upgrade file with "kz upgrade" command.'
                .format(v1, v2)
            )
    return doc
def find(f, query, **params):
    """
    Find XML nodes in UDDF file using XPath query.

    UDDF file can be a file name, file object, URL and basically everything
    what is supported by `lxml` library.

    File to parse can be a file name ending with '.bz2'. It is treated as
    file compressed with bzip2.

    Iterator of items found by the query is returned.

    :Parameters:
     f
        UDDF file to parse.
     query
        XPath expression (string) or XPath object.
     params
        XPath query parameters (XPath variables).

    .. seealso:: :py:func:`XPath`, :py:func:`parse`
    """
    log.debug('parsing and searching with query: {}; parameters {}' \
        .format(query, params))
    doc = parse(f)
    if isinstance(query, str):
        # pass the parameters as XPath variables as well, so string queries
        # behave in the same way as XPath expression objects
        found = doc.xpath(query, namespaces=_NSMAP, **params)
    else:
        found = query(doc, **params)
    return (n for n in found)
def xp(node, query):
    """
    Execute XPath query in the UDDF namespace and iterate over its results.

    The function is a generator of items (strings, nodes) found by the
    query.

    :Parameters:
     node
        Document node or query starting node.
     query
        XPath query.

    .. seealso:: lxml.etree.Element.xpath
    """
    yield from node.xpath(query, namespaces=_NSMAP)
def xp_first(node, query):
    """
    Execute XPath query in the UDDF namespace and get its first result.

    `None` is returned if the query finds nothing.

    :Parameters:
     node
        Document node or query starting node.
     query
        XPath query.

    .. seealso:: lxml.etree.Element.xpath
    """
    return next(xp(node, query), None)
def xp_last(node, query):
    """
    Execute XPath query in the UDDF namespace and get its last result.

    `None` is returned if the query finds nothing.

    :Parameters:
     node
        Document node or query starting node.
     query
        XPath query.

    .. seealso:: lxml.etree.Element.xpath
    """
    found = node.xpath(query, namespaces=_NSMAP)
    if not found:
        return None
    return found[-1]
def find_data(name, node, fields, queries, parsers, nquery=None):
    """
    Find data records starting from specified XML node.

    A record type (namedtuple) is created with specified fields. The data
    of a record is retrieved with XPath expression objects, which is
    converted from string to appropriate type using parsers.

    A parser can be any type or function, i.e. `float`, `int` or
    `dateutil.parser.parse`.

    If XML node is too high to execute XPath expression objects, then the
    basis for field queries can be relocated with `nquery` parameter. If
    `nquery` parameter is not specified, then only one record is returned.
    Otherwise it is generator of records.

    The length of fields, field queries and field parsers should be the
    same.

    :Parameters:
     name
        Name of the record to be created.
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.
     nquery
        XPath expression object to relocate from node to more appropriate
        position in XML document for record data retrieval.

    .. seealso:: :py:func:`dive_data`, :py:func:`dive_profile`
    """
    T = namedtuple(name, ' '.join(fields))._make
    if nquery:
        data = nquery(node)
        return (_record(T, n, queries, parsers) for n in data)
    return _record(T, node, queries, parsers)


def dive_data(node, fields=None, queries=None, parsers=None):
    """
    Specialized function to return record of a dive data.

    By default, the record of dive data contains the following fields

    number
        Dive number.
    datetime
        Dive start date and time.
    depth
        Greatest depth of the dive.
    duration
        Dive duration.
    temp
        Lowest temperature of the dive.
    avg_depth
        Average depth of the dive.
    mode
        Dive mode (type of first `divemode` element of dive samples).
    profile
        Generator of dive profile records, see :py:func:`dive_profile`.

    Dive record data can be reconfigured with optional fields, field
    queries and field parsers parameters.

    :Parameters:
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression object for each field to retrieve its value.
     parsers
        Parsers field values to be created in a record.

    .. seealso:: :py:func:`find_data`
    """
    if fields is None:
        fields = ('number', 'datetime', 'depth', 'duration', 'temp',
            'avg_depth', 'mode', 'profile')
        queries = XP_DEFAULT_DIVE_DATA
        parsers = (int, dparse, float, float, float, float, str,
            dive_profile)

    return find_data('Dive', node, fields, queries, parsers)


def dive_profile(node, fields=None, queries=None, parsers=None):
    """
    Specialized function to return generator of dive profiles records.

    By default, dive profile record contains following fields

    depth
        dive depth in meters
    time
        dive time in seconds
    temp
        temperature in Kelvins
    setpoint
        value of `setpo2` element
    setpointby
        value of `setby` attribute of `setpo2` element
    deco_time
        duration of deco stop
    deco_depth
        depth of deco stop
    alarm
        value of `alarm` element
    gas
        gas mix record referenced by gas switch, see :py:func:`gas_data`

    Dive profile record data can be reconfigured with optional fields,
    field queries and field parsers parameters.

    :Parameters:
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.

    .. seealso:: :py:func:`find_data`
    """
    if fields is None:
        fields = ('depth', 'time', 'temp', 'setpoint', 'setpointby',
            'deco_time', 'deco_depth', 'alarm', 'gas')
        queries = XP_DEFAULT_PROFILE_DATA
        # gas switch reference is resolved into gas mix record
        gases = {gas.id: gas for gas in gas_data(node)}
        parsers = (float, ) * 4 + (str, float, float, str, gases.get)

    return find_data('Sample', node, fields, queries, parsers,
        nquery=XP_WAYPOINT)


def gas_data(node, fields=None, queries=None, parsers=None):
    """
    Get generator of gas mix records defined in UDDF document.

    By default, gas mix record contains following fields

    id
        Gas mix id.
    name
        Gas mix name.
    o2
        Value of `o2` element parsed as integer.
    he
        Value of `he` element parsed as integer.

    :Parameters:
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.

    .. seealso:: :py:func:`find_data`
    """
    if fields is None:
        fields = ('id', 'name', 'o2', 'he')
        queries = XP_DEFAULT_GAS_DATA
        parsers = (str, str, int, int)

    return find_data('Gas', node, fields, queries, parsers, nquery=XP_MIX)


def dump_data(node, fields=None, queries=None, parsers=None):
    """
    Get dive computer dump data.

    The following data is returned

    dc_id
        Dive computer id.
    dc_model
        Dive computer model information.
    datetime
        Date and time when dive computer dump was obtained.
    data
        Dive computer dump data.

    :Parameters:
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.

    .. seealso:: :py:func:`find_data`
    """
    if fields is None:
        fields = ('dc_id', 'dc_model', 'datetime', 'data')
        queries = XP_DEFAULT_DUMP_DATA
        parsers = (str, str, dparse, _dump_decode)
    return find_data('DiveComputerDump', node, fields, queries, parsers)


def buddy_data(node, fields=None, queries=None, parsers=None):
    """
    Get dive buddy data.

    The following data is returned by default

    id
        Buddy id.
    fname
        Buddy first name.
    mname
        Buddy middle name.
    lname
        Buddy last name.
    org
        Organization, which a buddy is member of.
    number
        Member number id in the organisation.

    :Parameters:
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.

    .. seealso:: :py:func:`find_data`
    """
    if fields is None:
        fields = ('id', 'fname', 'mname', 'lname', 'org', 'number')
        queries = XP_DEFAULT_BUDDY_DATA
        # one parser per field
        parsers = (str, ) * len(fields)
    return find_data('Buddy', node, fields, queries, parsers)


def site_data(node, fields=None, queries=None, parsers=None):
    """
    Get dive site data.

    The following data is returned by default

    id
        Dive site id.
    name
        Dive site name.
    location
        Dive site location.
    x
        Dive site longitude.
    y
        Dive site latitude.

    :Parameters:
     node
        XML node.
     fields
        Names of fields to be created in a record.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.

    .. seealso:: :py:func:`find_data`
    """
    if fields is None:
        fields = ('id', 'name', 'location', 'x', 'y')
        queries = XP_DEFAULT_SITE_DATA
        parsers = (str, str, str, float, float)
    return find_data('DiveSite', node, fields, queries, parsers)


def parse_range(s):
    """
    Parse textual representation of number range into Python expression.

    Examples of a ranges

    >>> parse_range('1-3,5')
    '1 <= n and n <= 3 or n == 5'

    >>> parse_range('-3,10')
    'n <= 3 or n == 10'

    Example of infinite range

    >>> parse_range('20-')
    '20 <= n'

    :Parameters:
     s
        Textual representation of number range.

    :Raises RangeError:
        When the textual representation cannot be parsed.
    """
    data = []
    try:
        for r in s.split(','):
            d = r.split('-')
            if len(d) == 1:
                data.append('n == %d' % int(d[0]))
                continue

            p1, p2 = (v.strip() for v in d)  # fails if more than 2 items
            if p1 and p2:
                data.append('{} <= n and n <= {}'.format(int(p1), int(p2)))
            elif p1:
                data.append('{} <= n'.format(int(p1)))
            elif p2:
                data.append('n <= {}'.format(int(p2)))
            else:
                # a lone '-' would result in an empty expression
                raise RangeError('Invalid range %s' % s)
    except ValueError as ex:
        # RangeError is a ValueError, so errors raised above are reported
        # in uniform way as well
        raise RangeError('Invalid range %s' % s) from ex
    return ' or '.join(data)


def in_range(ctx, pos, nodes):
    """
    XPath expression function to restrict position of a node to be within
    numeric range.

    The function, which checks the range, is created once per range and
    cached in `ctx.eval_context` dictionary.

    :Parameters:
     ctx
        XPath context object.
     pos
        Node position.
     nodes
        Number range, i.e. "2-3".

    .. seealso:: :py:func:`parse_range`
    """
    if not nodes:
        return True
    if isinstance(pos, list):
        if len(pos) == 0:
            return False
        if len(pos) != 1:
            raise ValueError('Too many parameters')
        pos = int(pos[0])

    kf = 'in-range({})'.format(nodes)
    if kf not in ctx.eval_context:
        nr = parse_range(nodes)
        # the expression consists of integer comparisons generated by
        # `parse_range` only, therefore it is safe to evaluate it; empty
        # global namespace is used to keep the evaluation self-contained
        ctx.eval_context[kf] = eval('lambda n: {}'.format(nr), {})
    return ctx.eval_context[kf](pos)


# register in-range XPath function
ns = et.FunctionNamespace(None)
ns['in-range'] = in_range


def _field(node, query, parser):
    """
    Find text value of a node starting from specified XML node.

    The first item found by the query is converted with the parser and
    then returned. If nothing is found, then `None` is returned.

    If query is `None`, then the XML node itself is passed to the parser.

    :Parameters:
     node
        XML node.
     query
        XPath expression object to find node with text value.
     parser
        Parser to convert text value to requested type.
    """
    data = [node] if query is None else query(node)
    if data:
        return parser(data[0])
    return None


def _record(rt, node, queries, parsers):
    """
    Create record with data.

    The record data is found with queries (XPath expressions objects)
    starting from XML node.

    The data is converted to their appropriate type using parsers.

    If query is `None`, then record data is node itself.

    :Parameters:
     rt
        Record type (named tuple) of record data.
     node
        XML node.
     queries
        XPath expression objects for each field to retrieve its value.
     parsers
        Parsers of field values to be created in a record.
    """
    return rt(_field(node, f, p) for f, p in zip(queries, parsers))


def _dump_decode(data):
    """
    Decode dive computer data, which is stored in UDDF dive computer dump
    file.

    The data is base64 decoded and then decompressed with bzip2.
    """
    s = base64.b64decode(data.encode())
    return bz2.decompress(s)


#
# Creating UDDF data.
#

DEFAULT_FMT_DIVE_PROFILE = {
    # negative depth is saved as zero
    'depth': lambda d: str.format('{0:.1f}', max(d, 0)),
    'temp': partial(str.format, '{0:.1f}'),
}

# basic data for an UDDF file
# fixme: obsolete
UDDF_BASIC = """\
<uddf xmlns="http://www.streit.cc/uddf/3.2/" version="3.2.0">
<generator>
    <name>kenozooid</name>
    <manufacturer id='kenozooid'>
        <name>Kenozooid Team</name>
        <contact>
            <homepage>http://wrobell.dcmod.org/kenozooid/</homepage>
        </contact>
    </manufacturer>
    <version>{kzver}</version>
    <datetime></datetime>
</generator>
<diver>
    <owner id='owner'>
        <personal>
            <firstname>Anonymous</firstname>
            <lastname>Guest</lastname>
        </personal>
    </owner>
</diver>
</uddf>
""".format(kzver=kenozooid.__version__)


def create(datetime=None):
    """
    fixme: obsolete

    Create basic UDDF structure.

    :Parameters:
     datetime
        Timestamp of file creation, current time by default.
    """
    if datetime is None:
        # the current time has to be determined on each call; the parameter
        # shadows the `datetime` class, so import it under different name
        from datetime import datetime as _datetime
        datetime = _datetime.now()

    root = et.XML(UDDF_BASIC)
    n = root.xpath('//uddf:generator/uddf:datetime', namespaces=_NSMAP)[0]
    n.text = FMT_DT(datetime)
    return root
def set_data(node, queries, formatters=None, **data):
    """
    Set data of nodes or attributes using XPath queries relative to
    specified XML node.

    The data values are converted to string with formatters functions.

    A query can be a single path or a list of paths. In the latter case the
    data value is expected to be a sequence and its items are paired with
    the paths. Data values, which are `None` or are not specified, are
    skipped.

    :Parameters:
     node
        XML node.
     queries
        Path-like expressions of XML structure to be created.
     formatters
        Data formatters.
     data
        Data values to be set within XML document.
    """
    if formatters is None:
        formatters = {}

    nodes = {}  # created nodes
    attrs = set()  # created attributes
    for key, path in queries.items():
        value = data.get(key)
        if value is None:
            continue

        # treat single path as one item list to process both cases in
        # the same way
        if isinstance(path, str):
            path = [path]
            value = [value]

        for p, v in zip(path, value):
            # values are converted with `str` if there is no formatter
            f = formatters.get(key, str)
            v = f(v)

            # split the path into node path and optional attribute name
            attr = None
            tags = p.rsplit('/', 1)
            if tags[-1].startswith('@'):
                attr = tags[-1][1:]  # skip '@'
                # no node path means attribute of the starting node
                p = tags[0] if len(tags) > 1 else None

            n = node
            if p:
                n = nodes.get(p)
                # reuse node created in this call to make t/@a t/@b work,
                # but create new node to not overwrite attribute value
                if n is None or (p, attr) in attrs:
                    # the last node created is the node to be modified
                    *_, n = create_node(p, parent=node)
                    nodes[p] = n
                attrs.add((p, attr))

            assert n is not None
            if attr:
                n.set(attr, v)
            else:
                n.text = v
def create_node(path, parent=None, append=True):
    """
    TODO: get rid of parent, does not make sense

    Create a hierarchy of nodes using XML nodes path specification.

    Path is a string of node names separated by slash character, i.e. a/b/c
    creates::

        <a><b><c/></b></a>

    If parent node is specified and some part of node hierarchy already
    exists then only non-existent nodes are created, i.e. if parent is
    'x' node in

        <x><y/></x>

    then path 'x/z' modifies XML document as follows

        <x><y/><z/></x>

    The last node of the path is always created.

    The function is a generator of nodes, one for each item of the path.

    :Parameters:
     path
        Hierarchy of nodes.
     parent
        Optional parent node.
     append
        Append new nodes to their parents if true, otherwise insert them
        as first children.
    """
    # preserve namespace prefix option... please?!? :/
    T = lambda tag: tag.replace('uddf:', '{' + _NSMAP['uddf'] + '}')
    tags = path.split('/')
    last = len(tags) - 1
    n = parent
    # the last item is recognized by its position, not by its name, as
    # a tag name can be repeated within a path
    for i, t in enumerate(tags):
        k = None if n is None else xp_first(n, t)
        if i == last or k is None:
            k = et.Element(T(t))
            if n is not None:
                if append:
                    n.append(k)
                else:
                    n.insert(0, k)
        n = k
        yield n
def create_dive_data(node=None, queries=None, formatters=None, **data):
    """
    Create dive data.

    The created dive node is returned.

    :Parameters:
     node
        Base node (UDDF root node).
     queries
        Path-like expressions of XML structure to be created.
     formatters
        Dive data formatters.
     data
        Dive data.
    """
    if queries is None:
        bd = data.get('buddies')
        bno = len(bd) if bd else 0
        f = ('site', 'buddies', 'datetime', 'depth', 'duration', 'temp')
        q = ('uddf:informationbeforedive/uddf:link/@ref',
            # one link per buddy
            ['uddf:informationbeforedive/uddf:link/@ref'] * bno,
            'uddf:informationbeforedive/uddf:datetime',
            'uddf:informationafterdive/uddf:greatestdepth',
            'uddf:informationafterdive/uddf:diveduration',
            'uddf:informationafterdive/uddf:lowesttemperature')
        queries = OrderedDict(zip(f, q))
    if formatters is None:
        formatters = {
            'datetime': FMT_DT,
            'depth': partial(str.format, '{0:.1f}'),
            'duration': partial(str.format, '{0:.0f}'),
            'temp': partial(str.format, '{0:.1f}'),
        }
    _, rg, dn = create_node('uddf:profiledata/uddf:repetitiongroup/uddf:dive',
            parent=node)
    _set_id(rg)
    _set_id(dn)
    set_data(dn, queries, formatters, **data)
    return dn


def create_buddy_data(node, queries=None, formatters=None, **data):
    """
    Create buddy data.

    Buddy id is generated if it is not specified. The created buddy node is
    returned.

    :Parameters:
     node
        Base node (UDDF root node).
     queries
        Path-like expressions of XML structure to be created.
     formatters
        Buddy data formatters.
     data
        Buddy data.
    """
    if queries is None:
        f = ('id', 'fname', 'mname', 'lname', 'org', 'number')
        q = ('@id',
            'uddf:personal/uddf:firstname',
            'uddf:personal/uddf:middlename',
            'uddf:personal/uddf:lastname',
            'uddf:personal/uddf:membership/@organisation',
            'uddf:personal/uddf:membership/@memberid')
        queries = OrderedDict(zip(f, q))

    if formatters is None:
        formatters = {}

    if data.get('id') is None:
        data['id'] = uuid().hex

    _, buddy = create_node('uddf:diver/uddf:buddy', parent=node)
    set_data(buddy, queries, formatters, **data)
    return buddy


def create_site_data(node, queries=None, formatters=None, **data):
    """
    Create dive site data.

    Dive site id is generated if it is not specified. The created dive site
    node is returned.

    :Parameters:
     node
        Base node (UDDF root node).
     queries
        Path-like expressions of XML structure to be created.
     formatters
        Dive site data formatters.
     data
        Dive site data.
    """
    if queries is None:
        f = ('id', 'name', 'location', 'x', 'y')
        q = ('@id',
            'uddf:name',
            'uddf:geography/uddf:location',
            'uddf:geography/uddf:longitude',
            'uddf:geography/uddf:latitude')
        queries = OrderedDict(zip(f, q))

    if formatters is None:
        formatters = {}

    if data.get('id') is None:
        data['id'] = uuid().hex

    _, site = create_node('uddf:divesite/uddf:site', parent=node)
    set_data(site, queries, formatters, **data)
    return site


def _dump_encode(data):
    """
    Encode dive computer data, so it can be stored in UDDF file.

    The data is compressed with bzip2 and then base64 encoded. The encoded
    data (bytes) is returned.
    """
    s = bz2.compress(data)
    return base64.b64encode(s)


def create_uddf(datetime=None, equipment=None, gases=None, dives=None,
        dump=None):
    """
    Create UDDF XML data.

    :Parameters:
     datetime
        Timestamp of UDDF creation, current time by default.
     equipment
        Diver's (owner) equipment XML data (see create_dc_data).
     gases
        List of gases used by the dives.
     dives
        Dives XML data (see create_dives).
     dump
        Dive computer dump XML data (see create_dump_data).
    """
    if datetime is None:
        # the current time has to be determined on each call; the parameter
        # shadows the `datetime` class, so import it under different name
        from datetime import datetime as _datetime
        datetime = _datetime.now()

    doc = xml.uddf(
        xml.generator(
            xml.name('kenozooid'),
            xml.manufacturer(
                xml.name('Kenozooid Team'),
                xml.contact(xml.homepage('https://wrobell.dcmod.org/kenozooid/')),
                id='kenozooid'),
            xml.version(kenozooid.__version__),
            xml.datetime(FMT_DT(datetime)),
        ),
        xml.diver(
            xml.owner(
                xml.personal(xml.firstname('Anonymous'), xml.lastname('Guest')),
                xml.equipment(equipment) if equipment else None,
                id='owner')),
        xml.divecomputercontrol(dump) if dump else None,
        xml.gasdefinitions(gases) if gases else None,
        xml.profiledata(xml.repetitiongroup(dives, id=gen_id())) if dives else None,
        xmlns=_NSMAP['uddf'],
        version='3.2.0',
    )
    return doc


def create_dives(dives, equipment=None):
    """
    Create dives UDDF XML data.

    The function is a generator of dive XML data, see
    :py:func:`create_dive`.

    :Parameters:
     dives
        Iterable of dive tuples.
     equipment
        List of used equipment references.
    """
    for dive in dives:
        yield create_dive(dive, equipment=equipment)


def create_dive(dive, equipment=None):
    """
    Create dive UDDF XML data.

    Dive samples are created only if a dive has profile data. Lowest
    temperature and average depth are created only if they are known.

    :Parameters:
     dive
        Dive to render as XML.
     equipment
        List of used equipment references.
    """
    # references to equipment of the dive and to equipment specified
    # with the parameter
    eq = itertools.chain(kt.nit(dive.equipment), kt.nit(equipment))
    log.debug('convert dive {0.datetime}/{0.depth:.1f}/{0.duration} into XML'
        .format(dive))
    return xml.dive(
        xml.informationbeforedive(xml.datetime(FMT_DT(dive.datetime))),
        None if not dive.profile else
            xml.samples(create_dive_samples(dive.profile, dive.mode)),
        xml.informationafterdive(
            xml.greatestdepth(FMT_F(dive.depth)),
            xml.diveduration(FMT_I(dive.duration)),
            None if dive.temp is None else
                xml.lowesttemperature(FMT_F(dive.temp)),
            xml.equipmentused((xml.link(ref=v) for v in eq)),
            None if dive.avg_depth is None else
                xml.averagedepth(FMT_F(dive.avg_depth)),
        ),
        id=gen_id(),
    )


def create_dive_samples(samples, mode=None):
    """
    Create dive samples UDDF XML data.

    The function is a generator of waypoint XML data. Dive mode is saved
    with the first waypoint only.

    :Parameters:
     samples
        Iterable of tuples of dive samples.
     mode
        Dive mode, i.e. opencircuit, closedcircuit.
    """
    for i, s in enumerate(samples):
        yield xml.waypoint(
            None if s.alarm is None else (xml.alarm(a) for a in s.alarm),
            None if s.deco_time is None else xml.decostop(
                duration=FMT_I(s.deco_time),
                decodepth=FMT_I(s.deco_depth),
                kind='mandatory'
            ),
            xml.depth(FMT_F(s.depth)),
            xml.divetime(FMT_I(s.time)),
            None if s.setpoint is None else
                xml.setpo2(FMT_F2(s.setpoint), setby=s.setpointby),
            None if s.gas is None else xml.switchmix(ref=str(s.gas.id)),
            None if s.temp is None else xml.temperature(FMT_F(s.temp)),
            None if mode is None or i > 0 else xml.divemode(type=mode),
        )


def create_gas(gas):
    """
    Create gas UDDF XML data.

    :Parameters:
     gas
        Gas information to render as XML.
    """
    return xml.mix(
        xml.name(gas.name),
        xml.o2(str(gas.o2)),
        xml.he(str(gas.he)),
        id=gas.id,
    )


def create_dc_data(dc_id, model):
    """
    Create dive computer UDDF XML data.

    The function is a generator yielding single dive computer XML data
    item. The model is used as name of the dive computer as well.

    :Parameters:
     dc_id
        Dive computer id.
     model
        Dive computer model.
    """
    yield xml.divecomputer(xml.name(model), xml.model(model), id=dc_id)


def create_dump_data(dc_id, datetime, data):
    """
    Create dive computer dump UDDF XML data.

    The function is a generator yielding single dive computer dump XML
    data item.

    :Parameters:
     dc_id
        Dive computer id.
     datetime
        Date and time when the dump was created.
     data
        Dive computer binary data.
    """
    yield xml.divecomputerdump(
        xml.link(ref=dc_id),
        xml.datetime(FMT_DT(datetime)),
        xml.dcdump(_dump_encode(data).decode()),
    )
def save(doc, fout, validate=True):
    """
    Save UDDF XML data into a file.

    If output file is file name ending with '.bz2', then it is compressed
    with bzip2.

    The UDDF XML data can be ElementTree XML object or iterable of strings.

    If output file exists then backup file with ``.bak`` extension is
    created. The backup is restored if saving or validation fails.

    :Parameters:
     doc
        UDDF XML data.
     fout
        Output file (file name or file object).
     validate
        Validate UDDF file after saving if True.
    """
    log.debug('saving uddf file')

    is_fn = isinstance(fout, str)
    openf = open
    if is_fn and fout.endswith('.bz2'):
        openf = bz2.BZ2File
        log.debug('uddf file will be compressed')

    fbk = '{}.bak'.format(fout)
    # restore only the backup created by this call, never a stale one
    has_backup = False
    if is_fn and os.path.exists(fout):
        os.replace(fout, fbk)
        has_backup = True
        log.debug('backup file created')

    def write(f):
        if et.iselement(doc):
            et.ElementTree(doc).write(f, encoding='utf-8',
                xml_declaration=True, pretty_print=True)
        else:
            f.writelines(l.encode('utf-8') for l in doc)

    try:
        if is_fn:
            # close the file to be sure all data is written before the
            # file is read again for validation
            with openf(fout, 'wb') as f:
                write(f)
        else:
            write(fout)

        if validate:
            log.debug('validating uddf file')
            with pkg_resources.resource_stream('kenozooid',
                    'uddf/uddf_3.2.0.xsd') as fs:
                if hasattr(fs, 'name'):
                    log.debug('uddf xsd found: {}'.format(fs.name))
                schema = et.XMLSchema(et.parse(fs))

            if is_fn:
                with openf(fout, 'rb') as f:
                    saved = et.parse(f)
            else:
                fout.seek(0)
                saved = et.parse(fout)
            schema.assertValid(saved)
            log.debug('uddf file is valid')
    except Exception:
        if has_backup:
            os.replace(fbk, fout)
            log.debug('backup file restored')
        raise
#
# Removing UDDF data.
#

def remove_nodes(node, query, **params):
    """
    Detach nodes found with XPath query from their parents.

    :Parameters:
     node
        Starting XML node for XPath query.
     query
        XPath query to find nodes to remove.
     params
        XPath query parameters.
    """
    log.debug('node removal with query: {}, params: {}'.format(query, params))
    for found in query(node, **params):
        found.getparent().remove(found)


#
# Processing UDDF data.
#

def reorder(doc):
    """
    fixme: obsolete

    Reorder and cleanup dives in UDDF document.

    All dives are moved into a single, new repetition group

    - the dives are sorted by dive start time
    - only the first dive is kept for given dive start time
    - old repetition groups are removed

    :Parameters:
     doc
        UDDF document.
    """
    find = partial(doc.xpath, namespaces=_NSMAP)

    profiles = find('//uddf:profiledata')
    rgroups = find('//uddf:profiledata/uddf:repetitiongroup')
    if not (profiles and rgroups):
        raise ValueError('No profile data to reorder')
    pd = profiles[0]

    q_dive = '/uddf:uddf/uddf:profiledata/uddf:repetitiongroup/uddf:dive'
    q_time = q_dive + '/uddf:informationbeforedive/uddf:datetime/text()'

    # parse dive start time, do not rely on its string representation for
    # sorting; the first dive wins for given start time
    dives = {}
    for dive, start in zip(find(q_dive), find(q_time)):
        dives.setdefault(dparse(start), dive)

    # cleanup old repetition groups
    log.debug('removing old repetition groups')
    for group in rgroups:
        pd.remove(group)
    rg, = create_node('uddf:repetitiongroup', parent=pd)
    _set_id(rg)

    # sort dive nodes by dive time
    log.debug('sorting dives')
    for start in sorted(dives):
        rg.append(dives[start])
class NodeCopier(object):
    """
    UDDF document node copier.

    See :py:func:`NodeCopier.copy` for details.

    :Attributes:
     doc
        The target document.
     doc_ids
        The cache of target document ids.
    """
    def __init__(self, doc):
        """
        Initialize node copier.

        :Parameters:
         doc
            The target document.
        """
        self.doc = doc
        self.doc_ids = set(xp(doc, '//uddf:*/@id'))


    def __enter__(self):
        """
        Create UDDF node copier context manager.
        """
        return self


    def __exit__(self, *args):
        """
        Close UDDF node copier context manager.
        """
        pass


    def copy(self, node, target):
        """
        Copy node from UDDF document to target node in destination UDDF
        document. Target node becomes parent of node to be copied.

        The copying works under following assumptions

        - whole node is being copied including its descendants
        - node is not copied if it has id and id already exists in the
          target
        - if copied nodes reference non-descendant nodes and they do _not_
          exist in destination document, then referencing nodes are
          _removed_
        - if, due to node removal, its parent node becomes empty, then
          parent is removed, too

        Copy of the node is returned or `None` if the node is not copied.

        :Parameters:
         node
            Node to copy.
         target
            The future parent of the copied node.

        :Raises ValueError:
            When the node to copy itself references non-existing node.
        """
        # check the id before making expensive copy of the node
        cn_id = node.get('id')
        if cn_id in self.doc_ids:
            log.debug('id {} already exists, not copying'.format(cn_id))
            return None

        cn = deepcopy(node)

        # references to descendants of the copied node are valid, too
        s_ids = set(xp(cn, 'descendant-or-self::uddf:*/@id'))
        known = self.doc_ids | s_ids

        # get referencing nodes
        nodes = list(xp(cn, 'descendant-or-self::uddf:*[@ref]'))
        refs = set(k.get('ref') for k in nodes)
        left = refs - known
        if __debug__:
            log.debug('references to remove: {} = {} - {}'.format(left, refs,
                known))
        if cn.get('ref') in left:
            raise ValueError('Node to copy references non-existing node')

        # update the cache when it is certain that the node is copied
        self.doc_ids.update(s_ids)

        # remove nodes referencing missing data
        to_remove = (n for n in nodes if n.get('ref') in left)
        assert cn.getparent() is None
        for n in to_remove:
            p = n.getparent()
            # go up the hierarchy as long as the parent would become empty
            while p is not None and len(p) == 1:
                n = p
                p = n.getparent()
            if p is not None:
                p.remove(n)
        target.append(cn)
        return cn
def _set_id(node):
    """
    Assign generated id to a node unless the node has an id already.

    :Parameters:
     node
        Node for which id should be generated.
    """
    if node.get('id') is not None:
        return
    node.set('id', FORMAT_ID.format(uuid().hex))


def gen_id(value=None):
    """
    Generate id for a value.

    The id is MD5 hash of string representation of the value. If the value
    is not specified, then the id is generated with UUID 4.

    The returned id is a string prefixed with ``id-`` to make it XML
    compliant.
    """
    vid = uuid().hex if value is None \
        else hashlib.md5(str(value).encode()).hexdigest()
    return 'id-{}'.format(vid)


def xml_file_copy(f):
    """
    Iterator of raw XML data read from a file, which can be put into
    ``dirty.xml`` nodes.

    The data is read in chunks of 4096 items.

    :Parameters:
     f
        File containing XML data.
    """
    chunk = f.read(4096)
    while chunk:
        yield RawString(chunk)
        chunk = f.read(4096)


def get_version(f):
    """
    Get version of UDDF file.

    Tuple (major, minor) is returned, i.e. (3, 0), (3, 1), etc.

    :Parameters:
     f
        File to check.
    """
    root = parse(f, ver_check=False).getroot()
    major, minor, *_ = root.get('version').split('.')
    # rewind the file, so it can be read again
    if isinstance(f, FileIO):
        f.seek(0, 0)
    log.debug('detected version {}.{}'.format(major, minor))
    return int(major), int(minor)


# vim: sw=4:et:ai