flow2: Pipelines

Install with pip install flow2

_images/flow2.png

Asynchronous task executor.

Features:

  • subclass a Task to implement your own constructor and run code

  • or use CallbackTask to create a task from a bare callback, sync or async

  • parallel or serial task groups to nest tasks

  • logging and colored output

Example:

import flow2

async def your_callback1(context):
    return 'something'

task = flow2.SerialTaskGroup(
    'Your workflow',
    flow2.ParallelTaskGroup(
        'Parallel task',
        flow2.CallbackTask('Task 1', your_callback1),
        flow2.CallbackTask('Task 2', your_callback2),
    ),
    flow2.ParallelTaskGroup(
        'Parallel task',
        YourTask('Task 3', your, args),
        flow2.CallbackTask('Task 4', your_callback4),
    ),
)

result = await task.run()

Result is a dict of the return value of each task, where the keys are snake_cased conversion of the names, ie. ‘Task 4’ becomes ‘task_4’ in the result dict.

The result dict is provisioned by the tasks as they run, and is passed to the next running tasks. As such, in the above example, you are sure that your_callback4 will have the ‘task_3’ key in the context when called.

class flow2.task.CallbackTask(name, callback, **kwargs)[source]

Task decorating a callback

callback

Sync or async task callback, result will be processed by cli2.asyncio.async_resolve().

async run(context)[source]

You should override this in your own Task subclasses.

Raises:

NotImplementedError

class flow2.task.ParallelTaskGroup(name, *tasks, **kwargs)[source]

Run tasks in parallel without caring about success or failures.

async run(context)[source]

You should override this in your own Task subclasses.

Raises:

NotImplementedError

class flow2.task.SerialTaskGroup(name, *tasks, **kwargs)[source]

Run tasks one after another, stop and fail if one fails.

async run(context)[source]

You should override this in your own Task subclasses.

Raises:

NotImplementedError

class flow2.task.Task(name, description=None, register=None, output=True)[source]

Base task class, don’t use it directly.

name

Task name in human readable format.

key

lower_snake_cased conversion of the name

async exception(exception, context, raises=True)[source]

Called when the task raises an exception.

Parameters:
  • execption – Raised exception

  • context – context dict

  • raises – wether to raise exception or not

async process(context=None, raises=True)[source]

Orchestrate the call to the run() method.

Parameters:
  • context – context dict

  • raises – Wether to raise exceptions or not

async run(context)[source]

You should override this in your own Task subclasses.

Raises:

NotImplementedError

async start(context)[source]

Called when the task begins, does printing and logging.

Parameters:

context – context dict

async success(result, context)[source]

Called when the task ends successfully, does printing and logging.

Parameters:
  • result – Result of the task run method

  • context – context dict

class flow2.task.TaskGroup(name, *tasks, **kwargs)[source]

Base TaskGroup, don’t use this directly.

Instead, use ParallelTaskGroup or SerialTaskGroup.

tasks

Task objects