Package kenozooid :: Package driver :: Package hwos :: Module api

Source Code for Module kenozooid.driver.hwos.api

  1  # 
  2  # Kenozooid - dive planning and analysis toolbox. 
  3  # 
  4  # Copyright (C) 2009-2019 by Artur Wroblewski <wrobell@riseup.net> 
  5  # 
  6  # This program is free software: you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation, either version 3 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 18  # 
 19   
 20  """ 
 21  Implementation of Kenozooid driver API for hwOS family dive computers. 
 22  """ 
 23   
 24  import asyncio 
 25  import btzen 
 26  import logging 
 27   
 28  import kenozooid.component as kc 
 29  from kenozooid.driver import DeviceDriver, DataParser, DeviceError 
 30  from . import parser 
 31  from .raw import read_data 
 32   
 33  logger = logging.getLogger(__name__) 
 34   
 35  MODELS = sorted(set(parser.MODEL.values())) 

@kc.inject(DeviceDriver, id='hwos', name='hwOS Driver', models=MODELS)
class HWOSDriver:
    """
    Driver for hwOS family OSTC dive computers.

    All communication goes through a `btzen.Serial` object created for the
    port given to the constructor.
    """
    def __init__(self, port):
        """
        Create the driver.

        :param port: Value passed to `btzen.Serial`; `scan` documents it
            as the Bluetooth MAC address of a dive computer.
        """
        self._device = btzen.Serial(port)

    async def start(self):
        """
        Start communication with the dive computer.

        Byte `0xbb` is sent and a two byte reply `0xbb 0x4d` is expected.

        :raises DeviceError: If the reply differs from `0xbb 0x4d`.
        """
        await self._device.write(b'\xbb')
        value = await self._device.read(2)
        # explicit check instead of `assert`, which is removed by `python -O`
        if value != b'\xbb\x4d':
            raise DeviceError(
                'Unexpected reply to start command: {!r}'.format(value)
            )

    async def read_data(self, cmd, n, params=None):
        """
        Read data from hwOS OSTC dive computer.

        Command parameters can be set. The parameters are sent after the
        command echo is received and before the data is read.

        The implementation allows to follow hwOS OSTC dive computer
        protocol strictly. For example for `0x66` command

        - get echo byte first
        - then send the dive number

        If above protocol is not followed, then transfer will occasionally
        hang.

        :param cmd: Command to send, exactly one byte long.
        :param n: Number of data bytes to read (the trailing `0x4d` marker
            is read in addition to these bytes).
        :param params: Optional command parameters to send after the echo
            is received.
        :return: Data received from the device, without the trailing
            marker byte.
        :raises ValueError: If `cmd` is not exactly one byte long.
        :raises DeviceError: If the command echo or the trailing marker
            does not match the expected value.
        """
        if len(cmd) != 1:
            raise ValueError(
                'Command has to be one byte long, got {!r}'.format(cmd)
            )

        dev = self._device

        # write command and read echo
        await dev.write(cmd)
        value = await dev.read(1)
        if value != cmd:
            raise DeviceError(
                'Invalid echo for command {!r}: {!r}'.format(cmd, value)
            )

        if params:
            await dev.write(params)

        # read: data and 0x4d marker
        data = await dev.read(n + 1)
        if data[-1:] != b'\x4d':
            raise DeviceError(
                'Missing end marker after command {!r}: {!r}'
                .format(cmd, data[-1:])
            )
        return data[:-1]

    async def stop(self):
        """
        Stop communication with the dive computer by sending byte `0xff`.
        """
        await self._device.write(b'\xff')

    @staticmethod
    def scan(port=None):
        """
        Look for hwOS based dive computer connected via Bluetooth.

        NOTE: Just connects to a dive computer using given MAC address.

        This is a generator, which yields a single driver instance.

        :param port: Bluetooth MAC address of a dive computer.
        """
        drv = HWOSDriver(port)
        # lazy formatting: the message is built only if debug is enabled
        logger.debug('hwos based ostc driver for %s', port)
        yield drv


@kc.inject(DataParser, id='hwos', data=('gas',))
class HWOSDataParser(object):
    """
    Dive data parser for hwOS dive computer.

    The data format is

    - version and identity, command 0x69, 64 bytes
    - hardware and features, command 0x60, 5 bytes
    - dive headers, command 0x61, 65536 bytes
    - dive profiles, command 0x66, variable size depends on dive headers
    """
    def dump(self):
        """
        Download hwOS based dive computer raw data.

        The Bluetooth connection manager and the `read_data` coroutine are
        run on the event loop until the first of them completes. The result
        of the completed task is returned. The connection manager is always
        closed.

        NOTE(review): `self.driver` is not set in this class; presumably it
        is provided by the `kc.inject` machinery - confirm.
        """
        loop = asyncio.get_event_loop()
        manager = btzen.ConnectionManager()
        try:
            manager.add(self.driver._device)

            # `asyncio.wait` no longer accepts bare coroutines or awaitables
            # (deprecated in Python 3.8, error since Python 3.11), therefore
            # wrap them into tasks explicitly; older versions of
            # `asyncio.wait` did the same internally
            tasks = [
                asyncio.ensure_future(manager, loop=loop),
                asyncio.ensure_future(read_data(self.driver), loop=loop),
            ]
            waiter = asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            done, _ = loop.run_until_complete(waiter)

            # NOTE(review): the task still pending is not cancelled here;
            # verify that `manager.close()` is enough to clean it up
            assert len(done) == 1
            return done.pop().result()
        finally:
            manager.close()

    def version(self, data):
        """
        Get model and firmware version string from dive computer data.

        :param data: Dive computer data as accepted by `parser.raw_data`.
        :return: String in the `<model> <major>.<minor>` format, with the
            minor version number padded to two digits.
        """
        data = parser.raw_data(data)
        model, major, minor = parser.model_version(data)
        return '{} {}.{:02}'.format(model, major, minor)

    def dives(self, data):
        """
        Convert dive data into UDDF format.

        :param data: Dive computer data passed to `parser.parse_dives`.
        """
        return parser.parse_dives(data)
141 142 # vim: sw=4:et:ai 143