Package kenozooid :: Package driver :: Package ostc :: Module parser

Source Code for Module kenozooid.driver.ostc.parser

  1  # 
  2  # Kenozooid - dive planning and analysis toolbox. 
  3  # 
  4  # Copyright (C) 2009-2019 by Artur Wroblewski <wrobell@riseup.net> 
  5  # 
  6  # This program is free software: you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation, either version 3 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 18  # 
 19   
 20  """ 
 21  OSTC dive computer binary data parsing routines. 
 22  """ 
 23   
 24  import logging 
 25  import re 
 26  from collections import namedtuple 
 27  from struct import unpack, calcsize 
 28  from binascii import hexlify 
 29   
 30  log = logging.getLogger('kenozooid.driver.ostc') 
 31   
 32  # command 'a' output 
 33  Data = namedtuple('Data', 'preamble eeprom voltage ver1 ver2 profiles') 
 34  FMT_STATUS = '<6s256sHbb' 
 35  LEN_STATUS = calcsize(FMT_STATUS) 
 36   
 37  # EEPROM data, command 'g' output (nfy) 
 38  EEPROMData = namedtuple('EEPROMData', 'serial dives data') 
 39  FMT_EEPROM = '<HH252s' 
 40   
 41  # profile data is one of 
 42  # - FAFA20..(42)..FBFB...FDFD 
 43  # - FAFA21..(52)..FBFB...FDFD 
 44  RE_PROFILES = re.compile(b'(\xfa\xfa' \ 
 45          b'(\x20.{42}|\x21.{52})\xfb\xfb)' \ 
 46          b'(.+?\xfd\xfd)', re.DOTALL) 
 47   
 48  # dive profile header 
 49  DiveHeader = namedtuple('DiveHeader', """\ 
 50  version month day year hour minute max_depth dive_time_m dive_time_s 
 51  min_temp surface_pressure desaturation 
 52  gas1_o2 gas1_he gas2_o2 gas2_he gas3_o2 gas3_he gas4_o2 gas4_he gas5_o2 gas5_he 
 53  gas6_o2 gas6_he gas 
 54  ver1 ver2 voltage sampling div_temp div_deco div_gf div_ppo2 
 55  div_deco_debug div_cns salnity max_cns 
 56  """) 
 57  FMT_DIVE_HEADER = '<6BHHB' 'HHH' '10B' 'BBB' 'BBHB4B' 'BBBB' 
 58   
 59  DiveHeader_191 = namedtuple('DiveHeader_191', 
 60          DiveHeader._fields + ('avg_depth', 'dive_time_total_s', 
 61              'gf_lo', 'gf_hi', 'deco_type', 
 62              'reserved1', 'reserved2', 'reserved3')) 
 63  FMT_DIVE_HEADER_191 = FMT_DIVE_HEADER + 'HH' 'BB' 'BBBB' 
 64   
 65  # dive profile data block sample 
 66  DiveSample = namedtuple('DiveSample', 'depth alarm gas_set_o2 gas_set_he' 
 67      ' current_gas setpoint temp deco_depth deco_time gf ppo2 cns') 
 68   
 69   
