#
# DecoTengu - dive decompression library.
#
# Copyright (C) 2013-2018 by Artur Wroblewski <wrobell@riseup.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
DecoTengu dive decompression engine.
[mpdfd] Powell, Mark. Deco for Divers, United Kingdom, 2010
"""
from collections import namedtuple, OrderedDict
import math
import operator
import logging
from .model import ZH_L16B_GF
from .error import ConfigError, EngineError
from .ft import recurse_while, bisect_find
from .flow import coroutine
from . import const
logger = logging.getLogger(__name__)
[docs]class Phase(object):
"""
Dive phase enumeration.
The dive phases are
START
Start of a dive. It happens at begining of the dive (time=0min,
depth=0min). Only one dive step can exist with such dive phase.
DESCENT
Descent during dive - current dive step is deeper than previous one.
CONST
Constant depth during dive - current dive step is at the same depth as
previous one.
ASCENT
Ascent during dive - current dive step is shallower than previous one.
DECO_STOP
Decompression stop. Current dive step is at the same depth as previous
one and ascent is not possible until allowed by decompression model.
GAS_SWITCH
Gas mix switch. Current dive step is at the same depth as previous
one. The time of current and previous dive steps is the same.
"""
START = 'start'
DESCENT = 'descent'
CONST = 'const'
ASCENT = 'ascent'
DECO_STOP = 'deco_stop'
GAS_SWITCH = 'gas_switch'
Step = namedtuple('Step', 'phase abs_p time gas data')
Step.__repr__ = lambda s: 'Step(phase="{}", abs_p={:.4f}, time={:.4f},' \
' gf={:.4f})'.format(s.phase, s.abs_p, s.time, s.data.gf)
Step.__doc__ = """
Dive step information.
:var phase: Dive phase.
:var abs_p: Absolute pressure at depth [bar].
:var time: Time of dive [min].
:var gas: Gas mix configuration.
:var data: Decompression model data.
"""
GasMix = namedtuple('GasMix', 'depth o2 n2 he')
GasMix.__doc__ = """
Gas mix configuration.
:var depth: Gas mix switch depth.
:var o2: O2 percentage.
:var n2: N2 percentage.
:var he: Helium percentage.
"""
DecoStop = namedtuple('DecoStop', 'depth time')
DecoStop.__doc__ = """
Dive decompression stop information.
:var depth: Depth of decompression stop [m].
:var time: Length of decompression stops [min].
"""
[docs]class Engine(object):
"""
DecoTengu decompression engine.
Use decompression engine to calculate dive profile and decompression
information.
:var model: Decompression model.
:var surface_pressure: Surface pressure [bar].
:var ascent_rate: Ascent rate during a dive [m/min].
:var descent_rate: Descent rate during a dive [m/min].
:var last_stop_6m: If true, then last deco stop is at 6m (not default 3m).
:var deco_table: List of decompression stops.
:var _gas_list: List of gas mixes.
:var _deco_stop_search_time: Time limit for decompression stop linear
search.
"""
def __init__(self):
super().__init__()
self.model = ZH_L16B_GF()
self.surface_pressure = const.SURFACE_PRESSURE
self.ascent_rate = 10.0
self.descent_rate = 20.0
self.last_stop_6m = False
self.deco_table = DecoTable()
self._gas_list = []
self._travel_gas_list = []
self._deco_stop_search_time = const.DECO_STOP_SEARCH_TIME
self._meter_to_bar = const.METER_TO_BAR
self._p3m = 3 * const.METER_TO_BAR
[docs] def _to_pressure(self, depth):
"""
Convert depth in meters to absolute pressure in bars.
:param depth: Depth in meters.
"""
return depth * self._meter_to_bar + self.surface_pressure
[docs] def _to_depth(self, abs_p):
"""
Convert absolute pressure to depth.
:param abs_p: Absolute pressure of depth [bar].
"""
depth = (abs_p - self.surface_pressure) / self._meter_to_bar
return round(depth, const.SCALE)
[docs] def _time_to_pressure(self, time, rate):
"""
Convert time into pressure change using depth change rate.
:param time: Time [min].
:param rate: Rate of depth change [m/min].
"""
return time * rate * self._meter_to_bar
[docs] def _pressure_to_time(self, pressure, rate):
"""
Convert pressure change into time using depth change rate.
The returned time is in minutes.
:param pressure: Pressure change [bar].
:param rate: Rate of depth change [m/min].
"""
return pressure / rate / self._meter_to_bar
[docs] def _ceil_pressure_3m(self, abs_p):
"""
Calculate absolute pressure value, so when converted to meters its
value is divisible by 3.
:param abs_p: Input absolute pressure [bar].
"""
v = math.ceil((abs_p - self.surface_pressure) / self._p3m)
return v * self._p3m + self.surface_pressure
[docs] def _n_stops(self, start_abs_p, end_abs_p=None):
"""
Calculate amount of decompression stops required between start and
end depths.
:param start_abs_p: Absolute pressure of starting depth.
:param end_abs_p: Absolute pressure of ending depth (surface
pressure if null).
"""
if end_abs_p is None:
end_abs_p = self.surface_pressure
k = (start_abs_p - end_abs_p) / self._p3m
return round(k)
[docs] def _inv_limit(self, abs_p, data):
"""
Return true if decompression model data does not violate
decompression model ceiling limit invariant.
The invariant is
Absolute pressure (depth) has to be deeper or at the same depth
as absolute pressure of ceiling limit.
:param abs_p: Absolute pressure of current depth.
:param data: Decompression model data.
"""
return abs_p >= self.model.ceiling_limit(data)
[docs] def _can_ascend(self, abs_p, time, data, gf=None):
"""
Check if a diver can ascend from current depth without violating
ascent ceiling limit.
:param abs_p: Absolute pressure of current depth [bar].
:param time: Time of ascent [min].
:param data: Decompression model data.
:param gf: Gradient factor to be used for ceiling check.
"""
p = abs_p - self._time_to_pressure(time, self.ascent_rate)
return p >= self.model.ceiling_limit(data, gf=gf)
[docs] def _step_start(self, abs_p, gas):
"""
Create the very first dive step.
The first step is initialized with decompression data calculated
for surface.
The dive starting depth is usually surface, but any depth can be
specified, i.e. when descent part of the dive is to be skipped.
:param abs_p: Absolute pressure of dive starting depth.
:param gas: Gas mix configuration.
"""
data = self.model.init(self.surface_pressure)
step = Step(Phase.START, abs_p, 0, gas, data)
return step
[docs] def _step_next(self, step, time, gas, phase='const'):
"""
Calculate next dive step at constant depth and advanced by
specified amount of time.
:param step: Current dive step.
:param time: Time spent at current depth [min].
:param gas: Gas mix configuration.
:param data: Decompression model data.
:param phase: Dive phase.
"""
data = self._tissue_pressure_const(step.abs_p, time, gas, step.data)
return Step(phase, step.abs_p, step.time + time, gas, data)
[docs] def _step_next_descent(self, step, time, gas, phase='descent'):
"""
Calculate next dive step when descent is performed for specified
period of time.
:param step: Current dive step.
:param time: Time to descent from current dive step [min].
:param gas: Gas mix configuration.
:param phase: Dive phase.
"""
data = self._tissue_pressure_descent(step.abs_p, time, gas, step.data)
pressure = step.abs_p + self._time_to_pressure(time, self.descent_rate)
return Step(phase, pressure, step.time + time, gas, data)
[docs] def _step_next_ascent(self, step, time, gas, gf=None, phase='ascent'):
"""
Calculate next dive step when ascent is performed for specified
period of time.
FIXME: due to ``gf`` parameter this method is deco model dependant,
this has to be improved
:param step: Current dive step.
:param time: Time to ascent from current dive step [min].
:param gas: Gas mix configuration.
:param data: Decompression model data.
:param phase: Dive phase.
"""
data = self._tissue_pressure_ascent(step.abs_p, time, gas, step.data)
pressure = step.abs_p - self._time_to_pressure(time, self.ascent_rate)
if gf is not None:
# FIXME: make it model independent
data = data._replace(gf=gf)
return Step(phase, pressure, step.time + time, gas, data)
[docs] def _tissue_pressure_const(self, abs_p, time, gas, data):
"""
Calculate tissues gas loading after exposure for specified amount
of time at depth.
:param abs_p: Absolute pressure indicating the depth [bar].
:param time: Time at depth [min].
:param gas: Gas mix configuration.
:param data: Decompression model data.
"""
return self.model.load(abs_p, time, gas, 0, data)
[docs] def _tissue_pressure_descent(self, abs_p, time, gas, data):
"""
Calculate tissues gas loading after descent.
:param abs_p: Starting pressure indicating the depth [bar].
:param time: Time of descent [min].
:param gas: Gas mix configuration.
:param data: Decompression model data.
"""
rate = self.descent_rate * self._meter_to_bar
data = self.model.load(abs_p, time, gas, rate, data)
return data
[docs] def _tissue_pressure_ascent(self, abs_p, time, gas, data):
"""
Calculate tissues gas loading after ascent.
:param abs_p: Starting pressure indicating the depth [bar].
:param time: Time of ascent [min].
:param gas: Gas mix configuration.
:param data: Decompression model data.
"""
rate = -self.ascent_rate * self._meter_to_bar
tp = self.model.load(abs_p, time, gas, rate, data)
return tp
[docs] def _switch_gas(self, step, gas):
"""
Switch gas mix.
The switch results in new dive step.
"""
step = step._replace(phase=Phase.GAS_SWITCH, gas=gas)
if __debug__:
logger.debug('switched to gas mix {} at {}'.format(gas, step))
return step
[docs] def _dive_descent(self, abs_p, gas_list):
"""
Dive descent from surface to absolute pressure of destination
depth.
The last gas on the gas mix list is bottom gas, others are travel
gas mixes.
:param abs_p: Absolute pressure of destination depth.
:param gas_list: List of gas mixes - travel and bottom gas mixes.
"""
gas = gas_list[0]
step = self._step_start(self.surface_pressure, gas)
yield step
stages = self._descent_stages(abs_p, gas_list)
for i, (depth, gas) in enumerate(stages):
if i > 0: # perform gas switch
step = self._switch_gas(step, gas)
yield step
time = self._pressure_to_time(depth - step.abs_p, self.descent_rate)
step = self._step_next_descent(step, time, gas)
yield step
last = gas_list[-1]
if abs(step.abs_p - self._to_pressure(last.depth)) < const.EPSILON:
assert gas != last
step = self._switch_gas(step, last)
yield step
logger.debug('descent finished at {:.4f}bar'.format(step.abs_p))
[docs] def _dive_ascent(self, start, gas_list):
"""
Dive ascent from starting dive step.
The method checks if the ascent is part of NDL dive before dive
ascent starts.
If dive is decompression dive, then ascent is divided into two
phases
- ascent to first decompression stop
- ascent performing decompression stops
:param start: Starting dive step.
:param gas_list: List of gas mixes - bottom and decompression gas
mixes.
"""
# check if ndl dive
bottom_gas = gas_list[0]
step = self._ndl_ascent(start, bottom_gas)
if step:
yield step
return
step = start
stages = self._free_ascent_stages(gas_list)
for step in self._free_staged_ascent(step, stages):
yield step
# we should not arrive at the surface - it is non-ndl dive at this
# stage
assert not abs(step.abs_p - self.surface_pressure) < const.EPSILON
stages = self._deco_ascent_stages(step.abs_p, gas_list)
yield from self._deco_staged_ascent(step, stages)
[docs] def _ndl_ascent(self, start, gas):
"""
Check if NDL ascent to the surface is possible from starting dive
step.
Return the surface dive step if NDL ascent is possible, null
otherwise.
NDL ascent is performed to the surface usually using bottom gas
(NOTE: not always possible - exceptions not implemented yet).
To calculate surface dive step, the surface decompression model
parameters are applied, i.e. for ZH-L16-GF decompression model,
gradient factor value is set to GF high parameter.
:param start: Starting dive step.
:param gas: Gas mix used during NDL ascent.
"""
# FIXME: method is decompression model dependant
gf = self.model.gf_high
p = start.abs_p - self.surface_pressure
time = self._pressure_to_time(p, self.ascent_rate)
step = self._step_next_ascent(start, time, gas, gf=gf)
limit = self.model.ceiling_limit(step.data, gf)
if step.abs_p < limit:
step = None
if __debug__:
logger.debug('deco dive')
else:
if __debug__:
logger.debug('ndl dive')
return step
[docs] def _find_first_stop(self, start, abs_p, gas):
"""
Find first first decompression stop using Schreiner equation.
Method returns dive step - start of first decompression stop.
Below, by depth we mean absolute pressure of depth expressed in
bars.
The depth of first decompression stop is the shallowest depth,
which does not breach the ascent limit imposed by ascent ceiling.
The depth is divisble by 3.
The first decompression stop depth is found by ascending to
adjusted value of current ascent ceiling limit. The current ascent
ceiling value is always adjusted, so its value in meters is
divisible by 3. The ascent is repeated while it is possible to do
so and until target depth.
:param start: Starting dive step indicating current depth.
:param abs_p: Absolute pressure of target depth - surface or gas
switch depth.
:param gas: Gas mix configuration.
"""
assert start.abs_p > abs_p, '{} vs. {}'.format(start.abs_p, abs_p)
assert self._to_depth(abs_p) % 3 == 0, self._to_depth(abs_p)
model = self.model
step = start
limit = model.ceiling_limit(step.data, step.data.gf)
limit = self._ceil_pressure_3m(limit)
limit = max(abs_p, limit)
t = self._pressure_to_time(step.abs_p - limit, self.ascent_rate)
if __debug__:
logger.debug(
'find first stop: check ascent from {}bar by {}min to {}bar (start)'
.format(step.abs_p, t, limit)
)
while step.abs_p > limit and step.abs_p > abs_p:
step = self._step_next_ascent(step, t, gas)
limit = model.ceiling_limit(step.data, step.data.gf)
limit = self._ceil_pressure_3m(limit)
limit = max(abs_p, limit)
t = self._pressure_to_time(step.abs_p - limit, self.ascent_rate)
if __debug__:
logger.debug(
'find first stop: check ascent from {}bar by {}min to {}bar'
.format(step.abs_p, t, limit)
)
stop = step
if __debug__:
depth = self._to_depth(stop.abs_p)
assert depth % 3 == 0, \
'Invalid first stop depth pressure {}bar ({}m)' \
.format(stop.abs_p, depth)
if start is stop:
logger.debug('find first stop: at first deco stop already')
elif stop.abs_p > abs_p:
limit = self.model.ceiling_limit(stop.data)
logger.debug(
'find first stop: found at {}m ({}bar), ascent time={},'
' limit={}'.format(
depth, stop.abs_p, stop.time - start.time, limit
)
)
else:
logger.debug('find first stop: no decompression stop found')
assert stop.abs_p - abs_p > -const.EPSILON, stop
return stop
[docs] def _descent_stages(self, end_abs_p, gas_list):
"""
Calculate stages for dive descent.
Descent stage is a tuple
- absolute pressure of destination depth
- gas mix
The descent stages are calculated using gas mix list. The absolute
pressure of destination depth is switch depth of next gas mix
absolute pressure of destination depth, for
example for `end_abs_p = 6.6bar`::
0m 30% 4.6bar (36m) 30%
36m 21% -> 6.6bar (56m) 21%
If switch depth of last gas mix is equal to the destination depth,
then descent stage is not included for it. It means that descent
is performed to the bottom on travel gas only and it is
responsbility of the caller to perform appropriate bottom gas
switch.
:param end_abs_p: Absolute pressure of destination depth.
:param gas_list: List of gas mixes - travel and bottom gas mixes.
"""
mixes = zip(gas_list[:-1], gas_list[1:])
_pressure = lambda mix: self._to_pressure(mix.depth)
yield from ((_pressure(m2), m1) for m1, m2 in mixes)
last = gas_list[-1]
if abs(_pressure(last) - end_abs_p) > 0:
yield (end_abs_p, last)
[docs] def _free_ascent_stages(self, gas_list):
"""
Calculate stages for deco-free ascent.
Ascent stage is a tuple
- absolute pressure of destination depth
- gas mix
The ascent stages are calculated using gas mix list. The absolute
pressure of destination depth is gas switch depth rounded up to
multiply of 3m and then converted to pressure, for example::
0m 21% 3.4bar (24m) 21%
22m 50% -> 1.6bar (6m) 50%
6m 100% 1.0bar (0m) 100%
:param gas_list: List of gas mixes - bottom and decompression gas
mixes.
"""
mixes = zip(gas_list[:-1], gas_list[1:])
_pressure = lambda mix: \
self._to_pressure(((mix.depth - 1) // 3 + 1) * 3)
yield from ((_pressure(m2), m1) for m1, m2 in mixes)
yield (self.surface_pressure, gas_list[-1])
[docs] def _deco_ascent_stages(self, start_abs_p, gas_list):
"""
Calculate stages for decompression ascent.
Ascent stage is a tuple
- absolute pressure of destination depth
- gas mix
The ascent stages are calculated using gas mix list. The absolute
pressure of destination depth is gas switch depth rounded down to
multiply of 3m and then converted to pressure, for example::
0m 21% 3.1bar (21m) 21%
22m 50% -> 1.6bar (6m) 50%
6m 100% 1.0bar (0m) 100%
Only gas mixes, which switch depth is shallower than start depth,
are used for decompression ascent stages calculation.
:param gas_list: List of gas mixes - bottom and decompression gas
mixes.
:param start_abs_p: Absolute pressure of decompression start depth.
"""
assert start_abs_p > self.surface_pressure
mixes = zip(gas_list[:-1], gas_list[1:])
_pressure = lambda mix: self._to_pressure(mix.depth // 3 * 3)
yield from (
(_pressure(m2), m1) for m1, m2 in mixes
if self._to_pressure(m2.depth) < start_abs_p
)
yield (self.surface_pressure, gas_list[-1])
[docs] def _validate_gas_list(self, depth):
"""
Validate gas mix list.
`ConfigError` is raised if any of gas mix rules are violated.
The gas mix rules are
#. There is one non-travel gas mix on gas mix list.
#. If no travel gas mixes, then first gas mix is bottom gas and its
switch depth is 0m.
#. All travel gas mixes have different switch depth.
#. All decompression gas mixes have different switch depth.
#. All decompression gas mixes have switch depth greater than zero.
#. There is no gas mix with switch depth deeper than maximum dive
depth.
:param depth: Maximum dive depth.
"""
if not self._gas_list:
raise ConfigError('No bottom gas mix configured')
if not self._travel_gas_list and self._gas_list[0].depth != 0:
raise ConfigError('Bottom gas mix switch depth is not 0m')
k = len(self._travel_gas_list)
depths = (m.depth for m in self._travel_gas_list)
if k and len(set(depths)) != k:
raise ConfigError(
'Two or more travel gas mixes have the same switch depth'
)
k = len(self._gas_list[1:])
depths = [m.depth for m in self._gas_list[1:]]
if len(set(depths)) != k:
raise ConfigError(
'Two or more decompression gas mixes have the same'
' switch depth'
)
if any(d == 0 for d in depths):
raise ConfigError('Decompression gas mix switch depth is 0m')
mixes = self._gas_list + self._travel_gas_list
mixes = [m for m in mixes if m.depth > depth]
if mixes:
raise ConfigError(
'Gas mix switch depth deeper than maximum dive depth'
)
[docs] def _ascent_switch_gas(self, step, gas):
"""
Switch to specified gas mix, ascending if necessary.
The method is used to switch gas during dive ascent when ascent is
performed to depth being multiply of 3m. Two scenarios are
implemented
#. Gas mix switch depth is the same as current dive step depth,
then simply perform gas mix switch.
#. Gas mix switch depth is shallower than current dive step depth
- ascend to gas mix switch depth
- perform gas mix switch
- ascend to next depth, which is multiply of 3m
Gas mix switch is done in place, takes no time at the moment, but
in the future this should be configurable.
A tuple of gas mix switch dive steps is returned.
:param step: Current dive step.
:param gas: Gas to switch to.
"""
gp = self._to_pressure(gas.depth)
logger.debug('ascent gas switch to {} at {}bar'.format(gas, step.abs_p))
assert step.abs_p - gp < self._p3m
if abs(step.abs_p - gp) < const.EPSILON:
steps = (self._switch_gas(step, gas),)
else:
assert step.abs_p > gp
time = self._pressure_to_time(step.abs_p - gp, self.ascent_rate)
s1 = self._step_next_ascent(step, time, step.gas)
s2 = self._switch_gas(s1, gas)
p = self._to_pressure(gas.depth // 3 * 3)
time = self._pressure_to_time(s2.abs_p - p, self.ascent_rate)
s3 = self._step_next_ascent(s2, time, gas)
steps = (s1, s2, s3)
return steps
[docs] def _free_staged_ascent(self, start, stages):
"""
Perform staged ascent until first decompression stop.
:param start: Starting dive step.
:param stages: Dive stages.
.. seealso:: :func:`decotengu.Engine._free_ascent_stages`
"""
step = start
for depth, gas in stages:
if step.gas != gas: # first step might not need gas switch
# if gas switch drives us into deco zone, then stop ascent
# leaving `step` as first decompression stop
if __debug__:
logger.debug('attempt to switch gas {} at {}'.format(gas, step))
gs_steps = self._ascent_switch_gas(step, gas)
if self._inv_limit(gs_steps[-1].abs_p, gs_steps[-1].data):
step = gs_steps[-1]
yield from gs_steps
if __debug__:
logger.debug('gas switch performed')
else:
if __debug__:
logger.debug('gas switch into deco zone, revert')
break
# check if there is first decompression stop at this ascent
# stage
s = self._find_first_stop(step, depth, gas)
if s is step:
break # already at deco zone
else:
step = s
yield step
if abs(step.abs_p - depth) > const.EPSILON: # deco stop found
break
# else: at target depth of ascent stage without deco stop,
# so move to next stage
[docs] def _deco_staged_ascent(self, start, stages):
"""
Perform staged asccent within decompression zone.
:param start: Starting dive step.
:param stages: Dive stages.
.. seealso:: :func:`decotengu.Engine._ascent_stages_deco`
"""
if __debug__:
depth = self._to_depth(start.abs_p)
assert depth % 3 == 0 and depth > 0, depth
bottom_gas = self._gas_list[0]
stages = self._deco_stops(start, stages)
step = start
for depth, gas, time, gf in stages:
# switch gas
if step.abs_p >= self._to_pressure(gas.depth) and gas != bottom_gas:
for step in self._ascent_switch_gas(step, gas):
yield step
# execute deco stop
end = self._deco_stop(step, time, gas, gf)
self.deco_table.append(
self._to_depth(step.abs_p),
end.time - step.time
)
step = end
yield step
# ascend to next deco stop
step = self._step_next_ascent(step, time, gas, gf=gf)
yield step
if __debug__:
logger.debug('deco engine: gf at surface={:.4f}'.format(step.data.gf))
[docs] def _deco_stops(self, step, stages):
"""
Calculate collection of decompression stops.
The method returns collection of tuples
- destination depth (see :func:`decotengu.Engine._deco_ascent_stages`
method)
- gas mix (see :func:`decotengu.Engine._deco_ascent_stages` method)
- time required to ascent to next decompression stops (usually time
required to ascent by 3m)
- gradient factor value for next decompression stop or surface
:param step: Current dive step.
:param stages: Decompression ascent stages.
.. seealso:: :func:`decotengu.Engine._deco_ascent_stages`
"""
k = self._n_stops(step.abs_p)
gf_step = (self.model.gf_high - self.model.gf_low) / k
ts_3m = self._pressure_to_time(self._p3m, self.ascent_rate)
gf = step.data.gf
if __debug__:
logger.debug('deco engine: gf step={:.4}'.format(gf_step))
abs_p = step.abs_p
stop_at_6m = self.surface_pressure + 2 * self._p3m
ls_6m = self.last_stop_6m
for depth, gas in stages:
n = self._n_stops(abs_p, depth)
for k in range(n):
gf += gf_step
if ls_6m and abs(abs_p - k * self._p3m - stop_at_6m) < const.EPSILON:
yield depth, gas, 2 * ts_3m, gf + gf_step
assert abs(self.model.gf_high - gf - gf_step) < const.EPSILON
break
else:
yield depth, gas, ts_3m, gf
abs_p = depth
[docs] def _deco_stop(self, step, next_time, gas, gf):
"""
Calculate decompression stop.
The length of a decompression stop is guarded by gradient factor of
next decompression stop - the current decompression stop lasts
until it is allowed to ascent to next stop.
:param step: Start of current decompression stop.
:param next_time: Time required to ascent to next deco stop [min].
:param gas: Gas mix configuration.
:param gf: Gradient factor value of next decompression stop.
"""
if __debug__:
depth = self._to_depth(step.abs_p)
logger.debug('deco stop: calculate at {}m'.format(depth))
assert depth % 3 == 0 and depth > 0, depth
# there are a lot of 1 minute deco stops, so check if we can ascend
# after 1 minute first; otherwise continue searching for the
# decompression stop length
data = self._tissue_pressure_const(
step.abs_p, const.MINUTE, gas, step.data
)
if self._can_ascend(step.abs_p, next_time, data, gf):
return Step(
Phase.DECO_STOP, step.abs_p, step.time + const.MINUTE, gas, data
)
max_time = self._deco_stop_search_time
# next_f(arg=(time, data)): (time, data) <- track both time and deco
# data
next_f = lambda time, data: (
time + max_time,
self._tissue_pressure_const(step.abs_p, max_time, gas, data)
)
inv_f = lambda time, data: \
not self._can_ascend(step.abs_p, next_time, data, gf)
time, data = recurse_while(inv_f, next_f, const.MINUTE, data)
if __debug__:
logger.debug(
'deco stop: linear find finished after {}min'.format(time)
)
logger.debug('deco stop: deco data {}'.format(data))
# start with `data` returned by `recurse_while`, so no need to add
# `time`
next_f = lambda k: self._tissue_pressure_const(step.abs_p, k, gas, data)
# should we stay at deco stop?
exec_deco_stop = lambda k: \
not self._can_ascend(step.abs_p, next_time, next_f(k), gf)
# ascent is possible after self._deco_stop_search_time, so
# check for self._deco_stop_search_time - 1
n = self._deco_stop_search_time - 1
k = bisect_find(n, exec_deco_stop)
k += 1 # at k diver should still stay at deco stop as
# exec_deco_stop is true - ascent minute later
# final time of a deco stop
time = time + k
if __debug__:
logger.debug(
'deco stop: search completed {}bar, {}min, n2={.n2}%,'
' gf={:.4}, next gf={:.4}'.format(
step.abs_p, time, gas, step.data.gf, gf
))
assert time % 1 == 0 and time > 0, time
step = self._step_next(step, time, gas, phase=Phase.DECO_STOP)
return step
[docs] def add_gas(self, depth, o2, he=0, travel=False):
"""
Add gas mix to the gas mix list.
First non-travel gas mix is bottom gas mix. Any other non-travel
gas mix is decompression gas mix.
See :func:`decotengu.engine.Engine._validate_gas_list` method
documentation for more gas mix list rules.
:param depth: Switch depth of gas mix.
:param o2: O2 percentage, i.e. 80.
:param he: Helium percentage, i.e. 18.
:param travel: Travel gas mix if true.
.. seealso:: :func:`decotengu.Engine._validate_gas_list`
"""
if travel:
self._travel_gas_list.append(GasMix(depth, o2, 100 - o2 - he, he))
else:
self._gas_list.append(GasMix(depth, o2, 100 - o2 - he, he))
[docs] def calculate(self, depth, time, descent=True):
"""
Start dive profile calculation for specified dive depth and bottom
time.
The method returns an iterator of dive steps.
Before the calculation the gas mix list is validated. See
:func:`decotengu.engine.Engine._validate_gas_list` method
documentation for the list of gas mix list rules.
:param depth: Maximum depth [m].
:param time: Dive bottom time [min].
:param descent: Skip descent part of a dive if set to false.
.. seealso:: :func:`decotengu.Engine._validate_gas_list`
.. seealso:: :func:`decotengu.Engine.add_gas`
"""
del self.deco_table[:]
self._validate_gas_list(depth)
# prepare travel and bottom gas mixes
depth_key = operator.attrgetter('depth')
bottom_gas = self._gas_list[0]
gas_list = sorted(self._travel_gas_list, key=depth_key)
gas_list.append(bottom_gas)
abs_p = self._to_pressure(depth)
if descent:
for step in self._dive_descent(abs_p, gas_list):
yield step
else:
step = self._step_start(abs_p, bottom_gas)
yield step
# prepare decompression gases, first gas mix is assumed to be
# bottom gas mix
gas_list = sorted(self._gas_list[1:], key=depth_key, reverse=True)
gas_list.insert(0, bottom_gas)
t = time - step.time
if t <= 0:
raise EngineError('Bottom time shorter than descent time')
if __debug__:
logger.debug(
'bottom time {}min (descent is {}min)'.format(t, step.time)
)
assert t > 0
step = self._step_next(step, t, bottom_gas)
yield step
yield from self._dive_ascent(step, gas_list)
[docs]class DecoTable(list):
"""
Decompression table summary.
The class is a list of decompression stops.
The decompression stops time is in minutes.
.. seealso:: :class:`decotengu.engine.DecoStop`
"""
@property
def total(self):
"""
Total decompression time.
"""
return sum(s.time for s in self)
[docs] def append(self, depth, time):
"""
Add decompression stop.
:param depth: Depth of decompression stop [m].
:param time: Time of decompression stop [min].
"""
if __debug__:
logger.debug(
'deco table: adding {}m {}min stop'.format(depth, time)
)
time = round(time, const.SCALE)
stop = DecoStop(depth, time)
assert stop.time > 0
assert stop.depth > 0
super().append(stop)
if __debug__:
logger.debug('deco table: added {}'.format(stop))
# vim: sw=4:et:ai