Workflows

Note: Workflows with only standard steps are supported.

Import the workflow resource module and instantiate a workflow-service client

[ ]:
from collections.abc import Iterator
from typing import Any

from peak.resources import workflows

wf_client: workflows.Workflow = workflows.get_client()

List all existing workflow(s) and iterate over them

[ ]:
list_workflows_iterator: Iterator[dict[str, Any]] = wf_client.list_workflows(
    name="test",
    workflow_status=["Available"],
    last_execution_status=["Running"],
    last_modified_by=["test@peak.ai"],
)
list_workflows_iterated: dict[str, Any] = next(list_workflows_iterator)

Prepare the payload and submit a new workflow resource for creation

[ ]:
cron_workflow_body: dict[str, Any] = {
    "name": "cron-workflow",
    "triggers": [
        {
            "cron": "0 0 * * *",
        },
    ],
    "retryOptions": {
        "exitCodes": [1, 2],
        "numberOfRetries": 3,
        "exponentialBackoff": True,
        "duration": 2,
    },
    "watchers": [
        {
            "user": "abc@peak.ai",
            "events": {
                "success": False,
                "fail": True,
            },
        },
        {
            "webhook": {
                "name": "info",
                "url": "https://abc.com/post",
                "payload": '{ "pingback-url": "https:/workflow/123" }',
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
        {
            "email": {
                "name": "email-watcher-1",
                "recipients": {
                    "to": ["user1@peak.ai", "user2@peak.ai"],
                },
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
    ],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "stepName": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
        },
    },
}

cron_workflow_id: int = wf_client.create_workflow(cron_workflow_body)["id"]

webhook_workflow_body: dict[str, Any] = {
    "name": "webhook-workflow",
    "triggers": [
        {
            "webhook": True,
        },
    ],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "github.com/org/repo",
            },
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 3,
                    "exponentialBackoff": True,
                    "duration": 2,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

webhook_workflow_id: int = wf_client.create_workflow(webhook_workflow_body)["id"]

manual_workflow_body: dict[str, Any] = {
    "name": "manual-workflow",
    "triggers": [],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "stepName": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
            "runConfiguration": {
                "skipConfiguration": {
                    "skip": True,
                    "skipDAG": True,
                },
            },
        },
    },
}

manual_workflow_id: int = wf_client.create_workflow(manual_workflow_body)["id"]

Modify the payload and update the existing workflow resource

