Package kenozooid :: Package driver :: Package ostc

Source Code for Package kenozooid.driver.ostc

  1  # 
  2  # Kenozooid - dive planning and analysis toolbox. 
  3  # 
  4  # Copyright (C) 2009-2019 by Artur Wroblewski <wrobell@riseup.net> 
  5  # 
  6  # This program is free software: you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation, either version 3 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 18  # 
 19   
 20  """ 
 21  Driver for OSTC, an open source dive computer. 
 22   
 23  OSTC dive computer specification and documentation of communication 
 24  protocol can be found at address 
 25   
 26      http://www.heinrichsweikamp.net/ 
 27   
 28  """ 
 29   
 30  from datetime import datetime, timedelta 
 31  from serial import Serial, SerialException 
 32  from binascii import hexlify 
 33  from functools import partial 
 34  from operator import attrgetter 
 35  import logging 
 36   
 37  log = logging.getLogger('kenozooid.driver.ostc') 
 38   
 39  import kenozooid.uddf as ku 
 40  import kenozooid.component as kc 
 41  from kenozooid.driver import DeviceDriver, Simulator, DataParser, DeviceError 
 42  from kenozooid.units import C2K, B2Pa 
 43  from . import parser as ostc_parser 
 44  import kenozooid.data as kd 
 45   
 46  GAS_GETTERS = {i: attrgetter('gas{}_o2'.format(i), 'gas{}_he'.format(i)) 
 47          for i in range(1, 7)} 
def pressure(depth):
    """
    Convert depth in meters to the pressure value used by OSTC simulator.

    The result is ``depth + 10`` truncated to an integer, i.e. 10 at the
    surface and one unit more for every meter of depth.

    NOTE(review): this was originally documented as pressure in mBars,
    but the numeric scale (10 at 0m) does not look like millibars -
    confirm the unit against OSTC protocol documentation.

    :Parameters:
     depth
        Depth in meters.
    """
    value = depth + 10
    return int(value)
@kc.inject(DeviceDriver, id='ostc', name='OSTC Driver',
        models=('OSTC', 'OSTC Mk.2', 'OSTC 2N'))
class OSTCDriver(object):
    """
    OSTC dive computer driver.

    The driver talks to the dive computer over a serial port using
    pySerial library.
    """
    def __init__(self, port):
        """
        Create the driver and its serial port object.

        :Parameters:
         port
            Serial port name, passed to pySerial `Serial` class.
        """
        super(OSTCDriver, self).__init__()

        self._device = Serial(port=port,
                baudrate=115200,
                bytesize=8,
                stopbits=1,
                parity='N',
                timeout=5) # 1s timeout is too short sometimes with 'a' command


    def _write(self, cmd):
        """
        Send command bytes to the dive computer.

        :Parameters:
         cmd
            Data to be written to the serial port.
        """
        log.debug('sending command %s', cmd)
        self._device.write(cmd)
        log.debug('returned after command %s', cmd)


    def _read(self, size):
        """
        Read exactly `size` bytes from the dive computer.

        Device error exception is raised if amount of received data is
        different than requested, for example when the read times out.

        :Parameters:
         size
            Amount of bytes to read, has to be greater than zero.
        """
        assert size > 0
        log.debug('reading %s byte(s)', size)
        data = self._device.read(size)
        log.debug('got %s byte(s) of data', len(data))
        if len(data) != size:
            raise DeviceError('Device communication error')
        return data


    @staticmethod
    def scan(port=None):
        """
        Look for OSTC dive computer connected to the specified port.

        Library pySerial is used, so no real scanning is performed and
        the port has to be specified by the caller.

        This is a generator. It yields one driver instance if the driver
        can be created for the port. If pySerial raises serial exception,
        then the error is logged at debug level and nothing is yielded.

        :Parameters:
         port
            Serial port name.
        """
        try:
            drv = OSTCDriver(port)
            log.debug('connected ostc to port %s', port)
            yield drv
        except SerialException as ex:
            log.debug('%s', ex)


    def version(self):
        """
        Read OSTC dive computer firmware version.

        Returns string like 'OSTC 1.91'. The minor version number is
        padded with zero to two digits (i.e. 'OSTC 2.05', not 'OSTC 2.5'),
        which is consistent with version string created by OSTC data
        parser.
        """
        self._write(b'e')
        v1, v2 = self._read(2)
        self._read(16) # fingerprint, ignore as it can be 0x00 if not built yet
        return 'OSTC {}.{:02}'.format(v1, v2)
