Pipeline Classes
Minipipe has two APIs: PipeLine and PipeSystem. PipeLine is for sequential (linear) pipelines, while PipeSystem supports pipelines of any topology.
class pipelines.PipeLine(monitor=False, queue_type='multiprocessing.Queue')
A simplified API for linear PipeSystems.
Toy example:
    import numpy as np
    from minipipe import PipeLine, Source, Regulator, Transform, Sink  # import path assumed

    # Define functors
    def genRand(n=10):
        # Source functor: yields n random vectors of length 10
        for _ in range(n):
            yield np.random.rand(10)

    def batch(batch_size=2):
        # Regulator functor: receives one vector, then yields it in chunks
        x = (yield)
        for i in range(len(x)//batch_size):
            yield x[i*batch_size:(i+1)*batch_size]

    def sumBatch(x):
        return x.sum()

    def print_out(x):
        print(x)

    # Define pipeline
    pline = PipeLine()
    pline.add(Source(genRand, 'source'))
    pline.add(Regulator(batch, 'batcher'), buffer_size=10)
    pline.add(Transform(sumBatch, 'sum'), n_processes=3)
    pline.add(Sink(print_out, 'print'))

    # Build pipeline
    pline.build()

    # Run pipeline
    pline.run()
    pline.close()
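The `batch` functor in the toy example is a generator-based coroutine: `x = (yield)` waits for one upstream item, and each later `yield` emits one chunk of it. The sketch below drives that coroutine by hand with plain lists to show what a single input produces. The `drive` helper is hypothetical and only illustrates the Python coroutine protocol; how Regulator drives the functor inside minipipe is not described in this reference.

```python
def batch(batch_size=2):
    # Same functor as in the toy example
    x = (yield)
    for i in range(len(x) // batch_size):
        yield x[i * batch_size:(i + 1) * batch_size]


def drive(functor, item, **kwargs):
    """Hypothetical driver: prime the coroutine, send one item, collect all outputs."""
    gen = functor(**kwargs)
    next(gen)  # advance to the bare (yield); nothing useful is produced yet
    outputs = []
    try:
        # send() resumes the coroutine with x = item and returns the first chunk
        outputs.append(gen.send(item))
        # the remaining chunks come from ordinary iteration
        for out in gen:
            outputs.append(out)
    except StopIteration:
        # raised by send() when the item is too short to form a single chunk
        pass
    return outputs


chunks = drive(batch, list(range(10)), batch_size=2)
print(chunks)
```

Note that trailing elements which do not fill a whole chunk are dropped, because the loop runs `len(x) // batch_size` times.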
Parameters: - monitor – Bool, log stream I/O times
- queue_type – String, multiprocessing queue type to use. Valid types: 'multiprocessing.Queue', 'multiprocessing.SimpleQueue'
Returns: None
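The two valid `queue_type` strings name classes from Python's standard `multiprocessing` module. As a rough illustration of how such a string could be validated and mapped to a queue factory, here is a minimal sketch. The names `QUEUE_FACTORIES`, `resolve_queue_factory` and `queue_kwargs` are hypothetical and not part of minipipe; the only fact relied on is that `multiprocessing.Queue` accepts a `maxsize` argument while `multiprocessing.SimpleQueue` takes none.

```python
import multiprocessing as mp

# Hypothetical lookup table; minipipe's own resolution logic may differ.
QUEUE_FACTORIES = {
    'multiprocessing.Queue': mp.Queue,
    'multiprocessing.SimpleQueue': mp.SimpleQueue,
}


def resolve_queue_factory(queue_type='multiprocessing.Queue'):
    """Return the queue factory for a queue_type string, or raise ValueError."""
    try:
        return QUEUE_FACTORIES[queue_type]
    except KeyError:
        valid = ', '.join(sorted(QUEUE_FACTORIES))
        raise ValueError(f'invalid queue_type {queue_type!r}; valid types: {valid}') from None


def queue_kwargs(queue_type, buffer_size):
    """Only multiprocessing.Queue can be bounded; SimpleQueue has no maxsize."""
    if queue_type == 'multiprocessing.Queue':
        return {'maxsize': buffer_size}
    return {}
```

A caller would then build the queue with `resolve_queue_factory(qt)(**queue_kwargs(qt, size))`.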
add(pipe, n_processes=1, buffer_size=3)
Adds a pipe segment to the pipeline.
Parameters: - pipe – Pipe segment to add to PipeLine
- n_processes – Number of processes (workers) to assign to pipe segment
- buffer_size – Size of Stream buffer
Returns: None
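To give a feel for what a bounded buffer of size `buffer_size` means, the sketch below uses the standard-library `queue.Queue` as a stand-in for a Stream. This is an assumption made for illustration: the reference does not say how a Stream is implemented, and `fill_buffer` is a hypothetical helper. A non-blocking put is used so the limit is visible; with a blocking put, a producer would instead wait until a consumer frees a slot.

```python
import queue


def fill_buffer(items, buffer_size=3):
    """Put items into a bounded buffer without blocking.

    Returns (accepted, rejected): items that fit and items that hit the limit.
    """
    buf = queue.Queue(maxsize=buffer_size)
    accepted, rejected = [], []
    for item in items:
        try:
            buf.put_nowait(item)
            accepted.append(item)
        except queue.Full:
            # The buffer already holds buffer_size items and nothing has consumed them
            rejected.append(item)
    return accepted, rejected


accepted, rejected = fill_buffer(range(5), buffer_size=3)
print(accepted, rejected)
```

A larger buffer lets a fast upstream segment run further ahead of a slow downstream one, at the cost of holding more items in memory.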
class pipelines.PipeSystem(pipes)
PipeSystem connects Pipes and creates a process pool. Pipes are run and closed with a built PipeSystem.
Toy example:
    import numpy as np
    from minipipe import PipeSystem, Stream, Source, Regulator, Transform, Sink  # import path assumed

    # Define functors
    def genRand(n=10):
        for _ in range(n):
            yield np.random.rand(10)

    def batch(batch_size=2):
        x = (yield)
        for i in range(len(x)//batch_size):
            yield x[i*batch_size:(i+1)*batch_size]

    def sumBatch(x):
        return x.sum()

    def split(x):
        # One entry per downstream; None is returned for the stream that should not receive x
        return [x, None] if x > 1 else [None, x]

    def output_gt_1(x):
        print('1 <', x)

    def output_lt_1(x):
        print('1 >', x)

    # Define streams
    s1, s2, s3, s4, s5 = Stream(), Stream(), Stream(), Stream(), Stream()

    # Create Pipe segments with up/downstreams
    # Order is not important
    pipes = [
        Source(genRand, 'source1', downstreams=[s1]),
        Source(genRand, 'source2', downstreams=[s1]),
        Regulator(batch, 'batcher', upstreams=[s1], downstreams=[s2]),
        Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
        Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
        Transform(sumBatch, 'sum', upstreams=[s2], downstreams=[s3]),
        # split consumes the sums on s3 (s2 carries the unsummed batches)
        Transform(split, 'split', upstreams=[s3], downstreams=[s4, s5]),
        Sink(output_gt_1, 'print_gt_1', upstreams=[s4]),
        Sink(output_lt_1, 'print_lt_1', upstreams=[s5]),
    ]

    # Build pipesystem
    psys = PipeSystem(pipes)
    psys.build()

    # Run pipesystem
    psys.run()
    psys.close()
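The `split` functor in the example returns a two-element list, one entry per downstream, with `None` in the slot that should receive nothing. The sketch below shows that routing idea with plain lists standing in for Streams. The `route` helper is hypothetical, and treating `None` as "skip this stream" is an assumption inferred from the example, not a documented guarantee.

```python
def split(x):
    # Same functor as in the example: first slot for x > 1, second slot otherwise
    return [x, None] if x > 1 else [None, x]


def route(outputs, downstreams):
    """Hypothetical router: deliver each non-None output to its matching stream."""
    for value, stream in zip(outputs, downstreams):
        if value is not None:
            stream.append(value)


greater, lesser = [], []  # plain lists standing in for streams s4 and s5
for x in [0.4, 1.7, 1.0, 2.5]:
    route(split(x), [greater, lesser])

print(greater, lesser)
```

Because the comparison is strict, a value of exactly 1 goes to the second stream.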
Parameters: pipes – List[Pipes], list of Pipes with their upstreams/downstreams
Returns: None
build(log_lvl='INFO', monitor=False, ignore_exceptions=None)
Connects pipe segments together and builds the graph.
Parameters: - name – String, log level, one of: info, debug, warning, error or critical
- monitor – Bool, log stream I/O times
- ignore_exceptions – List of exceptions to ignore while pipeline is running
Returns: None
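One plausible reading of `ignore_exceptions` is that a worker skips an item when the functor raises one of the listed exception types and lets every other exception propagate. The sketch below illustrates that reading in plain Python. `run_functor` is a hypothetical helper, and the actual handling inside minipipe (for example, whether ignored exceptions are logged) is not specified in this reference.

```python
def run_functor(functor, items, ignore_exceptions=None):
    """Apply functor to each item, skipping items that raise an ignored exception."""
    ignored = tuple(ignore_exceptions or ())  # an empty tuple catches nothing
    results = []
    for item in items:
        try:
            results.append(functor(item))
        except ignored:
            # Drop this item and keep processing the rest
            continue
    return results


def reciprocal(x):
    return 1 / x


print(run_functor(reciprocal, [1, 0, 2], ignore_exceptions=[ZeroDivisionError]))
```

With `ignore_exceptions=None` the same call would stop at the first `ZeroDivisionError`.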
close()
Joins pipeline.
diagram(draw_streams=False)
Draws a graph diagram of the pipeline.
Parameters: draw_streams – Bool, if True, Streams are included in the graph diagram
Returns: graphviz Digraph object
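To illustrate the kind of graph such a diagram represents, the sketch below derives pipe-to-pipe edges from shared stream labels and prints them as DOT text, the format that Graphviz renders. It does not call minipipe or the graphviz package: `edges_from_streams` and `to_dot` are hypothetical helpers, and streams are represented by plain string labels.

```python
def edges_from_streams(pipes):
    """pipes maps a pipe name to (upstream labels, downstream labels).

    Two pipes are connected when a downstream of one is an upstream of the other.
    """
    edges = []
    for src, (_, downs) in pipes.items():
        for dst, (ups, _) in pipes.items():
            if src != dst and set(downs) & set(ups):
                edges.append((src, dst))
    return edges


def to_dot(edges, name='pipeline'):
    """Render directed edges as DOT source text."""
    lines = [f'digraph {name} {{']
    lines += [f'    "{a}" -> "{b}";' for a, b in edges]
    lines.append('}')
    return '\n'.join(lines)


# A linear topology with four segments
pipes = {
    'source': ([], ['s1']),
    'batcher': (['s1'], ['s2']),
    'sum': (['s2'], ['s3']),
    'print': (['s3'], []),
}
edges = edges_from_streams(pipes)
dot = to_dot(edges)
print(dot)
```

The printed text can be pasted into any DOT viewer to see the segments and their connections.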
reset()
Resets pipeline.
run()
Runs pipeline.