Workflows
Note: Workflows with only standard steps are supported.
Import the workflow resource module and instantiate a workflow-service
client
[ ]:
from collections.abc import Iterator
from typing import Any
from peak.resources import workflows
wf_client: workflows.Workflow = workflows.get_client()
Possible field values
Workflow status
The status of the workflow. The following statuses are supported: Draft
, Running
, Available
and Paused
.
Execution status or Last execution status
The status of the workflow execution. The following statuses are supported: Success
, Running
, Stopped
, Stopping
and Failed
.
List all existing workflow
(s) and iterate over them
[ ]:
list_workflows_iterator: Iterator[dict[str, Any]] = wf_client.list_workflows(
name="test",
workflow_status=["Available"],
last_execution_status=["Running"],
last_modified_by=["test@peak.ai"],
)
list_workflows_iterated: dict[str, Any] = next(list_workflows_iterator)
"""
The list of workflows will be returned in the following format:
{
"pageCount": 1,
"pageNumber": 1,
"pageSize": 25,
"workflows": [
{
"id": 1,
"name": "workflow-name",
"createdAt": "2020-01-01T18:00:00.000Z",
"updatedAt": "2020-01-01T18:00:00.000Z",
"status": "Available",
"lastExecution": {
"executionId": "db88c21d-1add-45dd-a72e-8c6b83b68dee",
"executedAt": "2020-01-01T18:00:00.000Z",
"status": "Success",
"rerunId": "1b0b21b8-370d-459d-94e5-253d4d905ef6"
},
"triggers": [
{
"scheduled": "2020-01-01T18:00:00.000Z",
"cron": "* * * * 1"
},
{
"webhook": "db88c21d-1add-45dd-a72e-8c6b83b68dee"
}
],
"tags": [
{
"name": "foo"
},
{
"name": "bar"
}
]
}
],
"workflowsCount": 1
}
"""
Prepare the payload and submit a new workflow
resource for creation
[ ]:
cron_workflow_body: dict[str, Any] = {
"name": "cron-workflow",
"triggers": [
{
"cron": "0 0 * * *",
},
],
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
"watchers": [
{
"user": "abc@peak.ai",
"events": {
"success": False,
"fail": True,
},
},
{
"webhook": {
"name": "info",
"url": "https://abc.com/post",
"payload": '{ "pingback-url": "https:/workflow/123" }',
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
{
"email": {
"name": "email-watcher-1",
"recipients": {
"to": ["user1@peak.ai", "user2@peak.ai"],
},
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"stepName": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
},
},
}
cron_workflow_id: int = wf_client.create_workflow(cron_workflow_body)["id"]
webhook_workflow_body: dict[str, Any] = {
"name": "webhook-workflow",
"triggers": [
{
"webhook": True,
},
],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"step1": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
webhook_workflow_id: int = wf_client.create_workflow(webhook_workflow_body)["id"]
manual_workflow_body: dict[str, Any] = {
"name": "manual-workflow",
"triggers": [],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"stepName": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
"runConfiguration": {
"skipConfiguration": {
"skip": True,
"skipDAG": True,
},
},
},
},
}
manual_workflow_id: int = wf_client.create_workflow(manual_workflow_body)["id"]
Modify the payload and update the existing workflow
resource
[ ]:
updated_body: dict[str, Any] = {
"name": "updated-workflow",
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
"steps": {
"step1": {
"type": "standard",
"imageId": 456,
"imageVersionId": 2,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 5,
"exponentialBackoff": True,
"duration": 5,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 456,
"imageVersionId": 2,
"command": "python script.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
wf_client.update_workflow(workflow_id=manual_workflow_id, body=updated_body)
Modify the payload and update the existing workflow
resource via patch-workflow
[ ]:
## 1. By passing the keys to update through arguments.
patch_workflow_via_args = wf_client.patch_workflow(
workflow_id=manual_workflow_id,
name="updated-workflow",
image_id=100,
image_version_id=100,
instance_type_id=23,
step_names=["step1"],
)
## 2. By passing body containing keys to update
body: dict[str, Any] = {
"name": "updated-workflow",
"triggers": [
{
"cron": "0 0 * * *",
},
],
"watchers": [
{
"user": "abc@peak.ai",
"events": {
"success": False,
"fail": True,
},
},
{
"webhook": {
"name": "info",
"url": "https://abc.com/post",
"payload": '{ "pingback-url": "https:/workflow/123" }',
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
{
"email": {
"name": "email-watcher-1",
"recipients": {
"to": ["user1@peak.ai", "user2@peak.ai"],
},
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
],
"steps": {
"step1": {
"imageId": 100,
"imageVersionId": 100,
"command": "python script.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
},
"step2": {
"imageId": 100,
"imageVersionId": 100,
"command": "python script.py",
"resources": {
"instanceTypeId": 22,
"storage": "20GB",
},
},
},
}
wf_client.patch_workflow(workflow_id=cron_workflow_id, body=body)
Create or update a workflow
resource
[ ]:
cron_workflow_body: dict[str, Any] = {
"name": "cron-workflow",
"triggers": [
{
"cron": "0 0 * * *",
},
],
"watchers": [
{
"user": "abc@peak.ai",
"events": {
"success": False,
"fail": True,
},
},
{
"webhook": {
"name": "info",
"url": "https://abc.com/post",
"payload": '{ "pingback-url": "https:/workflow/123" }',
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
{
"email": {
"name": "email-watcher-1",
"recipients": {
"to": ["user1@peak.ai", "user2@peak.ai"],
},
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
],
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"step1": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
cron_workflow_id: int = wf_client.create_or_update_workflow(cron_workflow_body)["id"]
updated_body: dict[str, Any] = {
"name": "cron-workflow",
"triggers": [
{
"webhook": True,
},
],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"step1": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
webhook_workflow_id: int = wf_client.create_or_update_workflow(updated_body)
Describe an existing workflow
resource
[ ]:
workflow_description: dict[str, Any] = wf_client.describe_workflow(workflow_id=manual_workflow_id)
"""
The workflow details will be returned in the following format:
{
"id": 1,
"name": "workflow-name",
"createdAt": "2020-01-01T18:00:00.000Z",
"updatedAt": "2020-01-01T18:00:00.000Z",
"status": "Available",
"steps": {
"stepName": {
"type": "standard",
"repository": {
"url": "https://github.com/org/repo",
"branch": "main",
"token": ""
},
"imageId": 0,
"imageVersionId": 0,
"imageVersion": "string",
"clearImageCache": true,
"command": "string",
"stepTimeout": 0,
"resources": {
"instanceTypeId": 23,
"storage": "10GB"
},
"parents": [
"string"
],
"parameters": {
"env": {...},
"secrets": [...]
},
"runConfiguration": {...},
"executionParameters": {...},
"outputParameters": {...},
"imageName": "testImage"
}
},
"lastExecution": {
"executionId": "db88c21d-1add-45dd-a72e-8c6b83b68dee",
"executedAt": "2020-01-01T18:00:00.000Z",
"status": "Success",
"rerunId": "1b0b21b8-370d-459d-94e5-253d4d905ef6"
},
"triggers": [...],
"watchers": [...],
"tags": [
{
"name": "foo"
},
{
"name": "bar"
}
]
}
"""
Delete an existing workflow
resource
[ ]:
wf_client.delete_workflow(workflow_id=63153)
List all executions of a workflow
resource and iterate over them
Execution history is not available for workflow executions older than 90 days.
[ ]:
workflow_executions_iterator: Iterator[dict[str, Any]] = wf_client.list_executions(
workflow_id=manual_workflow_id,
status=["Running", "Success"],
count=10,
date_from="2024-01-07T18:00:00.000Z",
date_to="2024-01-07T19:00:00.000Z",
)
workflow_executions_iterated: dict[str, Any] = next(workflow_executions_iterator)
"""
The list of workflow executions will be returned in the following format:
{
"executions": [
{
"duration": "01:05:03",
"executedAt": "2020-01-01T00:00:00Z",
"executionId": "46a914da-29169-4344-9dbb-8b827ab316f",
"status": "Success",
"finishedAt": "2020-01-01T00:10:00Z",
"stoppedBy": "Someone",
"reruns": [
{
"duration": "01:05:03",
"executedAt": "2020-01-01T00:11:00Z",
"rerunId": "46a914da-29169-4344-9dbb-8b827ab3112",
"status": "Success",
"finishedAt": "2020-01-01T00:15:00Z",
"stoppedBy": "Someone"
}
]
}
],
"pageSize": 25,
"pageNumber": 1,
"pageCount": 1,
"workflowId": 101,
"executionCount": 1
}
"""
Get Details about a workflow execution
[ ]:
workflow_execution = wf_client.get_execution_details(
workflow_id=1,
execution_id="execution_id",
)
"""
{
"steps": [
{
"finishedAt": "2020-01-01T18:00:00.000Z",
"startedAt": "2020-01-01T18:00:00.000Z",
"status": "Success",
"stepType": "standard",
"children": [
"step-name"
],
"name": "step-name",
"command": "echo Done!",
"image": "111111111.dkr.ecr.eu-west-1.amazonaws.com/default-images-latest:workflow-python-standard-pack-v1-latest",
"output": {
"exitCode": "0",
"message": "terminated"
},
"resources": {
"instanceTypeId": 21,
"storage": "20 GB"
},
"rerunId": "1b0b21b8-370d-459d-94e5-253d4d905ef6",
"source": {
"filePath": "workflow/test.sql",
"type": "GITHUB",
"url": "https://www.github.com/org/repo"
}
}
],
"executedAt": "2020-01-01T18:00:00.000Z",
"finishedAt": "2020-01-01T18:00:00.000Z",
"executionId": "db88c21d-1add-45dd-a72e-8c6b83b68dee",
"status": "Success"
}
"""
Get logs for a specific execution of a workflow
resource
[ ]:
workflow_execution_logs = wf_client.get_execution_logs(
workflow_id=1,
execution_id="execution_id",
step_name="step_name",
)
# To get next set of logs, pass the nextToken returned in the response of previous function call
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
workflow_id=1,
execution_id="execution_id",
step_name="step_name",
next_token=workflow_execution_logs["nextToken"],
)
# To download the logs locally the `save` parameter can be used along with `file_name`
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
workflow_id=1,
execution_id="execution_id",
step_name="step_name",
save=True,
file_name="workflow_execution_logs.log",
)
"""
The logs will be returned in the following format:
{
"logs": [
{
"message": "Cloning into service-workflows-api...",
"timestamp": 1697089188119
}
],
"nextToken": "f/37846375578651780454915234936364900527730394239380553728/s",
"stepRunStatus": "Running"
}
"""
Schedule an immediate execution for a workflow
[ ]:
wf_client.execute_workflow(workflow_id=manual_workflow_id)
# Execute a workflow with some parameters
body = {
"params": {
"global": {
"key": "value",
},
"stepWise": {
"step1": {
"key1": "value1",
},
},
},
"stepsToRun": [
{
"stepName": "step1",
"runDAG": True,
},
{
"stepName": "step2",
},
],
}
wf_client.execute_workflow(workflow_id=manual_workflow_id, body=body)
Get all available resources for workflow
[ ]:
available_resources: list[dict[str, Any]] = wf_client.list_resources()
"""
The list of resources will be returned in the following format:
[
{
"cpu": 0.25,
"gpu": null,
"gpuMemory": null,
"id": 21,
"memory": 0.5
}
]
"""
Get default
resource values for workflow
[ ]:
default_resource: dict[str, Any] = wf_client.get_default_resource()
"""
The default resource will be returned in the following format:
{
"instanceTypeId": 21,
"storage": "10GB"
}
"""