70 -def get_data(data):
71 """ 72 Get status information and profile raw data, see `Data` named tuple. 73 """ 74 dump = Data(*unpack(FMT_STATUS, data[:LEN_STATUS]), 75 profiles=data[LEN_STATUS:]) 76 eeprom = EEPROMData(*unpack(FMT_EEPROM, dump.eeprom)) 77 dump = dump._replace(eeprom=eeprom) 78 log.debug('unpacked status dump, voltage {}, version {}.{}, serial {}' \ 79 .format(dump.voltage, dump.ver1, dump.ver2, dump.eeprom.serial)) 80 return dump
81 82
83 -def profiles(data):
84 """ 85 Split profile data into individual dive profiles using profile 86 regular expression `RE_PROFILES`. 87 88 Collection of tuples (header, block) is returned 89 90 header 91 dive profile header 92 block 93 dive profile block data 94 95 """ 96 return ((h, p) for h, _, p in RE_PROFILES.findall(data))
97 98
99 -def header(data):
100 """ 101 Parse OSTC dive profile header, see `DiveHeader` named tuple. 102 """ 103 if len(data) == 47: 104 header = DiveHeader(*unpack(FMT_DIVE_HEADER, data[2:-2])) 105 elif len(data) == 57: 106 header = DiveHeader_191(*unpack(FMT_DIVE_HEADER_191, data[2:-2])) 107 else: 108 raise ValueError('Unknown length of profile header: {}'.format(len(data))) 109 110 log.debug('parsed dive header {0.year:>02d}-{0.month:>02d}-{0.day:>02d}' \ 111 ' {0.hour:>02d}:{0.minute:>02d}' \ 112 ' max depth={0.max_depth}'.format(header)) 113 if header.month == 0: 114 header = header._replace(month=1) 115 log.debug('corrected dive header month info (ver. {0.ver1}.{0.ver2})' \ 116 .format(header)) 117 return header
118 119
120 -def dive_data(header, data):
121 """ 122 Parse OSTC dive profile data block. 123 """ 124 div_temp_s, div_temp_c = divisor(header.div_temp) 125 div_deco_s, div_deco_c = divisor(header.div_deco) 126 div_gf_s, div_gf_c = divisor(header.div_gf) 127 div_ppo2_s, div_ppo2_c = divisor(header.div_ppo2) 128 div_deco_debug_s, div_deco_debug_c = divisor(header.div_deco_debug) 129 div_cns_s, div_cns_c = divisor(header.div_cns) 130 131 log.debug('header divisor values {:x} {:x} {:x} {:x} {:x} {:x}' 132 .format(header.div_temp, header.div_deco, header.div_gf, 133 header.div_ppo2, header.div_deco_debug, header.div_cns)) 134 135 dive_total_time = header.dive_time_m * 60 + header.dive_time_s 136 137 i = 0 138 j = 1 # sample number 139 while i < len(data) - 2: # skip profile block data end 140 depth = unpack('<H', data[i:i + 2])[0] / 100.0 141 i += 2 142 143 # size is count of bytes after profile byte 144 pfb = data[i] 145 i += 1 146 size, event = flag_byte(pfb) 147 log.debug('sample {} info: depth = {:.2f}, pfb = {:x}, size = {}, ' \ 148 'data: {}'.format(j, depth, pfb, size, hexlify(data[i:i + size]))) 149 150 alarm = None 151 gas_set = 0 152 gas_set_o2 = None 153 gas_set_he = None 154 gas_change = 0 155 current_gas = None 156 setpoint_change = 0 157 setpoint = None 158 # parse event byte information 159 if event: 160 v = data[i] 161 i += 1 162 alarm = v & 0x0f 163 gas_set = v & 0x10 164 gas_change = v & 0x20 165 setpoint_change = v & 0x40 166 167 log.debug('alarm = {}, gas_set = {}, gas_change = {},' \ 168 ' setpoint_change = {}'.format(alarm, gas_set, gas_change, 169 setpoint_change)) 170 171 if gas_set: 172 gas_set_o2 = data[i] 173 gas_set_he = data[i + 1] 174 i += 2 175 gas_set = 2 176 177 if gas_change: 178 current_gas = data[i] 179 i += 1 180 gas_change = 1 181 182 div_bytes = 0 183 184 temp = sample_data(data, i, j, div_temp_s, div_temp_c) 185 if temp is not None: 186 assert len(temp) == div_temp_c == 2 187 temp = unpack('<H', temp)[0] / 10.0 188 i += div_temp_c 189 div_bytes += div_temp_c 190 191 deco = sample_data(data, i, j, div_deco_s, div_deco_c) 192 if deco is not None: 193 assert len(deco) == div_deco_c 194 deco_depth, deco_time = deco 195 i += div_deco_c 196 div_bytes += div_deco_c 197 log.debug('deco time {}, depth {}'.format(deco_time, deco_depth)) 198 else: 199 deco_depth, deco_time = None, None 200 201 gf = sample_data(data, i, j, div_gf_s, div_gf_c) 202 if gf is not None: 203 i += div_gf_c 204 div_bytes += div_gf_c 205 206 ppo2 = sample_data(data, i, j, div_ppo2_s, div_ppo2_c) 207 if ppo2 is not None: 208 i += div_ppo2_c 209 div_bytes += div_ppo2_c 210 log.debug('ppo2 {}'.format(hexlify(ppo2))) 211 212 deco_debug = sample_data(data, i, j, div_deco_debug_s, div_deco_debug_c) 213 if deco_debug is not None: 214 i += div_deco_debug_c 215 div_bytes += div_deco_debug_c 216 log.debug('deco debug {}'.format(hexlify(deco_debug))) 217 218 cns = sample_data(data, i, j, div_cns_s, div_cns_c) 219 if cns is not None: 220 i += div_cns_c 221 div_bytes += div_cns_c 222 log.debug('cns {}'.format(hexlify(cns))) 223 224 if setpoint_change: 225 setpoint = data[i] 226 i += 1 227 setpoint_change = 1 228 log.debug('setpoint change {}'.format(setpoint)) 229 230 if size != event + gas_set + gas_change + setpoint_change + div_bytes: 231 log.debug('invalid dive data, sample = {}, depth = {:.2f},' \ 232 ' pfb = {:x}, size = {}, event = {}, alarm = {}, temp = {},' \ 233 ' gas_set = {}, gas_change = {}, cns = {},' \ 234 ' setpoint_change = {}, setpoint = {},' 235 ' div_bytes = {}, deco_debug = {}' 236 .format(j, depth, pfb, size, event, alarm, temp, gas_set, 237 gas_change, cns, setpoint_change, setpoint, 238 div_bytes, 239 hexlify(deco_debug) if deco_depth else [])) 240 raise ValueError('Invalid dive') 241 242 # is a sample within dive total time? if not, then skip sample 243 if header.sampling * (j - 1) <= dive_total_time: 244 yield DiveSample(depth, alarm, gas_set_o2, gas_set_he, current_gas, 245 setpoint, temp, deco_depth, deco_time, gf, ppo2, cns) 246 else: 247 log.debug('skipped sample {} (out of dive time), seek {}' 248 .format(j, i)) 249 j += 1 250 251 assert data[i:i + 2] == b'\xfd\xfd'
252 253
254 -def sample_data(data, i, sample, div_sample, div_count):
255 """ 256 Parse sample item like temperature, deco, etc. 257 258 :Parameters: 259 data 260 Profile block data. 261 i 262 Profile block data index, where sample item can be found. 263 sample 264 Number of dive sample (starts from 1). 265 div_sample 266 Divisor sampling information. 267 div_count 268 Sample item data bytes count. 269 """ 270 v = None 271 if div_sample and sample % div_sample == 0: 272 v = data[i:i + div_count] 273 return v
274 275
276 -def divisor(value):
277 """ 278 Split divisor value into divisor sample information and divisor 279 byte count. 280 """ 281 return value & 0b1111, value >> 4
282 283
284 -def flag_byte(value):
285 """ 286 Split profile flag byte into 287 288 - amount of additional bytes of extended information 289 - event byte presence, which is zero or one 290 291 """ 292 return value & 0x7f, value >> 7
293 294 295 # vim: sw=4:et:ai 296