Workflows resource
Import the workflow resource module and instantiate a workflow-service client
[ ]:
from collections.abc import Iterator
from typing import Any
from peak.resources import workflows
wf_client: workflows.Workflow = workflows.get_client()
List all existing workflows and iterate over them
[ ]:
list_workflows_iterator: Iterator[dict[str, Any]] = wf_client.list_workflows()
list_workflows_iterated: dict[str, Any] = next(list_workflows_iterator)
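The `next` call above pulls a single element from the iterator. If each element is a page of results (an assumption here, not something this notebook confirms), a small helper can flatten all pages into individual items. This is a minimal sketch: the helper name `iter_items`, the key name `"workflows"`, and the sample pages are hypothetical stand-ins for whatever `wf_client.list_workflows()` actually yields.

```python
from collections.abc import Iterable, Iterator
from typing import Any


def iter_items(pages: Iterable[dict[str, Any]], key: str) -> Iterator[dict[str, Any]]:
    """Yield every item stored under `key` across all pages."""
    for page in pages:
        # A page that lacks the key contributes no items.
        yield from page.get(key, [])


# Hypothetical stand-in for the iterator returned by wf_client.list_workflows().
sample_pages = iter(
    [
        {"workflows": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]},
        {"workflows": [{"id": 3, "name": "c"}]},
    ]
)
workflow_names = [wf["name"] for wf in iter_items(sample_pages, "workflows")]
```

Because the helper is a generator, pages are only requested as the loop consumes them, so a long listing is not loaded all at once.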
Prepare the payload and submit a new workflow resource for creation
[ ]:
body: dict[str, Any] = {
    "name": "sdk-12345",
    "triggers": [],
    "steps": {"s1": {"type": "standard", "imageId": 16849, "imageVersionId": 101, "command": "python script.py"}},
}
created_id: int = wf_client.create_workflow(body)["id"]
Modify the payload and update the existing workflow resource
[ ]:
updated_body: dict[str, Any] = {
    "name": "sdk-12345",
    "triggers": [],
    "steps": {
        "s1": {"type": "standard", "imageId": 16849, "imageVersionId": 102, "command": "python script.py --arg 1"},
    },
}
wf_client.update_workflow(workflow_id=created_id, body=updated_body)
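The update payload repeats most of the creation payload, so one option is to derive it from a copy instead of retyping it. The sketch below uses `copy.deepcopy` so the original dict is left untouched. The helper name `with_step_changes` is hypothetical and not part of the SDK.

```python
import copy
from typing import Any


def with_step_changes(body: dict[str, Any], step: str, **changes: Any) -> dict[str, Any]:
    """Return a deep copy of `body` with `changes` applied to the named step."""
    updated = copy.deepcopy(body)
    updated["steps"][step].update(changes)
    return updated


original = {
    "name": "sdk-12345",
    "triggers": [],
    "steps": {"s1": {"type": "standard", "imageId": 16849, "imageVersionId": 101, "command": "python script.py"}},
}
revised = with_step_changes(
    original,
    "s1",
    imageVersionId=102,
    command="python script.py --arg 1",
)
```

The resulting `revised` dict has the same shape as the hand-written update payload above and could be passed as `body` in the same way.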
Describe an existing workflow resource
[ ]:
workflow_description: dict[str, Any] = wf_client.describe_workflow(workflow_id=created_id)
Delete an existing workflow resource
[ ]:
wf_client.delete_workflow(workflow_id=63153)
List all executions of a workflow resource and iterate over them
Note: Execution history is not available for workflow executions older than 90 days.
[ ]:
workflow_executions_iterator: Iterator[dict[str, Any]] = wf_client.list_executions(workflow_id=created_id)
workflow_executions_iterated: dict[str, Any] = next(workflow_executions_iterator)
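Given the 90-day limit on execution history noted above, it can help to check whether a timestamp of interest still falls inside that window before looking for it. This is a minimal sketch using only the standard library. The function name `is_within_history_window` is hypothetical, and how you obtain the execution timestamp is left open because the shape of the execution records is not shown here.

```python
from datetime import datetime, timedelta, timezone

# Retention period taken from the note above.
HISTORY_RETENTION = timedelta(days=90)


def is_within_history_window(executed_at: datetime, now: datetime) -> bool:
    """Return True if `executed_at` is no more than 90 days before `now`."""
    return now - executed_at <= HISTORY_RETENTION


# Fixed reference time so the example is reproducible.
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
recent = is_within_history_window(datetime(2024, 5, 1, tzinfo=timezone.utc), now)
stale = is_within_history_window(datetime(2024, 1, 1, tzinfo=timezone.utc), now)
```

In real use, `now` would typically be `datetime.now(timezone.utc)`.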
Schedule an immediate execution for a workflow
[ ]:
wf_client.execute_workflow(workflow_id=created_id)
# Execute a workflow with some parameters
body = {
    "params": {
        "global": {
            "key": "value",
        },
        "stepWise": {
            "step1": {
                "key1": "value1",
            },
        },
    },
}
wf_client.execute_workflow(workflow_id=created_id, body=body)
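The parameter payload above nests global values and per-step values under `"params"`. A small builder can assemble that structure from two plain dicts. This is a sketch only: the helper name `build_execution_body` is hypothetical, the key names are copied from the example above, and whether the service accepts a payload with either section omitted is an assumption that has not been verified.

```python
from typing import Any, Optional


def build_execution_body(
    global_params: Optional[dict[str, Any]] = None,
    step_params: Optional[dict[str, dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Assemble an execution payload in the shape used in the example above."""
    params: dict[str, Any] = {}
    if global_params:
        # Copy so later changes to the caller's dict do not leak into the payload.
        params["global"] = dict(global_params)
    if step_params:
        params["stepWise"] = {step: dict(values) for step, values in step_params.items()}
    return {"params": params}


execution_body = build_execution_body(
    global_params={"key": "value"},
    step_params={"step1": {"key1": "value1"}},
)
```

The resulting `execution_body` matches the hand-written dict above and could be passed as `body` to `execute_workflow` in the same way.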
Get all available resource templates for a workflow resource
[ ]:
available_resources: list[dict[str, Any]] = wf_client.list_resources()
Get the default resource template for a workflow resource
[ ]:
default_resource: dict[str, Any] = wf_client.get_default_resource()