One of the most widely admired contributions of Unix to the culture of operating systems and command languages is the pipe, as used in a pipeline of commands. — Dennis M. Ritchie [dmr]
If the pipe is so great, why does your favorite modern, dynamic programming language not have pipes built in?
If the idea is merely to direct the output of one computation to the input of another, any programming language with a procedure abstraction lets you do that simply by composition, like this:
f(g(h(input)))
However, that is not a pipeline! The procedure f cannot
start until g has finished, and the procedure g cannot
start until h has finished.
A pipeline should process data concurrently. For example, in the pipeline
grep $regex input | wc -l
both processes are forked from the shell, and wc can start processing
data as soon as grep has found a matching line. This can be very
useful when the producer process is doing blocking I/O: a preemptive OS can
schedule the consumer process while the producer is blocked.
Parallelism also comes for free when both processes can run simultaneously on
different CPUs.
Thus, what a programming language really needs in order to implement pipes is a way to pause and resume computation. This usually comes in the form of coroutines, which allow cooperative multitasking. In this light, the pipeline is a way to express the composition of coroutines. Pipes let us build modular pieces and then connect them together, which makes code more reusable.
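To make "pause and resume" concrete, here is a minimal sketch using Python generators, which are a restricted form of coroutine. The stage names (produce, double, consume) and the events trace are illustrative assumptions, not part of the original post. The trace shows that each item travels through all three stages before the next item is produced, unlike plain composition of eager functions.

```python
events = []  # trace of which stage handled which item, in order


def produce(items):
    """Stage 1: emit items one at a time, pausing at each yield."""
    for item in items:
        events.append(("produce", item))
        yield item


def double(stream):
    """Stage 2: resume the upstream generator only when a value is needed."""
    for item in stream:
        events.append(("double", item))
        yield item * 2


def consume(stream):
    """Stage 3: drive the whole pipeline by pulling values through it."""
    total = 0
    for item in stream:
        events.append(("consume", item))
        total += item
    return total


total = consume(double(produce([1, 2, 3])))
print(total)       # sum of the doubled items
print(events[:3])  # the first item passes through every stage first
```

Nothing runs until consume starts iterating, so the stages interleave cooperatively on a single thread.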
One thing that is obviously necessary for a pipeline to work is a consistent interface for the data being piped. In Unix, a process's standard input and output are uniform streams of bytes. Most modern programming languages can represent ordered streams of elements, usually in the form of iterators (though they can also be implemented using closures, for instance, as in [sicp]).
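As a rough illustration of the closure-based alternative, the sketch below represents a stream as a pair of a value and a zero-argument closure that computes the rest. The helper names (cons_stream, integers_from, as_iterator) are hypothetical, and unlike the streams in [sicp] this version does not memoize the delayed tail.

```python
import itertools


def cons_stream(head, tail_thunk):
    """A stream is a pair: the first value and a closure producing the rest."""
    return (head, tail_thunk)


def integers_from(n):
    """An infinite stream n, n+1, n+2, ...; the tail is computed on demand."""
    return cons_stream(n, lambda: integers_from(n + 1))


def as_iterator(stream):
    """Adapt a closure-based stream to Python's iterator protocol."""
    while stream is not None:
        head, tail_thunk = stream
        yield head
        stream = tail_thunk()  # forced only when the next item is requested


first_five = list(itertools.islice(as_iterator(integers_from(1)), 5))
print(first_five)
```

The adapter at the end is the point: whichever representation a language uses, it can be exposed through one uniform interface that every stage of a pipeline understands.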
Now, Python already has iterators and generators, which are just what we need.
Implementing pipes is then simply a matter of notation, which, I suppose, is
taken very seriously by Pythonistas. In fact, the documentation of the
itertools module describes it as providing tools that, together,
form an
"iterator algebra". Today, we are going to implement a composition
operator for it.
Indeed, why write:
hs = (h(i) for i in iterable)
s = (f(j) for j in hs if g(j))
result = [next(s) for _ in range(5)]
in which the data-flow path is hidden in a jungle of intermediate variables and syntactic noise?
This is what we want:
result = iterable >> map(h) >> filter(g) >> map(f) >> take(5)
First we define a Stream class with the operator >> overloaded. It
will obviously need to act like an iterator.
class Stream(object):
    def __init__(self, iterable=None):
        self.iterator = iter(iterable if iterable else [])

    def __iter__(self):
        return self.iterator

    def next(self):
        return next(self.iterator)

    def __pipe__(self, inpipe):
        self.iterator = self.__call__(inpipe)
        return self

    @staticmethod
    def pipe(inpipe, outpipe):
        if hasattr(outpipe, '__pipe__'):
            return outpipe.__pipe__(inpipe)
        elif hasattr(outpipe, '__call__'):
            return outpipe(inpipe)

    def __rshift__(self, outpipe):
        return Stream.pipe(self, outpipe)

    def __rrshift__(self, inpipe):
        return Stream.pipe(inpipe, self)
When the operator >> connects two Stream instances,
inpipe and outpipe, the static method
Stream.pipe(inpipe, outpipe) is called, which in turn calls
outpipe.__pipe__(inpipe). The method __pipe__ defers to
__call__ (which must be implemented in Stream's subclasses) to get
a modified iterator, which becomes the iterator of outpipe.
Effectively, the __call__ method of a Stream subclass defines
how it processes its input. Because Stream.pipe falls back to calling
outpipe when it has no __pipe__ method, we can also pipe directly into
a regular Python function. Alternatively, a subclass can
redefine __pipe__ if it does not want to return a Stream instance.
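The dispatch relies on Python's rule for binary operators: the left operand's __rshift__ is tried first, and the right operand's __rrshift__ is used only when the left one does not support the operation. The sketch below isolates that rule; the Traced class is a hypothetical stand-in for Stream and simply reports which hook ran.

```python
class Traced:
    """Hypothetical class that reports which operator hook Python invoked."""

    def __init__(self, name):
        self.name = name

    def __rshift__(self, other):
        # Called for `self >> other`, whatever `other` is.
        return "%s.__rshift__" % self.name

    def __rrshift__(self, other):
        # Called for `other >> self` when `other` cannot handle >> itself.
        return "%s.__rrshift__" % self.name


a, b = Traced("a"), Traced("b")

both = a >> b             # left operand defines __rshift__, so it is used
from_list = [1, 2] >> b   # list has no __rshift__, so b.__rrshift__ runs
to_function = a >> len    # a plain function on the right still reaches a

print(both, from_list, to_function)
```

This is why a Stream needs both methods: __rshift__ covers a stream on the left, and __rrshift__ covers a plain iterable on the left with a stream on the right.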
We can now write take, map and filter:
import itertools

class take(Stream):
    def __init__(self, n):
        super(take, self).__init__()
        self.n = n
        self.items = []

    def __call__(self, inpipe):
        # islice accepts any iterable and stops cleanly on short input
        self.items = list(itertools.islice(inpipe, self.n))
        return iter(self.items)

    def __repr__(self):
        return "Stream(%s)" % self.items

class Filter(Stream):
    def __init__(self, function):
        super(Filter, self).__init__()
        self.function = function

class map(Filter):
    def __call__(self, inpipe):
        return itertools.imap(self.function, inpipe)

class filter(Filter):
    def __call__(self, inpipe):
        return itertools.ifilter(self.function, inpipe)
Here, Filter is just a base class for any stream filter that
needs a function attribute, such as map and
filter. Note that these two classes deliberately shadow the built-ins of the same name.
To test this out, consider this small problem: grep all lines matching a regex from a file, cut out the 4th and 6th fields separated by ' ', ':' or '.', strip leading zeros, then save as a list. Traditionally, it is solved in Python along these lines:
import re
search = re.compile('[Pp]attern').search
split = re.compile(r' |:|\.').split
splitted = (split(line) for line in open('file') if search(line))
unstripped = flatten((field[3], field[5]) for field in splitted)
result = [s.lstrip('0') for s in unstripped]
We can also express the solution concisely as a pipeline:
import re
from operator import itemgetter, methodcaller
result = open('file') \
    >> filter(re.compile('[Pp]attern').search) \
    >> map(re.compile(r' |:|\.').split) \
    >> map(itemgetter(3, 5)) \
    >> flatten \
    >> map(methodcaller('lstrip', '0')) \
    >> list
Note that the pipeline spells out each processing step exactly as stated in
the problem, whereas each step in the traditional solution does
several things at once, in differing notation and syntax. Pipelines are
also easier to edit and to unit-test than list comprehensions, since they
are flat rather than nested, which is definitely preferred in
Pythonland.
In the above pipeline, flatten can simply be a plain Python
function, like this:
def flatten(listOfLists):
    return itertools.chain.from_iterable(listOfLists)
The pipeline is well-formed as flatten is sandwiched between two
instances that understand the stream operator.
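For readers on Python 3, here is a self-contained sketch of the same design applied to the field-extraction problem. It is an adaptation under stated assumptions, not the code of stream.py: next becomes __next__, the Python 2 only itertools.imap and itertools.ifilter are replaced by generator expressions, the classes are renamed smap and sfilter (hypothetical names) to avoid shadowing the built-ins, and the sample lines are invented so that the example needs no file.

```python
import itertools
import re
from operator import itemgetter, methodcaller


class Stream:
    def __init__(self, iterable=None):
        self.iterator = iter(iterable if iterable is not None else [])

    def __iter__(self):
        return self.iterator

    def __next__(self):  # Python 3 spelling of the post's next()
        return next(self.iterator)

    def __pipe__(self, inpipe):
        self.iterator = self.__call__(inpipe)  # defined by subclasses
        return self

    @staticmethod
    def pipe(inpipe, outpipe):
        if hasattr(outpipe, '__pipe__'):
            return outpipe.__pipe__(inpipe)
        elif callable(outpipe):
            return outpipe(inpipe)  # plain function or type, e.g. list

    def __rshift__(self, outpipe):
        return Stream.pipe(self, outpipe)

    def __rrshift__(self, inpipe):
        return Stream.pipe(inpipe, self)


class take(Stream):
    def __init__(self, n):
        super().__init__()
        self.n = n
        self.items = []

    def __call__(self, inpipe):
        self.items = list(itertools.islice(inpipe, self.n))
        return iter(self.items)


class Filter(Stream):
    def __init__(self, function):
        super().__init__()
        self.function = function


class smap(Filter):  # renamed from the post's map
    def __call__(self, inpipe):
        return (self.function(x) for x in inpipe)


class sfilter(Filter):  # renamed from the post's filter
    def __call__(self, inpipe):
        return (x for x in inpipe if self.function(x))


def flatten(list_of_lists):
    return itertools.chain.from_iterable(list_of_lists)


# Invented sample input standing in for open('file').
lines = [
    "pattern a b 007 c 042",
    "no match here 001 x 002",
    "Pattern:x.y:0100 z.0005",
]

result = (lines
          >> sfilter(re.compile('[Pp]attern').search)
          >> smap(re.compile(r' |:|\.').split)
          >> smap(itemgetter(3, 5))
          >> flatten  # returns a plain chain object, not a Stream
          >> smap(methodcaller('lstrip', '0'))
          >> list)
print(result)

squares = itertools.count() >> smap(lambda x: x * x) >> take(3) >> list
print(squares)
```

The step after flatten works because the chain object has no __rshift__ of its own, so Python falls back to the __rrshift__ of the smap instance on its right.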
Many marvellous programs can be written using the stream paradigm, but that will have to be the topic of another article. For today, I am revealing to you a package, stream.py, from which the code in this post has been derived. It offers, for example, a very cute way of "slicing" an iterator:
>>> c = itertools.count()
>>> c >> item[1:10:2]
[1, 3, 5, 7, 9]
>>> c >> item[:5]
[10, 11, 12, 13, 14]
The module level documentation is also available here. Enjoy! ■
References
[dmr] Ritchie, Dennis M. "The Evolution of the Unix Time-sharing System."
[sicp] Abelson, Hal, Jerry Sussman and Julie Sussman. "Streams." Structure and Interpretation of Computer Programs.