Workflows

Note: Only workflows made up entirely of standard steps are supported.

Import the workflow resource module and instantiate a workflow-service client

[ ]:
from collections.abc import Iterator
from typing import Any

from peak.resources import workflows

# Workflow client obtained from the resource module; every example below
# calls its methods.
wf_client: workflows.Workflow = workflows.get_client()

Possible field values

Workflow status

The status of the workflow. The following statuses are supported: Draft, Running, Available and Paused.

Execution status or Last execution status

The status of the workflow execution. The following statuses are supported: Success, Running, Stopped, Stopping and Failed.

List all existing workflow(s) and iterate over them

[ ]:
# Filters that narrow the listing: workflow name, workflow status, status of
# the last execution and the user who last modified the workflow.
workflow_filters: dict[str, Any] = {
    "name": "test",
    "workflow_status": ["Available"],
    "last_execution_status": ["Running"],
    "last_modified_by": ["test@peak.ai"],
}
list_workflows_iterator: Iterator[dict[str, Any]] = wf_client.list_workflows(**workflow_filters)

# Take the first item the iterator yields.
list_workflows_iterated: dict[str, Any] = next(list_workflows_iterator)

"""
The list of workflows will be returned in the following format:

{
  "pageCount": 1,
  "pageNumber": 1,
  "pageSize": 25,
  "workflows": [
    {
      "id": 1,
      "name": "workflow-name",
      "createdAt": "2020-01-01T18:00:00.000Z",
      "updatedAt": "2020-01-01T18:00:00.000Z",
      "status": "Available",
      "lastExecution": {
        "executionId": "db88c21d-1add-45dd-a72e-8c6b83b68dee",
        "executedAt": "2020-01-01T18:00:00.000Z",
        "status": "Success",
        "rerunId": "1b0b21b8-370d-459d-94e5-253d4d905ef6"
      },
      "triggers": [
        {
          "scheduled": "2020-01-01T18:00:00.000Z",
          "cron": "* * * * 1"
        },
        {
          "webhook": "db88c21d-1add-45dd-a72e-8c6b83b68dee"
        }
      ],
      "tags": [
        {
          "name": "foo"
        },
        {
          "name": "bar"
        }
      ]
    }
  ],
  "workflowsCount": 1
}
"""

Prepare the payload and submit a new workflow resource for creation

[ ]:
# Payload for a single-step workflow started by a cron trigger.
cron_workflow_body: dict[str, Any] = {
    "name": "cron-workflow",
    "triggers": [
        {
            # Five-field cron expression: 00:00 every day.
            "cron": "0 0 * * *",
        },
    ],
    # Retry settings declared at workflow level (outside any step).
    "retryOptions": {
        "exitCodes": [1, 2],
        "numberOfRetries": 3,
        "exponentialBackoff": True,
        "duration": 2,
    },
    # Three watcher kinds are shown (user, webhook, email); each carries its
    # own "events" map.
    "watchers": [
        {
            "user": "abc@peak.ai",
            "events": {
                "success": False,
                "fail": True,
            },
        },
        {
            "webhook": {
                "name": "info",
                "url": "https://abc.com/post",
                # The payload is supplied as a JSON-encoded string, not a nested dict.
                "payload": '{ "pingback-url": "https:/workflow/123" }',
            },
            "events": {
                "success": False,
                "fail": True,
                # NOTE(review): the unit of this threshold is not visible here — confirm.
                "runtimeExceeded": 10,
            },
        },
        {
            "email": {
                "name": "email-watcher-1",
                "recipients": {
                    "to": ["user1@peak.ai", "user2@peak.ai"],
                },
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
    ],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    # Steps are keyed by step name; this workflow has one step with no parents.
    "steps": {
        "stepName": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
        },
    },
}

# Create the workflow and keep the "id" field of the response for later cells.
cron_workflow_id: int = wf_client.create_workflow(cron_workflow_body)["id"]

# Payload for a two-step workflow started by a webhook trigger.
# step1 redirects its script output to output.txt and maps PARAM_1 to that
# file under "outputParameters"; step2 names step1 as its parent and lists
# conditions on PARAM_1 and on step1's status under "executionParameters".
webhook_workflow_body: dict[str, Any] = {
    "name": "webhook-workflow",
    "triggers": [
        {
            "webhook": True,
        },
    ],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                # Full https:// URL, matching the other payloads in this file.
                "url": "https://github.com/org/repo",
            },
            # Retry settings scoped to this step only.
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 3,
                    "exponentialBackoff": True,
                    "duration": 2,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                # Full https:// URL, matching the other payloads in this file.
                "url": "https://github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

# Create the webhook-triggered workflow and keep the "id" field of the response.
webhook_workflow_id: int = wf_client.create_workflow(webhook_workflow_body)["id"]

# Payload with an empty trigger list; the examples further down start this
# workflow explicitly through execute_workflow.
manual_workflow_body: dict[str, Any] = {
    "name": "manual-workflow",
    "triggers": [],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "stepName": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
            # NOTE(review): exact effect of "skip" vs "skipDAG" is not visible
            # here — confirm against the SDK documentation.
            "runConfiguration": {
                "skipConfiguration": {
                    "skip": True,
                    "skipDAG": True,
                },
            },
        },
    },
}

