Introduction to PipeSystems

Minipipe has two APIs, PipeLine and PipeSystem. PipeLine is a simple sequential API for linear pipelines. However, Minipipe can also build pipe systems that branch and join to form more complex graph topologies. In this section we’ll introduce the concepts of multiple inputs/outputs, switches and regulators.

[1]:
import minipipe as mp
import numpy as np

# Define functors

def genRand(n=5):
    # generates fake data
    for _ in range(n):
        yield np.random.rand(4)

def batch(batch_size=2):
    # A coroutine for breaking data up into batches of size batch_size
    x = (yield)
    for i in range(len(x)//batch_size):
        yield x[i*batch_size:(i+1)*batch_size]

def sumBatch(x):
    # simple transform on data
    return x.sum()

def switch(x):
    # if-then switch
    return [x, None] if x > 1 else [None, x]

def output_gt_1(x):
    # sink for greater than 1
    print('1 <',x)

def output_lt_1(x):
    # sink for less than 1
    print('1 >',x)

First, we’ve introduced a Python coroutine, batch. It receives an array through yield and re-emits it in batches of batch_size (2 by default). This is an example of a Regulator functor.
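
To make the coroutine mechanics concrete, the following sketch drives batch by hand in plain Python, without Minipipe. The drive helper is hypothetical and only illustrates the generator protocol; how mp.Regulator drives the coroutine internally is not shown in this section, so treat this as an assumption about the general idea rather than Minipipe's implementation. Plain lists stand in for NumPy arrays.

```python
def batch(batch_size=2):
    # Same coroutine as above: receive one item, then emit it in batches.
    x = (yield)
    for i in range(len(x) // batch_size):
        yield x[i * batch_size:(i + 1) * batch_size]


def drive(coroutine, item):
    """Hypothetical helper: feed one item to a coroutine and collect its outputs."""
    outputs = []
    next(coroutine)  # prime: advance to the bare `yield` that waits for input
    try:
        result = coroutine.send(item)  # resumes the coroutine; returns the first batch
        while True:
            outputs.append(result)
            result = next(coroutine)  # ask for the next batch
    except StopIteration:
        pass  # the coroutine has emitted every batch
    return outputs


print(drive(batch(), [1, 2, 3, 4]))                    # [[1, 2], [3, 4]]
print(drive(batch(batch_size=3), [1, 2, 3, 4, 5, 6]))  # [[1, 2, 3], [4, 5, 6]]
```

Note that one input yields several outputs, which is what distinguishes a regulator from a one-in, one-out transform.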

We’ve also introduced the concept of a switch functor. This functor has two outputs, returned here as a list with two elements, although a standard tuple would work just as well. When a Python None is encountered in a pipe segment it is simply ignored. Thus switch returns x in either the first or second output, depending on the value of x.
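
The routing behavior described above can be imitated in plain Python. In this sketch the route helper and the two lists standing in for downstream Streams are hypothetical; they only mimic the rule that a None output is dropped, and are not Minipipe's actual code.

```python
def switch(x):
    # Same if-then switch as above: first slot if x > 1, second slot otherwise.
    return [x, None] if x > 1 else [None, x]


def route(outputs, downstreams):
    """Hypothetical helper: send each non-None output to its matching downstream."""
    for value, stream in zip(outputs, downstreams):
        if value is not None:  # None means "nothing for this output"
            stream.append(value)


greater, not_greater = [], []  # plain lists standing in for two Streams
for x in [1.6, 0.5, 1.2, 0.9]:
    route(switch(x), [greater, not_greater])

print(greater)      # [1.6, 1.2]
print(not_greater)  # [0.5, 0.9]
```

Each value reaches exactly one of the two downstreams, so the switch acts as a branch point in the graph.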

The PipeSystem API allows you to define general graphs that may branch and join however you choose. In the PipeSystem API you explicitly define the nodes (Pipes) and edges (Streams) of your graph.

[2]:
# Define streams

s1, s2, s3, s4, s5 = mp.Stream(), mp.Stream(), mp.Stream(), mp.Stream(), mp.Stream()

[4]:
# Create Pipe segments with up/downstreams
# Order is not important

pipes = [
    mp.Source(genRand(), 'source1', downstreams=[s1]),
    mp.Source(genRand(), 'source2', downstreams=[s1]),
    mp.Regulator(batch, 'batcher', upstreams=[s1], downstreams=[s2]),
    mp.Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
    mp.Transform(switch, 'switch', upstreams=[s3], downstreams=[s4, s5]),
    mp.Sink(output_gt_1, 'print_gt_1', upstreams=[s4]),
    mp.Sink(output_lt_1, 'print_lt_1', upstreams=[s5]),
]

Since switch has two outputs, it must have two downstreams; otherwise the list/tuple will be interpreted as a single output. Pipe segments may also have multiple inputs, in which case they must have multiple upstreams.
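
One way to sanity-check a graph like this before building it is to write the wiring down as plain data and list the resulting edges. The wiring dictionary and the edges helper below are hypothetical; they mirror the upstreams and downstreams of the pipes list above using stream names as strings, and do not use or inspect any Minipipe objects.

```python
# Hypothetical plain-data copy of the wiring: name -> (upstreams, downstreams)
wiring = {
    'source1':    ([], ['s1']),
    'source2':    ([], ['s1']),
    'batcher':    (['s1'], ['s2']),
    'sum':        (['s2'], ['s3']),
    'switch':     (['s3'], ['s4', 's5']),
    'print_gt_1': (['s4'], []),
    'print_lt_1': (['s5'], []),
}


def edges(wiring):
    """Return sorted (producer, consumer, stream) triples implied by shared streams."""
    found = []
    for producer, (_, downstreams) in wiring.items():
        for consumer, (upstreams, _) in wiring.items():
            for stream in downstreams:
                if stream in upstreams:
                    found.append((producer, consumer, stream))
    return sorted(found)


for producer, consumer, stream in edges(wiring):
    print(f'{producer} --{stream}--> {consumer}')
```

The listing shows both sources joining on s1 and switch branching onto s4 and s5, which is the join-and-branch topology this example sets out to demonstrate.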

[6]:
# Build pipesystem

psys = mp.PipeSystem(pipes)
psys.build()
psys.diagram(draw_streams=False)

[6]:
../_images/examples_into_pipesys_8_0.svg

[7]:
# Run pipesystem

psys.run()
psys.close()
1 < 1.604944539622751
1 < 1.604944539622751
1 < 1.251717255372735
1 < 1.5286125042703245
1 < 1.2712189351541268
2020-01-03 14:27:28,772 - INFO - source2 - End of stream
2020-01-03 14:27:28,774 - INFO - source1 - End of stream
1 < 1.7101062302775798
2020-01-03 14:27:28,786 - INFO - source2 - Local termination
1 < 1.251717255372735
1 > 0.542044961649252
1 < 1.5286125042703245
1 < 1.5037996350353544
1 > 0.542044961649252
1 < 1.2712189351541268
1 > 0.973419707259716
1 > 0.973419707259716
1 < 1.7101062302775798
1 > 0.8593722337853537
1 < 1.5037996350353544
1 > 0.8593722337853537
1 < 1.5116823320879524
1 < 1.5116823320879524
2020-01-03 14:27:28,806 - INFO - source1 - Local termination
2020-01-03 14:27:28,810 - INFO - batcher - Local termination
2020-01-03 14:27:28,812 - INFO - sum - Local termination
2020-01-03 14:27:28,818 - INFO - print_gt_1 - Local termination
2020-01-03 14:27:28,818 - INFO - print_lt_1 - Local termination
2020-01-03 14:27:28,819 - INFO - switch - Local termination