1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 OSTC dive computer binary data parsing routines.
22 """
23
24 import logging
25 import re
26 from collections import namedtuple
27 from struct import unpack, calcsize
28 from binascii import hexlify
29
30 log = logging.getLogger('kenozooid.driver.ostc')
31
32
33 Data = namedtuple('Data', 'preamble eeprom voltage ver1 ver2 profiles')
34 FMT_STATUS = '<6s256sHbb'
35 LEN_STATUS = calcsize(FMT_STATUS)
36
37
38 EEPROMData = namedtuple('EEPROMData', 'serial dives data')
39 FMT_EEPROM = '<HH252s'
40
41
42
43
44 RE_PROFILES = re.compile(b'(\xfa\xfa' \
45 b'(\x20.{42}|\x21.{52})\xfb\xfb)' \
46 b'(.+?\xfd\xfd)', re.DOTALL)
47
48
49 DiveHeader = namedtuple('DiveHeader', """\
50 version month day year hour minute max_depth dive_time_m dive_time_s
51 min_temp surface_pressure desaturation
52 gas1_o2 gas1_he gas2_o2 gas2_he gas3_o2 gas3_he gas4_o2 gas4_he gas5_o2 gas5_he
53 gas6_o2 gas6_he gas
54 ver1 ver2 voltage sampling div_temp div_deco div_gf div_ppo2
55 div_deco_debug div_cns salnity max_cns
56 """)
57 FMT_DIVE_HEADER = '<6BHHB' 'HHH' '10B' 'BBB' 'BBHB4B' 'BBBB'
58
59 DiveHeader_191 = namedtuple('DiveHeader_191',
60 DiveHeader._fields + ('avg_depth', 'dive_time_total_s',
61 'gf_lo', 'gf_hi', 'deco_type',
62 'reserved1', 'reserved2', 'reserved3'))
63 FMT_DIVE_HEADER_191 = FMT_DIVE_HEADER + 'HH' 'BB' 'BBBB'
64
65
66 DiveSample = namedtuple('DiveSample', 'depth alarm gas_set_o2 gas_set_he'
67 ' current_gas setpoint temp deco_depth deco_time gf ppo2 cns')
68
69
71 """
72 Get status information and profile raw data, see `Data` named tuple.
73 """
74 dump = Data(*unpack(FMT_STATUS, data[:LEN_STATUS]),
75 profiles=data[LEN_STATUS:])
76 eeprom = EEPROMData(*unpack(FMT_EEPROM, dump.eeprom))
77 dump = dump._replace(eeprom=eeprom)
78 log.debug('unpacked status dump, voltage {}, version {}.{}, serial {}' \
79 .format(dump.voltage, dump.ver1, dump.ver2, dump.eeprom.serial))
80 return dump
81
82
84 """
85 Split profile data into individual dive profiles using profile
86 regular expression `RE_PROFILES`.
87
88 Collection of tuples (header, block) is returned
89
90 header
91 dive profile header
92 block
93 dive profile block data
94
95 """
96 return ((h, p) for h, _, p in RE_PROFILES.findall(data))
97
98
100 """
101 Parse OSTC dive profile header, see `DiveHeader` named tuple.
102 """
103 if len(data) == 47:
104 header = DiveHeader(*unpack(FMT_DIVE_HEADER, data[2:-2]))
105 elif len(data) == 57:
106 header = DiveHeader_191(*unpack(FMT_DIVE_HEADER_191, data[2:-2]))
107 else:
108 raise ValueError('Unknown length of profile header: {}'.format(len(data)))
109
110 log.debug('parsed dive header {0.year:>02d}-{0.month:>02d}-{0.day:>02d}' \
111 ' {0.hour:>02d}:{0.minute:>02d}' \
112 ' max depth={0.max_depth}'.format(header))
113 if header.month == 0:
114 header = header._replace(month=1)
115 log.debug('corrected dive header month info (ver. {0.ver1}.{0.ver2})' \
116 .format(header))
117 return header
118
119
121 """
122 Parse OSTC dive profile data block.
123 """
124 div_temp_s, div_temp_c = divisor(header.div_temp)
125 div_deco_s, div_deco_c = divisor(header.div_deco)
126 div_gf_s, div_gf_c = divisor(header.div_gf)
127 div_ppo2_s, div_ppo2_c = divisor(header.div_ppo2)
128 div_deco_debug_s, div_deco_debug_c = divisor(header.div_deco_debug)
129 div_cns_s, div_cns_c = divisor(header.div_cns)
130
131 log.debug('header divisor values {:x} {:x} {:x} {:x} {:x} {:x}'
132 .format(header.div_temp, header.div_deco, header.div_gf,
133 header.div_ppo2, header.div_deco_debug, header.div_cns))
134
135 dive_total_time = header.dive_time_m * 60 + header.dive_time_s
136
137 i = 0
138 j = 1
139 while i < len(data) - 2:
140 depth = unpack('<H', data[i:i + 2])[0] / 100.0
141 i += 2
142
143
144 pfb = data[i]
145 i += 1
146 size, event = flag_byte(pfb)
147 log.debug('sample {} info: depth = {:.2f}, pfb = {:x}, size = {}, ' \
148 'data: {}'.format(j, depth, pfb, size, hexlify(data[i:i + size])))
149
150 alarm = None
151 gas_set = 0
152 gas_set_o2 = None
153 gas_set_he = None
154 gas_change = 0
155 current_gas = None
156 setpoint_change = 0
157 setpoint = None
158
159 if event:
160 v = data[i]
161 i += 1
162 alarm = v & 0x0f
163 gas_set = v & 0x10
164 gas_change = v & 0x20
165 setpoint_change = v & 0x40
166
167 log.debug('alarm = {}, gas_set = {}, gas_change = {},' \
168 ' setpoint_change = {}'.format(alarm, gas_set, gas_change,
169 setpoint_change))
170
171 if gas_set:
172 gas_set_o2 = data[i]
173 gas_set_he = data[i + 1]
174 i += 2
175 gas_set = 2
176
177 if gas_change:
178 current_gas = data[i]
179 i += 1
180 gas_change = 1
181
182 div_bytes = 0
183
184 temp = sample_data(data, i, j, div_temp_s, div_temp_c)
185 if temp is not None:
186 assert len(temp) == div_temp_c == 2
187 temp = unpack('<H', temp)[0] / 10.0
188 i += div_temp_c
189 div_bytes += div_temp_c
190
191 deco = sample_data(data, i, j, div_deco_s, div_deco_c)
192 if deco is not None:
193 assert len(deco) == div_deco_c
194 deco_depth, deco_time = deco
195 i += div_deco_c
196 div_bytes += div_deco_c
197 log.debug('deco time {}, depth {}'.format(deco_time, deco_depth))
198 else:
199 deco_depth, deco_time = None, None
200
201 gf = sample_data(data, i, j, div_gf_s, div_gf_c)
202 if gf is not None:
203 i += div_gf_c
204 div_bytes += div_gf_c
205
206 ppo2 = sample_data(data, i, j, div_ppo2_s, div_ppo2_c)
207 if ppo2 is not None:
208 i += div_ppo2_c
209 div_bytes += div_ppo2_c
210 log.debug('ppo2 {}'.format(hexlify(ppo2)))
211
212 deco_debug = sample_data(data, i, j, div_deco_debug_s, div_deco_debug_c)
213 if deco_debug is not None:
214 i += div_deco_debug_c
215 div_bytes += div_deco_debug_c
216 log.debug('deco debug {}'.format(hexlify(deco_debug)))
217
218 cns = sample_data(data, i, j, div_cns_s, div_cns_c)
219 if cns is not None:
220 i += div_cns_c
221 div_bytes += div_cns_c
222 log.debug('cns {}'.format(hexlify(cns)))
223
224 if setpoint_change:
225 setpoint = data[i]
226 i += 1
227 setpoint_change = 1
228 log.debug('setpoint change {}'.format(setpoint))
229
230 if size != event + gas_set + gas_change + setpoint_change + div_bytes:
231 log.debug('invalid dive data, sample = {}, depth = {:.2f},' \
232 ' pfb = {:x}, size = {}, event = {}, alarm = {}, temp = {},' \
233 ' gas_set = {}, gas_change = {}, cns = {},' \
234 ' setpoint_change = {}, setpoint = {},'
235 ' div_bytes = {}, deco_debug = {}'
236 .format(j, depth, pfb, size, event, alarm, temp, gas_set,
237 gas_change, cns, setpoint_change, setpoint,
238 div_bytes,
239 hexlify(deco_debug) if deco_depth else []))
240 raise ValueError('Invalid dive')
241
242
243 if header.sampling * (j - 1) <= dive_total_time:
244 yield DiveSample(depth, alarm, gas_set_o2, gas_set_he, current_gas,
245 setpoint, temp, deco_depth, deco_time, gf, ppo2, cns)
246 else:
247 log.debug('skipped sample {} (out of dive time), seek {}'
248 .format(j, i))
249 j += 1
250
251 assert data[i:i + 2] == b'\xfd\xfd'
252
253
254 -def sample_data(data, i, sample, div_sample, div_count):
255 """
256 Parse sample item like temperature, deco, etc.
257
258 :Parameters:
259 data
260 Profile block data.
261 i
262 Profile block data index, where sample item can be found.
263 sample
264 Number of dive sample (starts from 1).
265 div_sample
266 Divisor sampling information.
267 div_count
268 Sample item data bytes count.
269 """
270 v = None
271 if div_sample and sample % div_sample == 0:
272 v = data[i:i + div_count]
273 return v
274
275
277 """
278 Split divisor value into divisor sample information and divisor
279 byte count.
280 """
281 return value & 0b1111, value >> 4
282
283
285 """
286 Split profile flag byte into
287
288 - amount of additional bytes of extended information
289 - event byte presence, which is zero or one
290
291 """
292 return value & 0x7f, value >> 7
293
294
295
296