113
@kc.inject(Simulator, id='ostc')
class OSTCSimulator(object):
    """
    Dive simulation support for OSTC dive computer.

    All commands are sent with `self.driver`. The attribute is not set
    by this class - presumably it is provided by the component injection
    mechanism (to be confirmed).
    """
    def start(self):
        """
        Switch OSTC dive computer to dive simulation mode by sending 'c'
        command.

        The dive mode screen is not shown by the dive computer until it
        is "dived" into configured depth (option CF0).
        """
        send = self.driver._write
        send(b'c')


    def stop(self):
        """
        Finish OSTC dive simulation mode by sending zero byte.

        The dive computer stays in dive mode until appropriate period of
        time passes. The period is configured with option CF2.
        """
        send = self.driver._write
        send(b'\x00')


    def depth(self, depth):
        """
        Send dive computer to given depth.

        The depth is rounded, converted with `pressure` function and sent
        to the dive computer as single byte.

        :Parameters:
         depth
            Depth in meters.
        """
        value = pressure(round(depth))
        payload = bytearray([value])
        self.driver._write(payload)
@kc.inject(DataParser, id='ostc', data=('gas',))
class OSTCDataParser(object):
    """
    OSTC dive computer data parser.

    The parser downloads raw data from the dive computer and converts it
    into Kenozooid dive and sample records.
    """
    def dump(self):
        """
        Download OSTC status and all dive profiles.

        The 'a' command is sent and 33034 bytes are read. If the status
        data reports firmware version 1.91 or later, then additional 32768
        bytes are read. Raw data is returned.
        """
        self.driver._write(b'a')
        data = self.driver._read(33034)
        status = ostc_parser.get_data(data)
        if (status.ver1, status.ver2) >= (1, 91):
            log.debug('detected ostc firmware >= 1.91, reading additional data')
            data += self.driver._read(32768)
        return data


    def dives(self, dump):
        """
        Convert dive data into UDDF format.

        This is a generator of dive records. A dive, for which dive or
        profile data cannot be created due to value error, is logged as
        invalid and skipped.

        :Parameters:
         dump
            Dive computer dump; raw data is read from its `data`
            attribute.
        """
        ostc_data = ostc_parser.get_data(dump.data)

        for h, p in ostc_parser.profiles(ostc_data.profiles):
            log.debug('header: {}'.format(hexlify(h)))
            log.debug('profile: {}'.format(hexlify(p)))

            header = ostc_parser.header(h)
            dive_data = ostc_parser.dive_data(header, p)

            # set time of the start of dive
            st = datetime(2000 + header.year, header.month, header.day,
                    header.hour, header.minute)
            # ostc dive computer saves time at the end of dive in its
            # memory, so subtract the dive time;
            # sampling amount is subtracted as well as below (0, 0)
            # waypoint is added
            duration = timedelta(minutes=header.dive_time_m,
                    seconds=header.dive_time_s + header.sampling)
            st -= duration

            # firmware ver < 1.91 has no average depth information
            avg_depth = header.avg_depth / 100.0 \
                    if hasattr(header, 'avg_depth') else None

            # firmware ver < 1.91 has no deco type information
            dive_mode = None
            if hasattr(header, 'deco_type'):
                if header.deco_type in (0, 4):
                    dive_mode = 'opencircuit'
                elif header.deco_type in (2, 5):
                    dive_mode = 'closedcircuit'
                elif header.deco_type == 3:
                    dive_mode = 'apnoe'

            # only creation of the dive record is guarded, the record is
            # yielded outside of the try block, so errors raised by
            # a consumer of the generator are not reported as invalid dive
            try:
                profile = list(self._get_profile(header, dive_data))
                dive = kd.Dive(datetime=st,
                        depth=header.max_depth / 100.0,
                        # total_seconds is used as `seconds` attribute of
                        # time delta does not include whole days
                        duration=int(duration.total_seconds()),
                        temp=C2K(header.min_temp / 10.0),
                        avg_depth=avg_depth,
                        mode=dive_mode,
                        profile=profile)
            except ValueError:
                log.error('invalid dive {0.year:>02d}-{0.month:>02d}-{0.day:>02d}'
                        ' {0.hour:>02d}:{0.minute:>02d}'
                        ' max depth={0.max_depth}'.format(header))
            else:
                yield dive


    def _get_profile(self, header, dive_data):
        """
        Parse OSTC dive samples.

        This is a generator of dive samples. The first sample is (0, 0)
        sample with the gas set in dive header, then one sample is created
        for each dive data item and, at the end, the sample at depth 0.

        :Parameters:
         header
            Dive header information.
         dive_data
            Iterable of dive data samples.
        """
        # ostc starts dive below at a depth, so add (0, 0) sample
        yield kd.Sample(depth=0.0, time=0, gas=self._get_gas(header, header.gas))

        # number of the last sample; stays 0 if there is no dive data, so
        # the last sample can still be created
        i = 0
        for i, sample in enumerate(dive_data, 1):
            # NOTE(review): the checks below skip temperature and setpoint
            # when their value is 0 as well - confirm if None shall be
            # the only "no data" marker
            temp = C2K(sample.temp) if sample.temp else None

            setpoint = B2Pa(sample.setpoint / 100.0) if sample.setpoint else None

            # deco info
            deco_time = sample.deco_time * 60.0 if sample.deco_depth else None
            deco_depth = sample.deco_depth if sample.deco_depth else None
            # no alarm (None) is never in the tuple, so deco alarm is
            # false then
            deco_alarm = sample.alarm in (2, 3)

            gas = None
            if sample.current_gas is not None:
                gas = self._get_gas(header, sample.current_gas)
            elif sample.gas_set_o2 is not None:
                gas = kd.gas(sample.gas_set_o2, sample.gas_set_he)

            yield kd.Sample(depth=sample.depth,
                    time=(i * header.sampling),
                    alarm=('deco',) if deco_alarm else None,
                    temp=temp,
                    setpoint=setpoint,
                    setpointby='user' if setpoint else None,
                    deco_time=deco_time,
                    deco_depth=deco_depth,
                    gas=gas)

        # finish the profile at depth 0, one sampling period after the
        # last sample
        yield kd.Sample(depth=0.0, time=(i + 1) * header.sampling)


    def _get_gas(self, header, gas_no):
        """
        Get gas information from OSTC dive header.

        Value error is raised if gas number is invalid.

        :Parameters:
         header
            Dive header information.
         gas_no
            Gas number to get (1-6).
        """
        getter = GAS_GETTERS.get(gas_no)
        if getter is None:
            raise ValueError('invalid gas mix number')
        o2, he = getter(header)
        return kd.gas(o2=o2, he=he)


    def version(self, data):
        """
        Get OSTC model and version information from raw data.

        The model is determined with serial number of dive computer
        stored in its EEPROM data: 'OSTC 2C' above 6999, 'OSTC 2N' above
        2047, 'OSTC Mk.2' above 300 and 'OSTC' otherwise. The minor
        version number is padded with zero to two digits.

        :Parameters:
         data
            Raw data downloaded from dive computer.
        """
        status = ostc_parser.get_data(data)
        model = 'OSTC'
        if status.eeprom.serial > 6999:
            model = 'OSTC 2C'
        elif status.eeprom.serial > 2047:
            model = 'OSTC 2N'
        elif status.eeprom.serial > 300:
            model = 'OSTC Mk.2'
        return '{} {}.{:02}'.format(model, status.ver1, status.ver2)
287 288 289 # vim: sw=4:et:ai 290