# Create the workflow and keep the "id" field of the response for later cells.
manual_workflow_id: int = wf_client.create_workflow(manual_workflow_body)["id"]

Modify the payload and update the existing workflow resource

[ ]:
# Definition passed to update_workflow for the workflow behind
# manual_workflow_id: a new name, workflow-level retry options and two steps.
updated_body: dict[str, Any] = {
    "name": "updated-workflow",
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    # Workflow-level retry settings.
    "retryOptions": {
        "exitCodes": [1, 2],
        "numberOfRetries": 3,
        "exponentialBackoff": True,
        "duration": 2,
    },
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 456,
            "imageVersionId": 2,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            # Step-level retry settings; the values differ from the
            # workflow-level ones above (5 retries / duration 5).
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 5,
                    "exponentialBackoff": True,
                    "duration": 5,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        # step2 names step1 as its parent and lists conditions on step1's
        # PARAM_1 value and status.
        "step2": {
            "type": "standard",
            "imageId": 456,
            "imageVersionId": 2,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

# Send the definition for the workflow created from manual_workflow_body.
wf_client.update_workflow(workflow_id=manual_workflow_id, body=updated_body)

Modify the payload and update the existing workflow resource via patch-workflow

[ ]:
## 1. By passing the keys to update through arguments.

# Only the fields named here are supplied. step_names presumably selects the
# step(s) that receive the image / instance-type values — confirm against the
# SDK documentation.
patch_workflow_via_args = wf_client.patch_workflow(
    workflow_id=manual_workflow_id,
    name="updated-workflow",
    image_id=100,
    image_version_id=100,
    instance_type_id=23,
    step_names=["step1"],
)


## 2. By passing body containing keys to update

# Partial payload: it carries name, triggers, watchers and per-step overrides.
# The step entries omit keys such as "type", "parents" and "parameters".
body: dict[str, Any] = {
    "name": "updated-workflow",
    "triggers": [
        {
            "cron": "0 0 * * *",
        },
    ],
    "watchers": [
        {
            "user": "abc@peak.ai",
            "events": {
                "success": False,
                "fail": True,
            },
        },
        {
            "webhook": {
                "name": "info",
                "url": "https://abc.com/post",
                # The payload is supplied as a JSON-encoded string, not a nested dict.
                "payload": '{ "pingback-url": "https:/workflow/123" }',
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
        {
            "email": {
                "name": "email-watcher-1",
                "recipients": {
                    "to": ["user1@peak.ai", "user2@peak.ai"],
                },
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
    ],
    "steps": {
        "step1": {
            "imageId": 100,
            "imageVersionId": 100,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 21,
                "storage": "10GB",
            },
            "repository": {
                "branch": "main",
                "token": "github",
                "url": "https://github.com/org/repo",
            },
        },
        "step2": {
            "imageId": 100,
            "imageVersionId": 100,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 22,
                "storage": "20GB",
            },
        },
    },
}

# Apply the partial payload to the workflow behind cron_workflow_id.
wf_client.patch_workflow(workflow_id=cron_workflow_id, body=body)

Create or update a workflow resource

[ ]:
# Payload for a two-step, cron-triggered workflow passed to
# create_or_update_workflow. It carries watchers, workflow-level retry options
# and tags; step2 names step1 as its parent and lists conditions on step1's
# PARAM_1 value and status.
cron_workflow_body: dict[str, Any] = {
    "name": "cron-workflow",
    "triggers": [
        {
            # Five-field cron expression: 00:00 every day.
            "cron": "0 0 * * *",
        },
    ],
    "watchers": [
        {
            "user": "abc@peak.ai",
            "events": {
                "success": False,
                "fail": True,
            },
        },
        {
            "webhook": {
                "name": "info",
                "url": "https://abc.com/post",
                # The payload is supplied as a JSON-encoded string, not a nested dict.
                "payload": '{ "pingback-url": "https:/workflow/123" }',
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
        {
            "email": {
                "name": "email-watcher-1",
                "recipients": {
                    "to": ["user1@peak.ai", "user2@peak.ai"],
                },
            },
            "events": {
                "success": False,
                "fail": True,
                "runtimeExceeded": 10,
            },
        },
    ],
    # Workflow-level retry settings.
    "retryOptions": {
        "exitCodes": [1, 2],
        "numberOfRetries": 3,
        "exponentialBackoff": True,
        "duration": 2,
    },
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                # Full https:// URL, matching the other payloads in this file.
                "url": "https://github.com/org/repo",
            },
            # Retry settings scoped to this step only.
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 3,
                    "exponentialBackoff": True,
                    "duration": 2,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                # Full https:// URL, matching the other payloads in this file.
                "url": "https://github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

# Submit the payload through create_or_update_workflow and keep the "id" field
# of the response.
cron_workflow_id: int = wf_client.create_or_update_workflow(cron_workflow_body)["id"]

# Revised definition that keeps the name "cron-workflow" but uses a webhook
# trigger and carries no watchers or workflow-level retry options. Both steps
# redirect their script output to output.txt.
updated_body: dict[str, Any] = {
    "name": "cron-workflow",
    "triggers": [
        {
            "webhook": True,
        },
    ],
    "tags": [
        {
            "name": "foo",
        },
        {
            "name": "bar",
        },
    ],
    "steps": {
        "step1": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": [],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                # Full https:// URL, matching the other payloads in this file.
                "url": "https://github.com/org/repo",
            },
            # Retry settings scoped to this step only.
            "runConfiguration": {
                "retryOptions": {
                    "exitCodes": [1, 2],
                    "numberOfRetries": 3,
                    "exponentialBackoff": True,
                    "duration": 2,
                },
            },
            "outputParameters": {
                "PARAM_1": "output.txt",
            },
        },
        "step2": {
            "type": "standard",
            "imageId": 123,
            "imageVersionId": 1,
            "command": "python script.py > output.txt",
            "resources": {
                "instanceTypeId": 23,
                "storage": "20GB",
            },
            "parents": ["step1"],
            "stepTimeout": 30,
            "clearImageCache": True,
            "parameters": {
                "env": {
                    "key": "value",
                },
                "secrets": [
                    "secret-1",
                    "secret-2",
                ],
            },
            "repository": {
                "branch": "main",
                "token": "github",
                # Full https:// URL, matching the other payloads in this file.
                "url": "https://github.com/org/repo",
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "equals",
                        "paramName": "PARAM_1",
                        "stepName": "step1",
                        "value": "some-value",
                    },
                ],
                "parentStatus": [
                    {
                        "condition": "all-of",
                        "parents": [
                            "step1",
                        ],
                        "status": [
                            "success",
                        ],
                    },
                ],
            },
        },
    },
}

# Submit the revised payload. Its "name" is still "cron-workflow", so this call
# is expected to update that workflow rather than create a new one (presumably
# matched by name — confirm against the SDK documentation).
# The response is indexed with ["id"], as in the other create calls, so the
# variable holds the integer id its annotation declares instead of the whole
# response.
webhook_workflow_id: int = wf_client.create_or_update_workflow(updated_body)["id"]

Describe an existing workflow resource

[ ]:
# Fetch the stored definition of the workflow behind manual_workflow_id.
workflow_description: dict[str, Any] = wf_client.describe_workflow(workflow_id=manual_workflow_id)

"""
The workflow details will be returned in the following format:

{
  "id": 1,
  "name": "workflow-name",
  "createdAt": "2020-01-01T18:00:00.000Z",
  "updatedAt": "2020-01-01T18:00:00.000Z",
  "status": "Available",
  "steps": {
    "stepName": {
      "type": "standard",
      "repository": {
        "url": "https://github.com/org/repo",
        "branch": "main",
        "token": ""
      },
      "imageId": 0,
      "imageVersionId": 0,
      "imageVersion": "string",
      "clearImageCache": true,
      "command": "string",
      "stepTimeout": 0,
      "resources": {
        "instanceTypeId": 23,
        "storage": "10GB"
      },
      "parents": [
        "string"
      ],
      "parameters": {
        "env": {...},
        "secrets": [...]
      },
      "runConfiguration": {...},
      "executionParameters": {...},
      "outputParameters": {...},
      "imageName": "testImage"
    }
  },
  "lastExecution": {
    "executionId": "db88c21d-1add-45dd-a72e-8c6b83b68dee",
    "executedAt": "2020-01-01T18:00:00.000Z",
    "status": "Success",
    "rerunId": "1b0b21b8-370d-459d-94e5-253d4d905ef6"
  },
  "triggers": [...],
  "watchers": [...],
  "tags": [
    {
      "name": "foo"
    },
    {
      "name": "bar"
    }
  ]
}
"""

Delete an existing workflow resource

[ ]:
# NOTE: 63153 is a hard-coded placeholder id, not one of the ids created in
# the cells above — substitute the id of the workflow to delete.
wf_client.delete_workflow(workflow_id=63153)

List all executions of a workflow resource and iterate over them

Execution history is not available for workflow executions older than 90 days.

[ ]:
# Filters for the execution listing: accepted statuses, a count and an
# ISO-8601 time window.
execution_filters: dict[str, Any] = {
    "status": ["Running", "Success"],
    "count": 10,
    "date_from": "2024-01-07T18:00:00.000Z",
    "date_to": "2024-01-07T19:00:00.000Z",
}
workflow_executions_iterator: Iterator[dict[str, Any]] = wf_client.list_executions(
    workflow_id=manual_workflow_id,
    **execution_filters,
)

# Take the first item the iterator yields.
workflow_executions_iterated: dict[str, Any] = next(workflow_executions_iterator)

"""
The list of workflow executions will be returned in the following format:

{
  "executions": [
    {
      "duration": "01:05:03",
      "executedAt": "2020-01-01T00:00:00Z",
      "executionId": "46a914da-29169-4344-9dbb-8b827ab316f",
      "status": "Success",
      "finishedAt": "2020-01-01T00:10:00Z",
      "stoppedBy": "Someone",
      "reruns": [
        {
          "duration": "01:05:03",
          "executedAt": "2020-01-01T00:11:00Z",
          "rerunId": "46a914da-29169-4344-9dbb-8b827ab3112",
          "status": "Success",
          "finishedAt": "2020-01-01T00:15:00Z",
          "stoppedBy": "Someone"
        }
      ]
    }
  ],
  "pageSize": 25,
  "pageNumber": 1,
  "pageCount": 1,
  "workflowId": 101,
  "executionCount": 1
}
"""

Get details about a workflow execution

[ ]:
# workflow_id=1 and "execution_id" are literal placeholders; substitute the
# workflow id and the execution id to inspect.
workflow_execution = wf_client.get_execution_details(
    workflow_id=1,
    execution_id="execution_id",
)

"""
The execution details will be returned in the following format:

{
  "steps": [
    {
      "finishedAt": "2020-01-01T18:00:00.000Z",
      "startedAt": "2020-01-01T18:00:00.000Z",
      "status": "Success",
      "stepType": "standard",
      "children": [
        "step-name"
      ],
      "name": "step-name",
      "command": "echo Done!",
      "image": "111111111.dkr.ecr.eu-west-1.amazonaws.com/default-images-latest:workflow-python-standard-pack-v1-latest",
      "output": {
        "exitCode": "0",
        "message": "terminated"
      },
      "resources": {
        "instanceTypeId": 21,
        "storage": "20 GB"
      },
      "rerunId": "1b0b21b8-370d-459d-94e5-253d4d905ef6",
      "source": {
        "filePath": "workflow/test.sql",
        "type": "GITHUB",
        "url": "https://www.github.com/org/repo"
      }
    }
  ],
  "executedAt": "2020-01-01T18:00:00.000Z",
  "finishedAt": "2020-01-01T18:00:00.000Z",
  "executionId": "db88c21d-1add-45dd-a72e-8c6b83b68dee",
  "status": "Success"
}
"""

Get logs for a specific execution of a workflow resource

[ ]:
# All three calls address the same step of the same execution, so the shared
# keyword arguments are collected once. The values are literal placeholders.
log_target: dict[str, Any] = {
    "workflow_id": 1,
    "execution_id": "execution_id",
    "step_name": "step_name",
}

# First batch of logs for the step.
workflow_execution_logs = wf_client.get_execution_logs(**log_target)

# Follow-up batch: hand back the "nextToken" value from the previous response.
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
    **log_target,
    next_token=workflow_execution_logs["nextToken"],
)

# Local download: set `save` and give the destination through `file_name`.
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
    **log_target,
    save=True,
    file_name="workflow_execution_logs.log",
)

