1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
"""
Kenozooid data flow processing functions and coroutines.
"""
23
import functools
import itertools
import os
from contextlib import contextmanager
from tempfile import mkstemp
def coroutine(func):
    """
    Decorator for a coroutine function.

    Advances a coroutine to its first ``(yield)`` statement, so a value can
    be sent to it immediately after it is created.

    :Parameters:
     func
        Generator function implementing the coroutine.
    """
    # keep the name and docstring of the decorated function
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # run up to the first ``yield``
        return cr
    return start
40
41
def pipe(data, *gens):
    """
    Pipe data through list of generators.

    Each generator is called with the result of the previous one; the
    first one is called with ``data``. When no generators are given, then
    ``data`` is returned unchanged.

    :Parameters:
     data
        Data to pipe through the generators.
     gens
        List of generators to process the data.
    """
    for g in gens:
        data = g(data)
    return data
55
56
def send(data, tc):
    """
    Send data from iterator to target coroutine.

    Every item of ``data`` is sent to the coroutine and then the coroutine
    is closed.

    :Parameters:
     data
        Iterator of data to send.
     tc
        Coroutine to receive the data.
    """
    for v in data:
        tc.send(v)
    tc.close()
70
71
@coroutine
def split(*tc):
    """
    Coroutine to receive a value and send it to all coroutines specified
    in ``tc`` list.

    The value is sent to the target coroutines in the order they are
    specified. The target coroutines are not closed by this coroutine.

    :Parameters:
     tc
        List of target coroutines.
    """
    while True:
        v = yield
        for c in tc:
            c.send(v)
86
87
@coroutine
def concat(n, cat=itertools.chain, tc=None):
    """
    Coroutine to concatenate data from ``n`` sources.

    The coroutine receives ``n`` sources of data and passes them to
    ``cat`` function. The concatenated result is sent to ``tc`` target
    coroutine.

    The coroutine finishes once the result is processed, therefore the
    ``send`` call delivering the last source raises ``StopIteration`` in
    the sender.

    :Parameters:
     n
        Amount of data sources.
     cat
        Function to concatenate data.
     tc
        Target coroutine.
    """
    values = []
    for _ in range(n):
        values.append((yield))
    rv = cat(*values)
    if tc is not None:
        tc.send(rv)
111
112
@coroutine
def sink(f):
    """
    A sink coroutine to receive a value and execute with function ``f``.

    :Parameters:
     f
        Function called with each received value; its result is ignored.
    """
    while True:
        v = yield
        f(v)
121
122
@coroutine
def buffer(f, tc=None):
    """
    Coroutine buffer, which stores received data in a file.

    When coroutine is closed, then file object is sent to target coroutine.

    :Parameters:
     f
        File object to store buffered data.
     tc
        Target coroutine.
    """
    try:
        while True:
            lines = yield
            f.writelines(lines)
    except GeneratorExit:
        if tc is not None:
            try:
                tc.send(f)
            except StopIteration:
                # The target finished after receiving the file. Without
                # this handler the exception would leave this generator
                # and be turned into RuntimeError (PEP 479) on close.
                pass
143
# NOTE(review): the decorator and ``def`` lines were absent from the reviewed
# listing; ``@contextmanager`` and the name ``buffer_open`` are restored from
# the body - confirm the name against callers.
@contextmanager
def buffer_open(n):
    """
    Open ``n`` buffer files.

    Context manager, which creates ``n`` temporary files and yields a tuple
    of file objects opened in ``w+`` mode. On exit, the files are closed
    and removed, also when the managed block raises an exception.

    :Parameters:
     n
        Amount of buffer files to open.
    """
    paths = []
    handles = []
    try:
        for _ in range(n):
            fd, fn = mkstemp()
            paths.append(fn)
            handles.append(open(fd, 'w+'))
        yield tuple(handles)
    finally:
        # close before removing, an open file cannot be removed on every
        # platform
        for fh in handles:
            fh.close()
        for fn in paths:
            os.remove(fn)
157
158
159