Package kenozooid :: Module flow

Source Code for Module kenozooid.flow

  1  # 
  2  # Kenozooid - dive planning and analysis toolbox. 
  3  # 
  4  # Copyright (C) 2009-2019 by Artur Wroblewski <wrobell@riseup.net> 
  5  # 
  6  # This program is free software: you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation, either version 3 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 18  # 
 19   
 20  """ 
 21  Kenozooid data flow processing functions and coroutines. 
 22  """ 
 23   
 24  import itertools 
 25  import os 
 26  from contextlib import contextmanager 
 27  from tempfile import mkstemp 
def coroutine(func):
    """
    Decorator for a coroutine function.

    Calling the decorated function creates the coroutine and advances it
    to its first ``(yield)`` statement, so the returned object can receive
    data with ``send`` straight away.

    Name, docstring and other metadata of the decorated function are
    preserved on the returned wrapper.

    :Parameters:
     func
        Generator function implementing the coroutine.
    """
    # function-scope import keeps the decorator self-contained
    from functools import wraps

    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # a freshly created generator has to be advanced to its first
        # yield before a value can be sent into it
        next(cr)
        return cr

    return start
def pipe(data, *gens):
    """
    Pass data through a chain of generators.

    Each generator is called with the result of the previous one; the
    first generator is called with ``data``. The result of the last
    generator is returned. When no generators are given, ``data`` is
    returned unchanged.

    :Parameters:
     data
        Data to pipe through the generators.
     gens
        List of generators to process the data.
    """
    result = data
    for stage in gens:
        result = stage(result)
    return result
55
def send(data, tc):
    """
    Feed a target coroutine with all items of an iterator.

    Every item is passed to the coroutine with its ``send`` method. Once
    the iterator is exhausted, the coroutine is closed.

    :Parameters:
     data
        Iterator of data to send.
     tc
        Coroutine to receive the data.
    """
    for item in data:
        tc.send(item)

    # all data delivered, let the target finalize its work
    tc.close()
70
@coroutine
def split(*tc):
    """
    Coroutine to fan out received values.

    Each value received is sent, in order, to every coroutine specified
    in ``tc`` list.

    :Parameters:
     tc
        List of target coroutines.
    """
    while True:
        item = yield
        for target in tc:
            target.send(item)
86
@coroutine
def concat(n, cat=itertools.chain, tc=None):
    """
    Coroutine to concatenate data from ``n`` sources.

    The coroutine receives ``n`` values, one per source, and calls ``cat``
    function with all of them as positional arguments. The result of the
    concatenation is sent to ``tc`` target coroutine if one is specified.

    The coroutine finishes after receiving the last value, therefore the
    ``n``-th ``send`` call raises ``StopIteration`` in the sender.

    :Parameters:
     n
        Amount of data sources.
     cat
        Function to concatenate data.
     tc
        Target coroutine.
    """
    collected = []
    for _ in range(n):
        item = yield
        collected.append(item)

    combined = cat(*collected)
    if tc is None:
        return
    tc.send(combined)
111
@coroutine
def sink(f):
    """
    A sink coroutine, which calls function ``f`` with every received value.

    :Parameters:
     f
        Function to be called with each received value.
    """
    while True:
        f((yield))
121
@coroutine
def buffer(f, tc=None):
    """
    Coroutine buffer, which stores received data in a file.

    Every received value is written to the file with its ``writelines``
    method. When the coroutine is closed, the file object itself is sent
    to the target coroutine, if one is specified.

    :Parameters:
     f
        File object to store buffered data.
     tc
        Target coroutine.
    """
    try:
        while True:
            f.writelines((yield))
    except GeneratorExit:
        # the coroutine is being closed, hand the file over
        if tc is None:
            return
        tc.send(f)
143
@contextmanager
def buffer_open(n):
    """
    Open ``n`` buffer files.

    The buffer files are temporary files created with ``mkstemp`` and
    opened in ``w+`` mode. The context manager yields a tuple of the file
    objects.

    On exit from the context, the files are closed and removed from disk.
    This also happens when the body of the ``with`` statement raises an
    exception.

    :Parameters:
     n
        Amount of buffer files to open.
    """
    paths = []
    handles = []
    try:
        for _ in range(n):
            fd, path = mkstemp()
            # remember the path first, so the file is removed even if
            # opening the descriptor fails
            paths.append(path)
            handles.append(open(fd, 'w+'))
        yield tuple(handles)
    finally:
        # close first, then delete; deleting an open file fails on some
        # platforms
        for handle in handles:
            handle.close()
        for path in paths:
            os.remove(path)
157 158 # vim: sw=4:et:ai 159