"""
The logs will be returned in the following format:

{
  "logs": [
    {
      "message": "Cloning into service-workflows-api...",
      "timestamp": 1697089188119
    }
  ],
  "nextToken": "f/37846375578651780454915234936364900527730394239380553728/s",
  "stepRunStatus": "Running"
}
"""

Schedule an immediate execution for a workflow

[ ]:
# Start the workflow behind manual_workflow_id without passing a body.
wf_client.execute_workflow(workflow_id=manual_workflow_id)

# Execute a workflow with some parameters
body = {
    # Parameters are split into a "global" map and a "stepWise" map keyed by
    # step name.
    "params": {
        "global": {
            "key": "value",
        },
        "stepWise": {
            "step1": {
                "key1": "value1",
            },
        },
    },
    # Steps named for this run; only the step1 entry sets "runDAG".
    # NOTE(review): the exact effect of runDAG is not visible here — confirm
    # against the SDK documentation.
    "stepsToRun": [
        {
            "stepName": "step1",
            "runDAG": True,
        },
        {
            "stepName": "step2",
        },
    ],
}

# Start the same workflow again, this time supplying the body built above.
wf_client.execute_workflow(workflow_id=manual_workflow_id, body=body)

Get all available resources for workflow

[ ]:
# Ask the client for the resource options available to workflows; the call
# takes no arguments.
available_resources: list[dict[str, Any]] = wf_client.list_resources()

"""
The list of resources will be returned in the following format:

[
  {
    "cpu": 0.25,
    "gpu": null,
    "gpuMemory": null,
    "id": 21,
    "memory": 0.5
  }
]
"""

Get default resource values for workflow

[ ]:
# Ask the client for the default workflow resource values; the call takes no
# arguments.
default_resource: dict[str, Any] = wf_client.get_default_resource()

"""
The default resource will be returned in the following format:

{
  "instanceTypeId": 21,
  "storage": "10GB"
}
"""