Streamlined list processing, a.k.a. pipes in Python

One of the most widely admired contributions of Unix to the culture of operating systems and command languages is the pipe, as used in a pipeline of commands. — Dennis M. Ritchie [dmr]

If the pipe is so great, why, then, does your favorite modern, dynamic programming language not have pipes built in?

If the idea is merely to direct the output of one computation into the input of another, any programming language with procedural abstraction lets you do that simply by composition, like this:

f(g(h(input)))

However, that is not a pipeline! The procedure f cannot start until g has finished, and g can only start after h has finished.
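
A small sketch makes the difference concrete. It records the order in which three toy pass-through stages (hypothetical stand-ins for h, g and f) touch each item, first with eager lists, which is plain composition, and then with lazy generators.

```python
def run(lazy):
    """Compose three stages h -> g -> f and record the order in which they see each item."""
    trace = []

    def h(xs):
        for x in xs:
            trace.append(("h", x))
            yield x

    def g(xs):
        for x in xs:
            trace.append(("g", x))
            yield x

    def f(xs):
        for x in xs:
            trace.append(("f", x))
            yield x

    if lazy:
        # Generators: each item flows through h, g and f before the next one is produced.
        list(f(g(h([1, 2]))))
    else:
        # Lists: each stage runs over all items before the next stage starts.
        list(f(list(g(list(h([1, 2]))))))
    return trace


print(run(lazy=False))  # all h events, then all g events, then all f events
print(run(lazy=True))   # h, g, f for item 1, then h, g, f for item 2
```

The eager version is exactly the f(g(h(input))) situation described above; the lazy one already behaves like a pipeline.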

A pipeline should process data incrementally, with all of its stages running concurrently. For example, in the pipeline

grep $regex input | wc -l

Both processes are forked from the shell, and wc can start processing data as soon as grep has found a matching line. This is especially useful when the producer is doing blocking I/O: a preemptive OS can schedule the consumer while the producer is blocked. Parallelism also comes for free when the two processes run simultaneously on different CPUs.

Thus, what a programming language really needs in order to implement pipes is a way to pause and resume computation. This usually comes in the form of coroutines, which allow cooperative multitasking. In this light, a pipeline is a way to express the composition of coroutines. Pipes let us build modular pieces and then connect them, which makes code more reusable.
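
Python generators are exactly such pausable computations. As a minimal sketch (the helpers `grep` and `wc_l` are hypothetical stand-ins for the two processes), here is `grep $regex input | wc -l` written as two generator-based stages: the counter receives each match as soon as `grep` yields it. Unlike the shell version, this runs in a single thread, so it captures the interleaving but not the parallelism.

```python
import re


def grep(pattern, lines):
    """Yield each line matching `pattern`, suspending after every match."""
    search = re.compile(pattern).search
    for line in lines:
        if search(line):
            yield line  # pause here until the consumer asks for the next line


def wc_l(lines):
    """Count lines as they arrive, like `wc -l`."""
    count = 0
    for _ in lines:
        count += 1
    return count


text = ["apple pie\n", "banana split\n", "apple tart\n"]
print(wc_l(grep("apple", text)))  # 2
```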

One thing a pipeline obviously needs is a consistent interface for the data being piped. In Unix, a process's standard input and output are uniform streams of bytes. Most modern programming languages can represent ordered streams of elements, usually in the form of iterators (though streams can also be implemented with closures, as in [sicp]).
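
For comparison, here is a sketch of the closure-based representation, loosely modelled on SICP's `cons-stream`, `stream-car` and `stream-cdr` (the Python names below are illustrative, not the book's code). A stream is a pair of its first element and a zero-argument closure that computes the rest only when forced, which is what makes an infinite stream possible.

```python
def cons_stream(head, tail_thunk):
    """A stream is a pair: the first element and a closure that computes the rest on demand."""
    return (head, tail_thunk)


def stream_car(s):
    return s[0]


def stream_cdr(s):
    return s[1]()  # force the delayed tail


def integers_from(n):
    # Infinite stream: the recursive call sits inside a lambda, so it runs only when forced.
    return cons_stream(n, lambda: integers_from(n + 1))


def stream_take(s, k):
    """Collect the first k elements of stream s into a list."""
    out = []
    for _ in range(k):
        out.append(stream_car(s))
        s = stream_cdr(s)
    return out


print(stream_take(integers_from(1), 5))  # [1, 2, 3, 4, 5]
```

In SICP, `cons-stream` is a special form that delays its second argument automatically; in Python the delay has to be written out as an explicit `lambda`.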

Now, Python already has iterators and generators, which are just what we need. Implementing pipes is simply a matter of notation, which, I suppose, Pythonistas take very seriously. In fact, the documentation of the itertools module describes it as providing tools that together form an "iterator algebra". Today, we are going to implement a composition operator for it.

Indeed, why write:

hs = (h(i) for i in iterable)
s = (f(j) for j in hs if g(j))
result = [next(s) for _ in range(5)]

in which the data-flow path is buried in a jungle of intermediate variables and syntactic noise?

This is what we want:

result = iterable >> map(h) >> filter(g) >> map(f) >> take(5)

First, we define a Stream class that overloads the >> operator. It obviously needs to act like an iterator.

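class Stream(object):
    def __init__(self, iterable=None):
        # Test against None rather than truthiness, which not every iterable defines sensibly
        self.iterator = iter(iterable if iterable is not None else [])

    def __iter__(self):
        return self.iterator

    def __next__(self):
        return next(self.iterator)

    next = __next__  # Python 2 name for the same iterator method

    def __pipe__(self, inpipe):
        # Subclasses implement __call__ to turn the input into a new iterator
        self.iterator = self.__call__(inpipe)
        return self

    @staticmethod
    def pipe(inpipe, outpipe):
        if hasattr(outpipe, '__pipe__'):
            return outpipe.__pipe__(inpipe)
        elif callable(outpipe):
            # A plain function receives the upstream iterable directly
            return outpipe(inpipe)
        raise TypeError('cannot pipe into %r' % (outpipe,))

    def __rshift__(self, outpipe):
        return Stream.pipe(self, outpipe)

    def __rrshift__(self, inpipe):
        return Stream.pipe(inpipe, self)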

When the operator >> connects two Stream instances, inpipe and outpipe, the static method Stream.pipe(inpipe, outpipe) is called, which in turn calls outpipe.__pipe__(inpipe). The method __pipe__ defers to __call__ (which must be implemented in Stream's subclasses) to obtain a modified iterator, which becomes the iterator of outpipe. Effectively, the __call__ method of a Stream subclass defines how it processes its input. This is convenient because Stream.pipe simply calls outpipe(inpipe) when outpipe has no __pipe__, so we can pipe directly into a regular Python function as well. Alternatively, a subclass could redefine __pipe__ if it does not want to return a Stream instance.
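
The case where the left operand is an ordinary iterable (a list, a file, itertools.count()) relies on Python's reflected operators: if the left operand has no `__rshift__`, or its `__rshift__` returns NotImplemented, Python calls `__rrshift__` on the right operand. A toy class, `Sink` (hypothetical, for illustration only), shows which method fires.

```python
class Sink(object):
    """Toy operand that reports which shift method Python dispatched to."""

    def __rshift__(self, other):
        return ("__rshift__", other)

    def __rrshift__(self, other):
        # Python falls back to this when the left operand has no __rshift__
        # or its __rshift__ returns NotImplemented for a Sink.
        return ("__rrshift__", other)


print([1, 2] >> Sink())  # ('__rrshift__', [1, 2])
print(5 >> Sink())       # ('__rrshift__', 5)
print(Sink() >> "x")     # ('__rshift__', 'x')
```

This is why `iterable >> map(h)` works even though `iterable` knows nothing about Stream.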

We can now write take, map and filter:

import itertools

class take(Stream):
    def __init__(self, n):
        super(take, self).__init__()
        self.n = n
        self.items = []

    def __call__(self, inpipe):
        # islice accepts any iterable and stops early if fewer than n items remain
        self.items = list(itertools.islice(inpipe, self.n))
        return iter(self.items)

    def __repr__(self):
        return "Stream(%s)" % self.items

class Filter(Stream):
    def __init__(self, function):
        super(Filter, self).__init__()
        self.function = function

class map(Filter):
    def __call__(self, inpipe):
        # Lazily apply the function; a generator expression avoids the shadowed builtin map
        return (self.function(x) for x in inpipe)

class filter(Filter):
    def __call__(self, inpipe):
        # Lazily keep the items for which the function returns a true value
        return (x for x in inpipe if self.function(x))

Here, Filter is just a base class for any stream stage that needs a function attribute, such as map and filter.
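
To check that the pieces fit, here is a self-contained sketch, written for Python 3 and restating the classes above in compact form, that runs the pipeline from the beginning of the article. The functions `h`, `g` and `f` are made up for the example.

```python
import itertools


class Stream(object):
    """Minimal iterator wrapper that overloads >> for piping."""

    def __init__(self, iterable=None):
        self.iterator = iter(iterable if iterable is not None else [])

    def __iter__(self):
        return self.iterator

    def __next__(self):
        return next(self.iterator)

    def __pipe__(self, inpipe):
        # Subclasses define __call__ to turn the input into a new iterator.
        self.iterator = self.__call__(inpipe)
        return self

    @staticmethod
    def pipe(inpipe, outpipe):
        if hasattr(outpipe, "__pipe__"):
            return outpipe.__pipe__(inpipe)
        if callable(outpipe):
            return outpipe(inpipe)  # plain functions receive the upstream iterable
        raise TypeError("cannot pipe into %r" % (outpipe,))

    def __rshift__(self, outpipe):
        return Stream.pipe(self, outpipe)

    def __rrshift__(self, inpipe):
        return Stream.pipe(inpipe, self)


class take(Stream):
    def __init__(self, n):
        super().__init__()
        self.n = n

    def __call__(self, inpipe):
        return iter(list(itertools.islice(inpipe, self.n)))


class Filter(Stream):
    def __init__(self, function):
        super().__init__()
        self.function = function


class map(Filter):
    def __call__(self, inpipe):
        return (self.function(x) for x in inpipe)


class filter(Filter):
    def __call__(self, inpipe):
        return (x for x in inpipe if self.function(x))


def h(x):
    return x * x  # square


def g(x):
    return x % 2 == 0  # keep even numbers


def f(x):
    return x + 1


# Works on an infinite input because every stage is lazy.
result = itertools.count() >> map(h) >> filter(g) >> map(f) >> take(3)
print(list(result))  # [1, 5, 17]
```

Because every stage pulls items on demand, `take(3)` stops the whole pipeline once it has three items, even though `itertools.count()` never ends.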

To test this out, consider this small problem: grep all lines matching a regex from a file, cut out the 4th and 6th fields separated by ' ', ':' or '.', strip leading zeros, then save as a list. Traditionally, it is solved in Python along these lines:

import re
search = re.compile('[Pp]attern').search
split = re.compile(r' |:|\.').split
splitted = (split(line) for line in open('file') if search(line))
unstripped = flatten((field[3], field[5]) for field in splitted)
result = [s.lstrip('0') for s in unstripped]

We can also express the solution concisely as a pipeline:

import re
from operator import itemgetter, methodcaller
result = open('file') \
  >> filter(re.compile('[Pp]attern').search) \
  >> map(re.compile(r' |:|\.').split) \
  >> map(itemgetter(3, 5)) \
  >> flatten \
  >> map(methodcaller('lstrip', '0')) \
  >> list

Note that the pipeline articulates each processing step exactly as stated in the problem, whereas each step of the traditional solution does several things at once, with different notations and syntaxes. Pipelines are also easier to edit and unit-test than comprehensions, since they are flat rather than nested, which is definitely preferable in Pythonland.

In the above pipeline, flatten can simply be a plain Python function, like this:

import itertools

def flatten(list_of_lists):
    # Lazily concatenate the inner iterables into a single iterator
    return itertools.chain.from_iterable(list_of_lists)
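
The pipeline is well-formed because flatten is sandwiched between two Stream instances: the one on its left calls flatten directly through Stream.pipe, and the map on its right accepts the plain iterator that flatten returns through __rrshift__.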
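
Along the same lines, here is a self-contained Python 3 sketch that runs both solutions of the grep-and-cut problem on a few made-up lines and checks that they agree. The `SAMPLE` text is hypothetical and stands in for `open('file')`; the Stream classes are restated in compact form.

```python
import io
import itertools
import re
from operator import itemgetter, methodcaller


class Stream(object):
    def __init__(self, iterable=None):
        self.iterator = iter(iterable if iterable is not None else [])

    def __iter__(self):
        return self.iterator

    def __next__(self):
        return next(self.iterator)

    def __pipe__(self, inpipe):
        self.iterator = self.__call__(inpipe)
        return self

    @staticmethod
    def pipe(inpipe, outpipe):
        if hasattr(outpipe, "__pipe__"):
            return outpipe.__pipe__(inpipe)
        if callable(outpipe):
            return outpipe(inpipe)
        raise TypeError("cannot pipe into %r" % (outpipe,))

    def __rshift__(self, outpipe):
        return Stream.pipe(self, outpipe)

    def __rrshift__(self, inpipe):
        return Stream.pipe(inpipe, self)


class Filter(Stream):
    def __init__(self, function):
        super().__init__()
        self.function = function


class map(Filter):
    def __call__(self, inpipe):
        return (self.function(x) for x in inpipe)


class filter(Filter):
    def __call__(self, inpipe):
        return (x for x in inpipe if self.function(x))


def flatten(list_of_lists):
    return itertools.chain.from_iterable(list_of_lists)


# Hypothetical input; the 4th and 6th fields are the ones we want.
SAMPLE = (
    "id:1.2 007 Pattern 0042 end\n"
    "no match on this line 1 2 3\n"
    "a.b:c 0100 pattern 0305 end\n"
)


def traditional(lines):
    search = re.compile("[Pp]attern").search
    split = re.compile(r" |:|\.").split
    splitted = (split(line) for line in lines if search(line))
    unstripped = flatten((field[3], field[5]) for field in splitted)
    return [s.lstrip("0") for s in unstripped]


def pipeline(lines):
    return (lines
            >> filter(re.compile("[Pp]attern").search)
            >> map(re.compile(r" |:|\.").split)
            >> map(itemgetter(3, 5))
            >> flatten
            >> map(methodcaller("lstrip", "0"))
            >> list)


print(pipeline(io.StringIO(SAMPLE)))  # ['7', '42', '100', '305']
```

The sample lines deliberately carry a field after the sixth one: if the sixth field were last on its line, split would leave the trailing newline attached to it.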

Many marvellous programs can be written using the stream paradigm, but that will have to be the topic of another article. For today, I am revealing to you a package, stream.py, from which the code in this post has been derived. It offers, for example, a very cute way of "slicing" an iterator:

>>> import itertools
>>> c = itertools.count()
>>> c >> item[1:10:2]
[1, 3, 5, 7, 9]
>>> c >> item[:5]
[10, 11, 12, 13, 14]

The module level documentation is also available here. Enjoy! ■

Reference

[dmr] Ritchie, Dennis M. "The Evolution of the Unix Time-sharing System."

[sicp] Abelson, Harold, Gerald Jay Sussman, and Julie Sussman. "Streams." Structure and Interpretation of Computer Programs.
