Pipeline Classes

Minipipe has two APIs: PipeLine and PipeSystem. PipeLine is for sequential (linear) pipelines, while PipeSystem can be used for any topology.

class pipelines.PipeLine(monitor=False, queue_type='multiprocessing.Queue')

A simplified API for linear PipeSystems.
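Because a PipeLine is linear, the order of add() calls is enough to determine how segments are wired. The sketch below illustrates that idea under stated assumptions: `wire_linear` is a hypothetical helper (not part of Minipipe) that takes segment names in order and produces the upstream/downstream stream labels an equivalent PipeSystem would need. The labels `s1`, `s2`, and so on are made up for illustration.

```python
def wire_linear(names):
    """Hypothetical helper: wire an ordered list of segment names into a chain.

    Returns (name, upstreams, downstreams) tuples in which stream s{i} is the
    downstream of segment i-1 and the upstream of segment i.
    Not Minipipe's implementation.
    """
    wired = []
    last = len(names) - 1
    for i, name in enumerate(names):
        upstreams = [f"s{i}"] if i > 0 else []          # first segment acts as a source
        downstreams = [f"s{i + 1}"] if i < last else []  # last segment acts as a sink
        wired.append((name, upstreams, downstreams))
    return wired


if __name__ == "__main__":
    for row in wire_linear(["source", "batcher", "sum", "print"]):
        print(row)
```

Each add() call in a PipeLine would then correspond to one tuple, which is the extra bookkeeping PipeSystem asks you to write by hand.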

Toy example:

import numpy as np
from minipipe import PipeLine, Source, Regulator, Transform, Sink

# Define functors
def genRand(n=10):
    for _ in range(n):
        yield np.random.rand(10)

def batch(batch_size=2):
    x = (yield)
    for i in range(len(x)//batch_size):
        yield x[i*batch_size:(i+1)*batch_size]

def sumBatch(x):
    return x.sum()

def print_out(x):
    print(x)

# Define pipeline
pline = PipeLine()
pline.add(Source(genRand, 'source'))
pline.add(Regulator(batch, 'batcher'), buffer_size = 10)
pline.add(Transform(sumBatch, 'sum'), n_processes = 3)
pline.add(Sink(print_out, 'print'))

# Build pipeline
pline.build()

# Run pipeline
pline.run()
pline.close()
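To make the data flow of the toy example concrete, here is a single-process reference run in plain Python. It is a hypothetical sketch, not how Minipipe schedules work. It assumes the Regulator functor is driven as a coroutine: primed with `next()`, handed one upstream item via `send()`, then pulled with `next()` until exhausted, with a fresh coroutine per upstream item. Lists stand in for NumPy arrays so the sketch has no dependencies.

```python
def run_sequential(source, regulator, transform, sink):
    """Hypothetical single-process reference for Source -> Regulator -> Transform -> Sink."""
    for item in source():
        # Assumption: one fresh regulator coroutine per upstream item.
        coro = regulator()
        next(coro)                    # run up to the bare `x = (yield)`
        try:
            out = coro.send(item)     # deliver the item; returns the first batch
            while True:
                sink(transform(out))
                out = next(coro)      # pull the remaining batches
        except StopIteration:         # coroutine exhausted (possibly with no batches)
            pass


# Functors mirroring the toy example, using lists instead of NumPy arrays.
def gen_lists(n=3):
    for k in range(n):
        yield list(range(4 * k, 4 * k + 4))


def batch(batch_size=2):
    x = (yield)
    for i in range(len(x) // batch_size):
        yield x[i * batch_size:(i + 1) * batch_size]


if __name__ == "__main__":
    run_sequential(gen_lists, batch, sum, print)  # prints 1, 5, 9, 13, 17, 21 (one per line)
```

The sequential version produces the same values a pipeline would, but in a deterministic order; with several sum workers in a real pipeline, the sink may see them in a different order.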
Parameters:
  • monitor – Bool, log stream I/O times
  • queue_type – String, multiprocessing queue type to be used. Valid types: 'multiprocessing.Queue', 'multiprocessing.SimpleQueue'
Returns:

None
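The two accepted `queue_type` strings name real standard-library classes. One plausible way to resolve the option is sketched below; `make_queue_factory` is a hypothetical helper, not Minipipe's code. A practical difference worth knowing: `multiprocessing.Queue` accepts a `maxsize` bound, while `multiprocessing.SimpleQueue` takes no size argument, so how Minipipe applies `buffer_size` to a SimpleQueue is not covered here.

```python
import multiprocessing

# Valid option strings as listed for PipeLine(queue_type=...).
_QUEUE_TYPES = {
    "multiprocessing.Queue": multiprocessing.Queue,
    "multiprocessing.SimpleQueue": multiprocessing.SimpleQueue,
}


def make_queue_factory(queue_type="multiprocessing.Queue"):
    """Hypothetical helper: map a queue_type string to a queue constructor."""
    try:
        return _QUEUE_TYPES[queue_type]
    except KeyError:
        raise ValueError(
            f"invalid queue_type {queue_type!r}; expected one of {sorted(_QUEUE_TYPES)}"
        ) from None
```

For example, `make_queue_factory()(maxsize=10)` would build a queue that holds at most ten items before `put()` blocks.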
add(pipe, n_processes=1, buffer_size=3)

Adds a pipe segment to the pipeline.

Parameters:
  • pipe – Pipe segment to add to PipeLine
  • n_processes – Number of processes (workers) to assign to pipe segment
  • buffer_size – Size of Stream buffer
Returns:

None
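`n_processes` and `buffer_size` together describe a bounded buffer feeding a pool of workers. The sketch below models that idea with threads and `queue.Queue` purely so it runs anywhere without pickling concerns; `run_segment` is hypothetical, its `n_workers` plays the role of `n_processes`, and Minipipe itself uses processes and multiprocessing queues. A full buffer makes the producer block (back-pressure), and with several workers the output order is not guaranteed.

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker to exit


def run_segment(func, items, n_workers=1, buffer_size=3):
    """Hypothetical illustration: apply func to items with n_workers threads
    reading from a buffer that holds at most buffer_size items."""
    inbox = queue.Queue(maxsize=buffer_size)  # put() blocks while the buffer is full
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            x = inbox.get()
            if x is _STOP:
                break
            y = func(x)
            with lock:
                results.append(y)

    workers = [threading.Thread(target=worker) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for x in items:
        inbox.put(x)          # back-pressure: waits until a worker frees a slot
    for _ in workers:
        inbox.put(_STOP)      # one sentinel per worker
    for w in workers:
        w.join()
    return results
```

Raising `n_workers` helps when `func` is the bottleneck; raising `buffer_size` only smooths out bursts between producer and consumers.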

class pipelines.PipeSystem(pipes)

PipeSystem connects Pipes and creates a process pool. Pipes are run and closed through a built PipeSystem.
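In a PipeSystem, segments are connected only through the Streams they share: a segment that lists a stream in `downstreams` feeds every segment that lists the same stream in `upstreams`. The hypothetical checker below (not part of Minipipe) derives producer-to-consumer edges from such declarations and reports streams that lack a producer or a consumer, a mistake that is easy to make when wiring by hand. Stream labels are plain strings here instead of Stream objects.

```python
from collections import defaultdict


def connect(segments):
    """Hypothetical wiring check for (name, upstreams, downstreams) tuples.

    Returns (edges, dangling): the producer -> consumer edges implied by
    shared streams, and the streams missing a producer or a consumer.
    """
    producers = defaultdict(list)
    consumers = defaultdict(list)
    for name, upstreams, downstreams in segments:
        for s in upstreams:
            consumers[s].append(name)
        for s in downstreams:
            producers[s].append(name)
    streams = set(producers) | set(consumers)
    edges = sorted({(p, c) for s in streams
                    for p in producers[s] for c in consumers[s]})
    dangling = sorted(s for s in streams if not producers[s] or not consumers[s])
    return edges, dangling


if __name__ == "__main__":
    segments = [
        ("source1", [], ["s1"]),
        ("source2", [], ["s1"]),
        ("batcher", ["s1"], ["s2"]),
        ("sum", ["s2"], ["s3"]),
        ("split", ["s3"], ["s4", "s5"]),
        ("print_gt_1", ["s4"], []),
        ("print_lt_1", ["s5"], []),
    ]
    edges, dangling = connect(segments)
    print(dangling)  # []
```

If `split` read from `s2` instead, nothing would consume `s3` and the check would report `['s3']`.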

Toy example:

import numpy as np
from minipipe import PipeSystem, Stream, Source, Regulator, Transform, Sink

# Define functors
def genRand(n=10):
    for _ in range(n):
        yield np.random.rand(10)

def batch(batch_size=2):
    x = (yield)
    for i in range(len(x)//batch_size):
        yield x[i*batch_size:(i+1)*batch_size]

def sumBatch(x):
    return x.sum()

def split(x):
    return [x, None] if x > 1 else [None, x]

def output_gt_1(x):
    print('1 <', x)

def output_lt_1(x):
    print('1 >', x)

# Define streams
s1, s2, s3, s4, s5 = Stream(), Stream(), Stream(), Stream(), Stream()

# Create Pipe segments with up/downstreams
# Order is not important
pipes = [
    Source(genRand, 'source1', downstreams=[s1]),
    Source(genRand, 'source2', downstreams=[s1]),
    Regulator(batch, 'batcher', upstreams=[s1], downstreams=[s2]),
    Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
    Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
    Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
    Transform(split, 'split', upstreams=[s3], downstreams=[s4, s5]),
    Sink(output_gt_1, 'print_gt_1', upstreams=[s4]),
    Sink(output_lt_1, 'print_lt_1', upstreams=[s5]),
]

# Build pipesystem
psys = PipeSystem(pipes)
psys.build()

# Run pipesystem
psys.run()
psys.close()
Parameters:
  • pipes – List[Pipe], list of Pipes with their upstreams/downstreams
Returns:

None
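The `split` functor in the example returns a two-element list for its two downstreams, with `None` in the slot that should receive nothing. Reading that as the convention for multi-downstream segments (an inference from the example, not a documented guarantee), routing a result could look like the hypothetical `route` helper below, where plain lists stand in for Streams.

```python
def route(result, downstreams):
    """Hypothetical router: deliver one functor result to its downstream buffers.

    Assumed convention (inferred from the `split` example): with several
    downstreams the result is a sequence with one entry per downstream,
    and None means "send nothing to this downstream".
    """
    outputs = [result] if len(downstreams) == 1 else list(result)
    if len(outputs) != len(downstreams):
        raise ValueError("expected one output per downstream")
    for value, buffer in zip(outputs, downstreams):
        if value is not None:
            buffer.append(value)


def split(x):
    return [x, None] if x > 1 else [None, x]


if __name__ == "__main__":
    gt_1, lt_1 = [], []
    for x in [0.5, 2.0, 3.0]:
        route(split(x), [gt_1, lt_1])
    print(gt_1, lt_1)  # [2.0, 3.0] [0.5]
```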
build(log_lvl='INFO', monitor=False, ignore_exceptions=None)

Connects pipe segments together and builds graph.

Parameters:
  • log_lvl – String, log level, one of: info, debug, warning, error or critical
  • monitor – Bool, log stream I/O times
  • ignore_exceptions – List of exceptions to ignore while the pipeline is running
Returns:

None
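`log_lvl` and `ignore_exceptions` control what a running segment reports and tolerates when its functor fails. The hypothetical `process_all` below (not Minipipe's code) shows one reading of those parameters: the level string is mapped to a `logging` constant, exceptions whose type appears in `ignore_exceptions` are logged and the offending item is skipped, and any other exception propagates. Whether Minipipe skips, logs, or otherwise handles an ignored item is an assumption here.

```python
import logging


def process_all(func, items, log_lvl="INFO", ignore_exceptions=None):
    """Hypothetical helper: apply func to each item, skipping ignored errors."""
    logger = logging.getLogger("pipeline_sketch")
    logger.setLevel(getattr(logging, log_lvl.upper()))  # 'debug' -> logging.DEBUG
    ignored = tuple(ignore_exceptions or ())
    results = []
    for x in items:
        try:
            results.append(func(x))
        except ignored as e:  # an empty tuple matches nothing, so all errors propagate
            logger.warning("skipping %r: %r", x, e)
    return results
```

With `ignore_exceptions=[ZeroDivisionError]`, applying `lambda x: 10 // x` to `[1, 0, 5]` would log one warning and return `[10, 2]`.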

close()

Joins the pipeline, waiting for its processes to finish.

diagram(draw_streams=False)

Draws a graph diagram of the pipeline.

Parameters:
  • draw_streams – Bool, if True, Streams are included in the graph diagram
Returns:

graphviz Digraph object
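`draw_streams` decides whether Streams appear as nodes of their own or are collapsed into direct segment-to-segment edges. The dependency-free sketch below emits Graphviz DOT text to show the two shapes; `to_dot` is hypothetical and only approximates the diagram, whereas `diagram()` itself returns a graphviz Digraph. DOT text like this can be rendered with the `graphviz` package's `Source` class if that package is installed.

```python
def to_dot(segments, draw_streams=False):
    """Hypothetical DOT emitter for (name, upstreams, downstreams) tuples."""
    lines = ["digraph pipeline {"]
    if draw_streams:
        # Streams become their own box-shaped nodes between segments.
        for name, ups, downs in segments:
            lines += [f'  "{s}" -> "{name}";' for s in ups]
            lines += [f'  "{name}" -> "{s}";' for s in downs]
        streams = sorted({s for _, ups, downs in segments for s in ups + downs})
        lines += [f'  "{s}" [shape=box];' for s in streams]
    else:
        # Collapse each stream into direct producer -> consumer edges.
        producers = {}
        for name, _, downs in segments:
            for s in downs:
                producers.setdefault(s, []).append(name)
        for name, ups, _ in segments:
            for s in ups:
                lines += [f'  "{p}" -> "{name}";' for p in producers.get(s, [])]
    lines.append("}")
    return "\n".join(lines)


if __name__ == "__main__":
    segs = [("src", [], ["s1"]), ("sink", ["s1"], [])]
    print(to_dot(segs))
    print(to_dot(segs, draw_streams=True))
```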
reset()

Resets pipeline.

run()

Runs pipeline.
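In both toy examples the calls come in a fixed order: build(), run(), then close(). A common shape for that lifecycle, assumed here rather than taken from Minipipe's source, is that run() starts the workers and returns, and close() joins them. The threaded stand-in below (hypothetical `LifecycleSketch`, with threads in place of processes) illustrates that ordering.

```python
import threading


class LifecycleSketch:
    """Hypothetical stand-in: run() starts workers, close() joins them."""

    def __init__(self, targets):
        self.targets = targets
        self.workers = []

    def run(self):
        # Start one thread per target and return without waiting.
        self.workers = [threading.Thread(target=t) for t in self.targets]
        for w in self.workers:
            w.start()

    def close(self):
        # Block until every worker has finished, like Process.join().
        for w in self.workers:
            w.join()
        self.workers = []
```

Splitting start and join this way lets the caller do other work between run() and close(); calling close() without run() is simply a no-op in this sketch.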