Workflows resource

Import the workflow resource module and instantiate a workflow-service client

[ ]:
from collections.abc import Iterator
from typing import Any

from peak.resources import workflows

wf_client: workflows.Workflow = workflows.get_client()

List all existing workflows and iterate over them

[ ]:
list_workflows_iterator: Iterator[dict[str, Any]] = wf_client.list_workflows()
list_workflows_iterated: dict[str, Any] = next(list_workflows_iterator)
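The iterator above yields one element per `next` call. A minimal sketch of gathering every workflow into one list, assuming each yielded element is a page dict that stores its items under a `"workflows"` key (an assumption about the response shape, not confirmed in this notebook). The helper `collect_items` and the sample pages are hypothetical and stand in for `wf_client.list_workflows()`.

```python
from collections.abc import Iterable, Iterator
from typing import Any


def collect_items(pages: Iterable[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Flatten the list stored under `key` in every page into a single list."""
    items: list[dict[str, Any]] = []
    for page in pages:
        # Pages that lack the key contribute nothing instead of raising KeyError.
        items.extend(page.get(key, []))
    return items


# Stand-in pages for illustration only; the "workflows" key is an assumption.
sample_pages: Iterator[dict[str, Any]] = iter(
    [
        {"workflows": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]},
        {"workflows": [{"id": 3, "name": "c"}]},
    ],
)
all_workflows = collect_items(sample_pages, key="workflows")
print([wf["id"] for wf in all_workflows])
```

In the notebook, the same helper could be called as `collect_items(wf_client.list_workflows(), key="workflows")` if the response shape matches the assumption.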

Prepare the payload and submit a new workflow resource for creation

[ ]:
body: dict[str, Any] = {
    "name": "sdk-12345",
    "triggers": [],
    "steps": {"s1": {"type": "standard", "imageId": 16849, "imageVersionId": 101, "command": "python script.py"}},
}

created_id: int = wf_client.create_workflow(body)["id"]
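When several workflows share the same single-step shape, the payload can be built by a small function rather than written out by hand. The sketch below reproduces the payload above; `build_workflow_body` is a hypothetical helper, not part of the SDK, and it assumes only the fields already shown in this notebook.

```python
from typing import Any


def build_workflow_body(
    name: str,
    image_id: int,
    image_version_id: int,
    command: str,
    step_name: str = "s1",
) -> dict[str, Any]:
    """Build a single-step workflow payload with no triggers."""
    return {
        "name": name,
        "triggers": [],
        "steps": {
            step_name: {
                "type": "standard",
                "imageId": image_id,
                "imageVersionId": image_version_id,
                "command": command,
            },
        },
    }


# Same values as the payload in the cell above.
body = build_workflow_body("sdk-12345", 16849, 101, "python script.py")
print(body["steps"]["s1"]["command"])
```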

Modify the payload and update the existing workflow resource

[ ]:
updated_body: dict[str, Any] = {
    "name": "sdk-12345",
    "triggers": [],
    "steps": {
        "s1": {"type": "standard", "imageId": 16849, "imageVersionId": 102, "command": "python script.py --arg 1"},
    },
}

wf_client.update_workflow(workflow_id=created_id, body=updated_body)
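The updated payload differs from the original only in `imageVersionId` and `command`. One way to make that explicit is to derive it from a copy of the original payload. This is a sketch with a hypothetical helper, `with_step_changes`, and it assumes the update call expects the full payload as shown above.

```python
import copy
from typing import Any


def with_step_changes(body: dict[str, Any], step_name: str, **changes: Any) -> dict[str, Any]:
    """Return a deep copy of `body` with `changes` applied to one step."""
    updated = copy.deepcopy(body)  # leave the caller's payload untouched
    updated["steps"][step_name].update(changes)
    return updated


original_body: dict[str, Any] = {
    "name": "sdk-12345",
    "triggers": [],
    "steps": {"s1": {"type": "standard", "imageId": 16849, "imageVersionId": 101, "command": "python script.py"}},
}

derived_body = with_step_changes(
    original_body,
    "s1",
    imageVersionId=102,
    command="python script.py --arg 1",
)
print(derived_body["steps"]["s1"])
```

The deep copy matters because `steps` is nested: a shallow copy would share the inner step dict, so the update would also change the original payload.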

Describe an existing workflow resource

[ ]:
workflow_description: dict[str, Any] = wf_client.describe_workflow(workflow_id=created_id)

Delete an existing workflow resource

[ ]:
# 63153 is an example ID; pass the ID of the workflow you want to delete.
wf_client.delete_workflow(workflow_id=63153)

List all executions of a workflow resource and iterate over them

  • Execution history is not available for workflow executions older than 90 days.

[ ]:
workflow_executions_iterator: Iterator[dict[str, Any]] = wf_client.list_executions(workflow_id=created_id)
workflow_executions_iterated: dict[str, Any] = next(workflow_executions_iterator)
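Calling `next` on an exhausted iterator raises `StopIteration`. Whether the executions iterator can be empty for a workflow that has never run is not stated here, so the sketch below simply guards against that case by passing a default to `next`. The helper `first_or_none` is hypothetical, and the sample iterators stand in for `wf_client.list_executions(...)`.

```python
from collections.abc import Iterator
from typing import Any, Optional


def first_or_none(pages: Iterator[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Return the next element of the iterator, or None if it is exhausted."""
    return next(pages, None)


# Stand-in iterators for illustration only.
empty_result = first_or_none(iter([]))
first_result = first_or_none(iter([{"page": 1}, {"page": 2}]))

if empty_result is None:
    print("No executions returned")
print(first_result)
```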

Schedule an immediate execution for a workflow

[ ]:
wf_client.execute_workflow(workflow_id=created_id)

# Execute a workflow with parameters.
# A separate name keeps the creation payload in `body` intact.
execution_body: dict[str, Any] = {
    "params": {
        "global": {
            "key": "value",
        },
        "stepWise": {
            "step1": {
                "key1": "value1",
            },
        },
    },
}

wf_client.execute_workflow(workflow_id=created_id, body=execution_body)
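The keys under `stepWise` look like step names, so a parameter block keyed by a name that the workflow does not define may not reach any step. The sketch below checks for such keys before submitting. It assumes `stepWise` keys are meant to match keys of the workflow's `steps` mapping, which this notebook does not confirm, and `unknown_step_params` is a hypothetical helper with illustrative sample data.

```python
from typing import Any


def unknown_step_params(step_names: set[str], execution_body: dict[str, Any]) -> set[str]:
    """Return the stepWise keys that do not match any known step name."""
    step_wise = execution_body.get("params", {}).get("stepWise", {})
    return set(step_wise) - step_names


# Illustrative data: the workflow defines one step, the parameters name two.
workflow_steps: dict[str, Any] = {"s1": {"type": "standard"}}
sample_execution_body: dict[str, Any] = {
    "params": {
        "global": {"key": "value"},
        "stepWise": {
            "s1": {"key1": "value1"},
            "s2": {"key2": "value2"},
        },
    },
}

unmatched = unknown_step_params(set(workflow_steps), sample_execution_body)
print(sorted(unmatched))
```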

Get all available resource templates for a workflow resource

[ ]:
available_resources: list[dict[str, Any]] = wf_client.list_resources()

Get default resource template for a workflow resource

[ ]:
default_resource: dict[str, Any] = wf_client.get_default_resource()