[ ]:
updated_body: dict[str, Any] = {
    "name": "updated-workflow",
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "retryOptions": {
        "exitCodes": [1, 2],
        "numberOfRetries": 3,
        "exponentialBackoff": True,
        "duration": 2,
    },
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 456,
            "imageVersionId": 2,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 5,
                    "exponentialBackoff": True,
                    "duration": 5,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 456,
            "imageVersionId": 2,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

wf_client.update_workflow(workflow_id=manual_workflow_id, body=updated_body)

Modify the payload and update the existing workflow resource via patch-workflow

[ ]:
## 1. By passing the keys to update through arguments.

patch_workflow_via_args = wf_client.patch_workflow(
    workflow_id=manual_workflow_id,
    name="updated-workflow",
    image_id=100,
    image_version_id=100,
    instance_type_id=23,
    step_names=["step1"],
)


## 2. By passing body containing keys to update

body: dict[str, Any] = {
    "name": "updated-workflow",
    "triggers": [
        {
            "cron": "0 0 * * *",
        },
    ],
    "watchers": [
        {
            "user": "abc@peak.ai",
            "events": {
                "success": False,
                "fail": True,
            },
        },
        {
            "webhook": {
                "name": "info",
                "url": "https://abc.com/post",
                "payload": '{ "pingback-url": "https:/workflow/123" }',
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
        {
            "email": {
                "name": "email-watcher-1",
                "recipients": {
                    "to": ["user1@peak.ai", "user2@peak.ai"],
                },
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
    ],
    "steps": {
        "step1": {
            "imageId": 100,
            "imageVersionId": 100,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
        },
        "step2": {
            "imageId": 100,
            "imageVersionId": 100,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 22,
                "storage": "20GB",
            },
        },
    },
}

wf_client.patch_workflow(workflow_id=cron_workflow_id, body=body)

Create or update a workflow resource

[ ]:
cron_workflow_body: dict[str, Any] = {
    "name": "cron-workflow",
    "triggers": [
        {
            "cron": "0 0 * * *",
        },
    ],
    "watchers": [
        {
            "user": "abc@peak.ai",
            "events": {
                "success": False,
                "fail": True,
            },
        },
        {
            "webhook": {
                "name": "info",
                "url": "https://abc.com/post",
                "payload": '{ "pingback-url": "https:/workflow/123" }',
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
        {
            "email": {
                "name": "email-watcher-1",
                "recipients": {
                    "to": ["user1@peak.ai", "user2@peak.ai"],
                },
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
    ],
    "retryOptions": {
        "exitCodes": [1, 2],
        "numberOfRetries": 3,
        "exponentialBackoff": True,
        "duration": 2,
    },
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "github.com/org/repo",
            },
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 3,
                    "exponentialBackoff": True,
                    "duration": 2,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

cron_workflow_id: int = wf_client.create_or_update_workflow(cron_workflow_body)["id"]

updated_body: dict[str, Any] = {
    "name": "cron-workflow",
    "triggers": [
        {
            "webhook": True,
        },
    ],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "github.com/org/repo",
            },
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 3,
                    "exponentialBackoff": True,
                    "duration": 2,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

webhook_workflow_id: int = wf_client.create_or_update_workflow(updated_body)

Describe an existing workflow resource

[ ]:
workflow_description: dict[str, Any] = wf_client.describe_workflow(workflow_id=manual_workflow_id)

Delete an existing workflow resource

[ ]:
wf_client.delete_workflow(workflow_id=63153)

List all executions of a workflow resource and iterate over them

Execution history is not available for workflow executions older than 90 days.

[ ]:
workflow_executions_iterator: Iterator[dict[str, Any]] = wf_client.list_executions(
    workflow_id=manual_workflow_id,
    status=["Running", "Success"],
    count=10,
    date_from="2024-01-07T18:00:00.000Z",
    date_to="2024-01-07T19:00:00.000Z",
)
workflow_executions_iterated: dict[str, Any] = next(workflow_executions_iterator)

Get Details about a workflow execution

[ ]:
workflow_execution = wf_client.get_execution_details(
    workflow_id=1,
    execution_id="execution_id",
)

Get logs for a specific execution of a workflow resource

[ ]:
workflow_execution_logs = wf_client.get_execution_logs(
    workflow_id=1,
    execution_id="execution_id",
    step_name="step_name",
)

# To get next set of logs, pass the nextToken returned in the response of previous function call
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
    workflow_id=1,
    execution_id="execution_id",
    step_name="step_name",
    next_token=workflow_execution_logs["nextToken"],
)

# To download the logs locally the `save` parameter can be used along with `file_name`
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
    workflow_id=1,
    execution_id="execution_id",
    step_name="step_name",
    save=True,
    file_name="workflow_execution_logs.log",
)

Schedule an immediate execution for a workflow

[ ]:
wf_client.execute_workflow(workflow_id=manual_workflow_id)

# Execute a workflow with some parameters
body = {
    "params": {
        "global": {
            "key": "value",
        },
        "stepWise": {
            "step1": {
                "key1": "value1",
            },
        },
    },
    "stepsToRun": [
        {
            "stepName": "step1",
            "runDAG": True,
        },
        {
            "stepName": "step2",
        },
    ],
}

wf_client.execute_workflow(workflow_id=manual_workflow_id, body=body)

Get all available resources for workflow

[ ]:
available_resources: list[dict[str, Any]] = wf_client.list_resources()

Get default resource values for workflow

[ ]:
default_resource: dict[str, Any] = wf_client.get_default_resource()