Workflows
Note: Workflows with only standard steps are supported.
Import the workflow resource module and instantiate a workflow-service
client
[ ]:
from collections.abc import Iterator
from typing import Any
from peak.resources import workflows
wf_client: workflows.Workflow = workflows.get_client()
List all existing workflow
(s) and iterate over them
[ ]:
list_workflows_iterator: Iterator[dict[str, Any]] = wf_client.list_workflows(
name="test",
workflow_status=["Available"],
last_execution_status=["Running"],
last_modified_by=["test@peak.ai"],
)
list_workflows_iterated: dict[str, Any] = next(list_workflows_iterator)
Prepare the payload and submit a new workflow
resource for creation
[ ]:
cron_workflow_body: dict[str, Any] = {
"name": "cron-workflow",
"triggers": [
{
"cron": "0 0 * * *",
},
],
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
"watchers": [
{
"user": "abc@peak.ai",
"events": {
"success": False,
"fail": True,
},
},
{
"webhook": {
"name": "info",
"url": "https://abc.com/post",
"payload": '{ "pingback-url": "https:/workflow/123" }',
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"stepName": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
},
},
}
cron_workflow_id: int = wf_client.create_workflow(cron_workflow_body)["id"]
webhook_workflow_body: dict[str, Any] = {
"name": "webhook-workflow",
"triggers": [
{
"webhook": True,
},
],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"step1": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
webhook_workflow_id: int = wf_client.create_workflow(webhook_workflow_body)["id"]
manual_workflow_body: dict[str, Any] = {
"name": "manual-workflow",
"triggers": [],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"stepName": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
"runConfiguration": {
"skipConfiguration": {
"skip": True,
"skipDAG": True,
},
},
},
},
}
manual_workflow_id: int = wf_client.create_workflow(manual_workflow_body)["id"]
Modify the payload and update the existing workflow
resource
[ ]:
updated_body: dict[str, Any] = {
"name": "updated-workflow",
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
"steps": {
"step1": {
"type": "standard",
"imageId": 456,
"imageVersionId": 2,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 5,
"exponentialBackoff": True,
"duration": 5,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 456,
"imageVersionId": 2,
"command": "python script.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
wf_client.update_workflow(workflow_id=manual_workflow_id, body=updated_body)
Modify the payload and update the existing workflow
resource via patch-workflow
[ ]:
## 1. By passing the keys to update through arguments.
patch_workflow_via_args = wf_client.patch_workflow(
workflow_id=manual_workflow_id,
name="updated-workflow",
image_id=100,
image_version_id=100,
instance_type_id=23,
step_names=["step1"],
)
## 2. By passing body containing keys to update
body: dict[str, Any] = {
"name": "updated-workflow",
"triggers": [
{
"cron": "0 0 * * *",
},
],
"watchers": [
{
"user": "abc@peak.ai",
"events": {
"success": False,
"fail": True,
},
},
{
"webhook": {
"name": "info",
"url": "https://abc.com/post",
"payload": '{ "pingback-url": "https:/workflow/123" }',
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
],
"steps": {
"step1": {
"imageId": 100,
"imageVersionId": 100,
"command": "python script.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
},
"step2": {
"imageId": 100,
"imageVersionId": 100,
"command": "python script.py",
"resources": {
"instanceTypeId": 22,
"storage": "20GB",
},
},
},
}
wf_client.patch_workflow(workflow_id=cron_workflow_id, body=body)
Create or update a workflow
resource
[ ]:
cron_workflow_body: dict[str, Any] = {
"name": "cron-workflow",
"triggers": [
{
"cron": "0 0 * * *",
},
],
"watchers": [
{
"user": "abc@peak.ai",
"events": {
"success": False,
"fail": True,
},
},
{
"webhook": {
"name": "info",
"url": "https://abc.com/post",
"payload": '{ "pingback-url": "https:/workflow/123" }',
},
"events": {
"success": False,
"fail": True,
"runtimeExceeded": 10,
},
},
],
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"step1": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
cron_workflow_id: int = wf_client.create_or_update_workflow(cron_workflow_body)["id"]
updated_body: dict[str, Any] = {
"name": "cron-workflow",
"triggers": [
{
"webhook": True,
},
],
"tags": [
{
"name": "foo",
},
{
"name": "bar",
},
],
"steps": {
"step1": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": [],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"runConfiguration": {
"retryOptions": {
"exitCodes": [1, 2],
"numberOfRetries": 3,
"exponentialBackoff": True,
"duration": 2,
},
},
"outputParameters": {
"PARAM_1": "output.txt",
},
},
"step2": {
"type": "standard",
"imageId": 123,
"imageVersionId": 1,
"command": "python script.py > output.txt",
"resources": {
"instanceTypeId": 23,
"storage": "20GB",
},
"parents": ["step1"],
"stepTimeout": 30,
"clearImageCache": True,
"parameters": {
"env": {
"key": "value",
},
"secrets": [
"secret-1",
"secret-2",
],
},
"repository": {
"branch": "main",
"token": "github",
"url": "github.com/org/repo",
},
"executionParameters": {
"conditional": [
{
"condition": "equals",
"paramName": "PARAM_1",
"stepName": "step1",
"value": "some-value",
},
],
"parentStatus": [
{
"condition": "all-of",
"parents": [
"step1",
],
"status": [
"success",
],
},
],
},
},
},
}
webhook_workflow_id: int = wf_client.create_or_update_workflow(updated_body)
Describe an existing workflow
resource
[ ]:
workflow_description: dict[str, Any] = wf_client.describe_workflow(workflow_id=manual_workflow_id)
Delete an existing workflow
resource
[ ]:
wf_client.delete_workflow(workflow_id=63153)
List all executions of a workflow
resource and iterate over them
Execution history is not available for workflow executions older than 90 days.
[ ]:
workflow_executions_iterator: Iterator[dict[str, Any]] = wf_client.list_executions(workflow_id=manual_workflow_id)
workflow_executions_iterated: dict[str, Any] = next(workflow_executions_iterator)
Get Details about a workflow execution
[ ]:
workflow_execution = wf_client.get_execution_details(
workflow_id=1,
execution_id="execution_id",
)
Get logs for a specific execution of a workflow
resource
[ ]:
workflow_execution_logs = wf_client.get_execution_logs(
workflow_id=1,
execution_id="execution_id",
step_name="step_name",
)
# To get next set of logs, pass the nextToken returned in the response of previous function call
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
workflow_id=1,
execution_id="execution_id",
step_name="step_name",
next_token=workflow_execution_logs["nextToken"],
)
# To download the logs locally the `save` parameter can be used along with `file_name`
next_set_of_workflow_execution_logs = wf_client.get_execution_logs(
workflow_id=1,
execution_id="execution_id",
step_name="step_name",
save=True,
file_name="workflow_execution_logs.log",
)
Schedule an immediate execution for a workflow
[ ]:
wf_client.execute_workflow(workflow_id=manual_workflow_id)
# Execute a workflow with some parameters
body = {
"params": {
"global": {
"key": "value",
},
"stepWise": {
"step1": {
"key1": "value1",
},
},
},
"stepsToRun": [
{
"stepName": "step1",
"runDAG": True,
},
{
"stepName": "step2",
},
],
}
wf_client.execute_workflow(workflow_id=manual_workflow_id, body=body)
Get all available resources for workflow
[ ]:
available_resources: list[dict[str, Any]] = wf_client.list_resources()
Get default
resource values for workflow
[ ]:
default_resource: dict[str, Any] = wf_client.get_default_resource()