Package kenozooid :: Package driver :: Package hwos :: Module raw

Source Code for Module kenozooid.driver.hwos.raw

 1  # 
 2  # Kenozooid - dive planning and analysis toolbox. 
 3  # 
 4  # Copyright (C) 2009-2019 by Artur Wroblewski <wrobell@riseup.net> 
 5  # 
 6  # This program is free software: you can redistribute it and/or modify 
 7  # it under the terms of the GNU General Public License as published by 
 8  # the Free Software Foundation, either version 3 of the License, or 
 9  # (at your option) any later version. 
10  # 
11  # This program is distributed in the hope that it will be useful, 
12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
14  # GNU General Public License for more details. 
15  # 
16  # You should have received a copy of the GNU General Public License 
17  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
18  # 
19   
20  """ 
21  Raw data reader for hwOS OSTC dive computer. 
22  """ 
23   
24  from .parser import COMMANDS, dive_headers_data, dive_profile_size, to_int 
25   
26  import logging 
27   
28  logger = logging.getLogger(__name__) 
29   
async def read_data(drv):
    """
    Fetch raw data from hwOS based dive computer.

    The driver is started, one block of data is read for each entry of
    `COMMANDS`, then raw data of all dive profiles is read. All blocks are
    concatenated, in the order they were read, into a single bytes object.

    The driver is stopped on exit, also when reading of the data fails.

    :param drv: hwOS dive computer driver.

    .. seealso:: `HWOSDataParser.dump`
    """
    try:
        await drv.start()

        # read the blocks one by one, in the order defined by `COMMANDS`
        chunks = []
        for cmd, size in COMMANDS:
            chunks.append(await drv.read_data(cmd, size))
        assert len(chunks) == 3

        # third block is raw data of dive headers; on size mismatch report
        # sizes of all blocks read so far
        headers = chunks[2]
        assert len(headers) == 65536, [len(v) for v in chunks]

        profiles = await dive_profiles(drv, headers)
        return b''.join([*chunks, *profiles])
    finally:
        await drv.stop()
52
async def dive_profiles(drv, headers):
    """
    Read raw data of all dive profiles.

    Dive profiles are read one after another. Position of a dive header in
    the result of `dive_headers_data` is used as dive number.

    :param drv: hwOS OSTC dive computer driver.
    :param headers: Dive headers raw data.
    :return: List of raw data of dive profiles.
    """
    profiles = []
    for dive_no, header in enumerate(dive_headers_data(headers)):
        # size of profile data is determined from header of the dive
        size = dive_profile_size(header)
        profiles.append(await dive_profile(drv, dive_no, size))
    return profiles
63
async def dive_profile(drv, k, n):
    """
    Read dive profile raw data of a single dive.

    Data is read with command `0x66` and dive number sent as parameter of
    the command. First 256 bytes of received data are dive header, the
    rest is dive profile. Markers of the header and of the profile, and
    the size stored at the start of the profile, are verified with
    assertions.

    :param drv: hwOS OSTC dive computer driver.
    :param k: Dive number.
    :param n: Size of dive profile data to be read.
    :return: Dive profile raw data, without the dive header.
    """
    header_size = 256

    # dive profile data includes header data, so add 256
    # NOTE(review): reason for subtracting 3 is not visible in this module
    # - confirm with `dive_profile_size`
    total = n + header_size - 3
    param = bytes([k])
    raw = await drv.read_data(b'\x66', total, params=param)

    head, profile = raw[:header_size], raw[header_size:]

    # dive header is enclosed with 0xfafa and 0xfbfb markers
    head_start = head[:2]
    assert head_start == b'\xfa\xfa', head_start
    head_end = head[-2:]
    assert head_end == b'\xfb\xfb', head_end

    # first three bytes of profile have to match the requested size
    assert n == to_int(profile[:3])

    # dive profile ends with 0xfdfd marker
    profile_end = profile[-2:]
    assert profile_end == b'\xfd\xfd', profile_end

    return profile
86 87 # vim: sw=4:et:ai 88