flow2: Pipelines¶
Install with pip install flow2
Asynchronous task executor.
Features:
subclass a Task to implement your own constructor and run code
or use CallbackTask to create a task from a bare callback, sync or async
parallel or serial task groups to nest tasks
logging and colored output
Example:
import flow2
async def your_callback1(context):
return 'something'
task = flow2.SerialTaskGroup(
'Your workflow',
flow2.ParallelTaskGroup(
'Parallel task',
flow2.CallbackTask('Task 1', your_callback1),
flow2.CallbackTask('Task 2', your_callback2),
),
flow2.ParallelTaskGroup(
'Parallel task',
YourTask('Task 3', your, args),
flow2.CallbackTask('Task 4', your_callback4),
),
)
result = await task.run()
Result is a dict of the return value of each task, where the keys are snake_cased conversion of the names, ie. ‘Task 4’ becomes ‘task_4’ in the result dict.
The result dict is provisioned by the tasks as they run, and is passed to the next running tasks. As such, in the above example, you are sure that your_callback4 will have the ‘task_3’ key in the context when called.
- class flow2.task.CallbackTask(name, callback, **kwargs)[source]¶
Task decorating a callback
- callback¶
Sync or async task callback, result will be processed by
cli2.asyncio.async_resolve().
- class flow2.task.ParallelTaskGroup(name, *tasks, **kwargs)[source]¶
Run tasks in parallel without caring about success or failures.
- class flow2.task.SerialTaskGroup(name, *tasks, **kwargs)[source]¶
Run tasks one after another, stop and fail if one fails.
- class flow2.task.Task(name, description=None, register=None, output=True)[source]¶
Base task class, don’t use it directly.
- name¶
Task name in human readable format.
- key¶
lower_snake_cased conversion of the name
- async exception(exception, context, raises=True)[source]¶
Called when the task raises an exception.
- Parameters:
execption – Raised exception
context – context dict
raises – wether to raise exception or not
- async process(context=None, raises=True)[source]¶
Orchestrate the call to the
run()method.- Parameters:
context – context dict
raises – Wether to raise exceptions or not
- async run(context)[source]¶
You should override this in your own Task subclasses.
- Raises:
NotImplementedError
- class flow2.task.TaskGroup(name, *tasks, **kwargs)[source]¶
Base TaskGroup, don’t use this directly.
Instead, use
ParallelTaskGrouporSerialTaskGroup.- tasks¶
Task objects