Workflows
Note: Workflows with only standard steps are supported.
Providing Instance Type and Storage in Workflow
When defining a workflow step, we have the option to set the instance type and storage by including them under the `resources` key in the following YAML format:

```yaml
resources:
  instanceTypeId: 23
  storage: 20GB
```
To obtain a list of all available instances along with their corresponding `instanceTypeId`, we can use the following command:

```shell
peak tenants list-instance-options --entity-type workflow
```
Adding the `resources` section is optional. If we don't specify it for a particular step, the default values will be used. We can retrieve the default values through the following command:

```shell
peak workflows list-default-resources
```
Creating a workflow
We can create a workflow by providing a payload containing its configuration.
```yaml
# workflow.yaml
body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: |
          {
            "system": "external_system",
            "action": "update",
            "data": {
              "field": "value",
              "timestamp": "2024-05-20T12:00:00Z"
            }
          }
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  tags:
    - name: foo
    - name: bar
  steps:
    stepName:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script.py"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      parameters:
        env:
          key: value
        secrets:
          - secret-1
          - secret-2
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
```
We can use the following command to create a workflow:
```shell
peak workflows create path/to/create_workflow.yaml -v path/to/workflow_params.yaml
```
Updating a workflow
We can update a workflow by providing a payload containing its configuration.
```yaml
# workflow.yaml
body:
  name: updated-workflow
  triggers:
    - webhook: true
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: |
          {
            "system": "external_system",
            "action": "update",
            "data": {
              "field": "value",
              "timestamp": "2024-05-20T12:00:00Z"
            }
          }
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  tags:
    - name: CLI
  steps:
    step1:
      command: echo hello world
      type: standard
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      imageId: 100
      imageVersionId: 100
      resources:
        instanceTypeId: 21
        storage: 10GB
    step2:
      command: echo world
      type: standard
      imageId: 200
      imageVersionId: 200
      resources:
        instanceTypeId: 21
        storage: 40GB
```
We can use the following command to update a workflow:
```shell
peak workflows update -v path/to/workflow_params.yaml <workflow-id> path/to/update_workflow.yaml
```
Using the create-or-update operation
We can create or update a workflow by providing a payload containing its configuration. The search operation is based on `name`.
```yaml
# workflow.yaml
body:
  name: my-new-workflow
  triggers:
    - webhook: true
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: |
          {
            "system": "external_system",
            "action": "update",
            "data": {
              "field": "value",
              "timestamp": "2024-05-20T12:00:00Z"
            }
          }
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  tags:
    - name: CLI
  steps:
    step1:
      command: echo hello world
      type: standard
      repository:
        branch: main
        token: random_token
        url: https://example.com
      imageId: 100
      imageVersionId: 100
      resources:
        instanceTypeId: 21
        storage: 10GB
    step2:
      command: echo world
      type: standard
      imageId: 200
      imageVersionId: 200
      resources:
        instanceTypeId: 21
        storage: 40GB
```
We can use the following command to create or update a workflow:
```shell
peak workflows create-or-update -v path/to/workflow_params.yaml path/to/create_or_update_workflow.yaml
```
Partial Workflow Update using patch
Consider an existing workflow with the following step details:
"name": "existing-workflow",
"triggers": [],
"steps": {
"step1": {
"imageId": 1,
"imageVersionId": 1,
"command": "python test1.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
}
},
"step2": {
"imageId": 2,
"imageVersionId": 2,
"command": "python test2.py",
"resources": {
"instanceTypeId": 22,
"storage": "10GB",
}
},
}
We can partially update a workflow in the following ways.
Command Line Arguments Approach
By passing the parameters to be updated as command line arguments. We can provide step names via `--step-names`, which updates the specified steps with the provided parameters. If step names are not provided, all the steps of the workflow are updated.
```shell
peak workflows patch <workflow_id> --name updated-workflow --image-id 100 --image-version-id 100 --step-names step1
```
Since only `step1` was passed to `--step-names`, this updates the `imageId` and `imageVersionId` of `step1`, leaving `step2` unchanged:
"name": "updated-workflow",
"steps": {
"step1": {
"imageId": 100,
"imageVersionId": 100,
"command": "python test1.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
},
"step2": {
"imageId": 2,
"imageVersionId": 2,
"command": "python test2.py",
"resources": {
"instanceTypeId": 22,
"storage": "10GB",
},
},
}
YAML File Approach
We can update parameters via a YAML file:
```yaml
# patch-workflow.yaml
body:
  name: "updated-workflow"
  triggers:
    - {}
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: |
          {
            "system": "external_system",
            "action": "update",
            "data": {
              "field": "value",
              "timestamp": "2024-05-20T12:00:00Z"
            }
          }
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  steps:
    step1:
      imageId: 100
      imageVersionId: 100
    step3:
      imageId: 400
      imageVersionId: 400
      command: "python test4.py"
      resources:
        instanceTypeId: 24
      repository:
        url: "https://github.com/org/repo"
        branch: main
        token: random_token
```
Use the following command to update the workflow:
```shell
peak workflows patch <workflow_id> path/to/patch_workflow.yaml
```
The updated workflow would look like the following:
"name": "updated-workflow",
"triggers": [],
"steps": {
"step1": {
"imageId": 100,
"imageVersionId": 100,
"command": "python test1.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
},
"step2": {
"imageId": 2,
"imageVersionId": 2,
"command": "python test2.py",
"resources": {
"instanceTypeId": 22,
"storage": "10GB",
},
},
"step3": {
"imageId": 400,
"imageVersionId": 400,
"command": "python test4.py",
"resources": {
"instanceTypeId": 24,
"storage": "10GB",
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
},
}
Through the above approach, specific keys of `step1` are updated, while `step2` remains unchanged, and `step3` is added as a new step.
We can also combine the YAML file and command line options, with command line options taking precedence.
Note that the keys and values in the above examples are for illustrative purposes.
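Conceptually, the patch behaves like a recursive merge of the supplied payload into the existing workflow: keys present in the payload overwrite or extend a step, keys that are absent stay untouched, and unknown step names become new steps. The sketch below illustrates that merge rule; it is not the CLI's actual implementation.

```python
def deep_merge(existing: dict, patch: dict) -> dict:
    """Recursively merge `patch` into `existing`.

    Scalars in the patch overwrite existing values, nested dicts are
    merged key by key, and keys missing from the patch are left as-is --
    mirroring how `peak workflows patch` updates only supplied fields.
    """
    merged = dict(existing)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# Steps from the "existing workflow" example, merged with the patch payload
existing_steps = {
    "step1": {"imageId": 1, "imageVersionId": 1, "command": "python test1.py"},
    "step2": {"imageId": 2, "imageVersionId": 2, "command": "python test2.py"},
}
patch_steps = {
    "step1": {"imageId": 100, "imageVersionId": 100},
    "step3": {"imageId": 400, "imageVersionId": 400, "command": "python test4.py"},
}
result = deep_merge(existing_steps, patch_steps)
```

Running this on the example steps updates `step1`'s image fields while preserving its command, leaves `step2` untouched, and adds `step3`.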
Setting up advanced workflow configurations
You can customize workflows using options such as Auto retry, Skippable steps, Output parameters, and Conditional step execution by including the relevant payload in the workflow creation or update process.
More information about these options can be found in the Peak Platform Knowledge Base.
Auto retry configuration
Auto retry is a powerful feature that automates the retrying of failed steps within a workflow. This configuration can be applied at either the workflow level or the individual step level, with the latter taking precedence over the former.
To implement auto retry at the workflow level, use the following payload example:
```yaml
# workflow-auto-retry.yaml
body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo hello
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      runConfiguration:
        retryOptions:
          duration: 5
          exitCodes: [1, 137, 143]
          exponentialBackoff: true
          numberOfRetries: 3
    step-2:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo world
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
```
In the provided example, auto retry is configured at the workflow level, affecting all steps within the workflow.
Auto retry configuration set at the step level will override the workflow-level settings.
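How the retry waits grow is easiest to see with numbers. The sketch below assumes `duration` is the base wait and that `exponentialBackoff: true` doubles it on each attempt; the platform's exact timing semantics may differ, so treat this purely as an illustration of the shape of the schedule:

```python
def retry_delays(duration: int, number_of_retries: int, exponential_backoff: bool) -> list:
    """Assumed wait before each retry attempt (same units as `duration`).

    With exponential backoff the wait doubles per attempt; without it,
    every retry waits the base duration. Illustrative only -- the
    platform's actual backoff formula is not documented here.
    """
    if exponential_backoff:
        return [duration * (2 ** attempt) for attempt in range(number_of_retries)]
    return [duration] * number_of_retries

# retryOptions from the example above: duration 5, 3 retries, backoff on
delays = retry_delays(5, 3, True)
```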
Skippable steps configuration
Configure individual workflow steps or their dependencies to be skipped during execution by providing the necessary payload during workflow creation or update.
Use the payload example below to create a workflow with skippable steps configuration:
```yaml
# workflow-skippable-steps.yaml
body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo hello
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      runConfiguration:
        skipConfiguration:
          skip: true
          skipDAG: true
    step-2:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo world
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: ["step-1"]
      stepTimeout: 30
      clearImageCache: true
```
In this example, `step-1` is configured to be skippable, and the `skipDAG` property is set to `true`, skipping all steps dependent on `step-1`.
If `skipDAG` is `true` for a parent step, all child steps will be skipped, regardless of their individual configuration.
Input parameters configuration
Input parameters enable you to pass dynamic values as environment variables, which your scripts can utilize at runtime. These values can be used to control the behavior of the step or to provide input to the step.
Input parameters can be employed in two ways:
Using Key-Value pairs
In this method, input parameters are provided as key-value pairs. The key denotes the name of the environment variable, while the corresponding value sets the variable’s runtime value.
Using Secrets or External Credentials
Alternatively, input parameters can be passed as a list of predefined secrets or external credentials, enhancing security and usability.
To create a workflow with input parameters, you can refer to the following payload example:
```yaml
# workflow_input_parameters.yaml
body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "echo hello"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      parameters:
        env:
          key: value
        secrets:
          - secret-1
          - secret-2
```
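Inside the step's container, each `parameters.env` entry (and each listed secret) is exposed to the command as an environment variable, so a script can read it directly. A minimal sketch, where the variable name `key` matches the env entry in the payload above:

```python
import os

def read_step_param(name, default=None):
    """Read a workflow input parameter injected as an environment variable."""
    return os.environ.get(name, default)

# "key" corresponds to the parameters.env entry in the workflow payload
value = read_step_param("key", "fallback")
```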
Output parameters configuration
Output parameters allow you to capture and share the output generated by a parent step in a workflow. This captured output can then be used as input or reference in subsequent steps within the same workflow.
Output parameters are defined by providing a name and the path to the file that contains the data to be exported. The file path specified in the output parameter must exist in the step; otherwise an error is thrown when the step runs. Once a parent step has declared an output parameter, child steps can use it as an input parameter.
To create a workflow with output parameters, use the following payload example:
```yaml
# workflow_output_parameters.yaml
body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script1.py > output.txt"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      outputParameters:
        scriptOutput: "output.txt"
```
In the provided example, an output parameter named `scriptOutput` is defined for `step-1`, and this parameter can be utilized in any child steps.
Execution condition configuration
Configure individual workflow steps to execute based on the output parameters or status of parent steps. This feature allows you to control the flow of your workflow dynamically.
To create a workflow with execution condition configuration, use the following payload example:
```yaml
# workflow-execution-parameters.yaml
body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script1.py > output.txt"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      outputParameters:
        scriptOutput: "output.txt"
    step-2:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script2.py"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: ["step-1"]
      stepTimeout: 30
      clearImageCache: true
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      executionParameters:
        conditional:
          - condition: "equals"
            paramName: "scriptOutput"
            stepName: "step-1"
            value: "output-value"
        parentStatus:
          - condition: "one-of"
            parents:
              - step-1
            status:
              - success
```
In this example, `step-2` is configured to execute only if the output parameter `scriptOutput` of `step-1` equals `output-value` and the status of `step-1` is `success`.
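The two gates on `step-2` can be pictured as a single predicate, sketched below for illustration (the actual evaluation happens on the platform, not in your step code):

```python
def step2_should_run(parent_outputs, parent_statuses):
    """Mirror the executionParameters above: step-1's `scriptOutput`
    must equal "output-value" (the `equals` condition) AND step-1's
    status must be one of the allowed values (the `one-of` condition).
    Illustrative only."""
    conditional_ok = parent_outputs.get("step-1", {}).get("scriptOutput") == "output-value"
    status_ok = parent_statuses.get("step-1") in ("success",)
    return conditional_ok and status_ok
```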
Executing a workflow
After creating a workflow, the next step is to execute it using the provided workflow ID. Dynamic parameters, defined as environment variables, can be passed to the steps during execution to enhance flexibility.
```yaml
# execute_workflow.yaml
body:
  params:
    global:
      param1: value1
    stepWise:
      step1:
        param2: value2
      step2:
        param3: value3
```
Use the following command to execute the workflow:
```shell
peak workflows execute <workflow-id> path/to/execute_workflow.yaml
```
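Each step effectively sees the `global` entries plus its own `stepWise` entries as environment variables. A sketch of that combination follows; note that the precedence of `stepWise` over `global` on a key collision is an assumption, not documented behaviour:

```python
def params_for_step(step_name, params):
    """Combine global params with one step's stepWise params.

    Assumes stepWise values win on key collisions -- illustrative only.
    """
    merged = dict(params.get("global", {}))
    merged.update(params.get("stepWise", {}).get(step_name, {}))
    return merged

# The params section from execute_workflow.yaml above
params = {
    "global": {"param1": "value1"},
    "stepWise": {
        "step1": {"param2": "value2"},
        "step2": {"param3": "value3"},
    },
}
```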
Partial Workflow Execution
If you don’t want to execute the entire workflow, you can selectively run specific steps or their dependencies. Provide the steps to be executed in the YAML configuration file, as shown below:
```yaml
# execute_workflow.yaml
body:
  params:
    global:
      param1: value1
    stepWise:
      step1:
        param2: value2
      step2:
        param3: value3
  stepsToRun:
    - stepName: step1
      runDAG: true
    - stepName: step2
```
In this example, `step1` and `step2` are specified for execution. Note that `runDAG` is set to `true` for `step1`, which triggers the execution of all steps dependent on `step1`.
Retrieving Workflow Execution Details
When you run a workflow, you might want details about the run: which steps ran, which failed, and which succeeded. Use the following command to get the details of an execution:

```shell
peak workflows get-execution-details --workflow-id=<workflow-id> --execution-id=<execution-id>
```
This returns the following details about the execution:

```json
{
  "executedAt": "2023-10-25T11:39:24.365Z",
  "finishedAt": "2023-10-25T11:39:37.342Z",
  "runId": "a0ac50ea-23ee-4256-bdd9-6a01a9bbe4f4",
  "status": "Failed",
  "steps": [
    {
      "children": ["step-1"],
      "finishedAt": "2023-10-25T11:39:28.000Z",
      "name": "stepName",
      "startedAt": "2023-10-25T11:39:24.000Z",
      "status": "Failed",
      "stepType": "standard",
      "command": "echo Done!",
      "image": "<image-url>",
      "output": {
        "exitCode": "1",
        "message": "Error (exit code 1)"
      },
      "resources": {
        "instanceTypeId": "21",
        "storage": "40 GB"
      }
    },
    {
      "children": [],
      "finishedAt": "2023-10-25T11:38:08.000Z",
      "name": "step-1",
      "startedAt": "2023-10-25T11:37:54.000Z",
      "status": "Failed",
      "stepType": "sqlQuery",
      "output": {
        "exitCode": "0"
      }
    }
  ]
}
```
As is clear from the above example, the details returned for a standard step differ from the details for other types of steps (an SQL query step in the above case). Steps other than standard omit a few keys, namely `image`, `resources`, and `command`.
Retrieving Workflow Step Execution Logs
To obtain execution logs for a specific step within a workflow, you’ll need to provide the workflow ID, execution ID, and the name of the step.
Using Next Token
To fetch execution logs, use the following command:
```shell
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name>
```
This command returns a set of logs along with a `nextToken` that can be used to fetch subsequent logs. The response looks like the following:
```json
{
  "logs": [
    {
      "timestamp": "1697089188119",
      "message": "log message 1"
    },
    {
      "timestamp": "1697089188119",
      "message": "log message 2"
    }
  ],
  "nextToken": "next_token",
  "stepRunStatus": "Success"
}
```
To retrieve the next set of logs, use the `nextToken` received in the response as follows:

```shell
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --next-token <next_token>
```
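Draining a token-paginated endpoint like this follows a standard loop: fetch a page, keep its logs, and repeat with the returned token until none comes back. In the sketch below, `fetch_page` is a hypothetical stand-in for however you invoke the CLI or API; it is not part of the Peak CLI itself.

```python
def collect_all_logs(fetch_page):
    """Drain a token-paginated log endpoint.

    `fetch_page` is a hypothetical callable that takes the previous
    nextToken (None for the first page) and returns a dict shaped like
    the response above: {"logs": [...], "nextToken": ...}.
    """
    logs = []
    token = None
    while True:
        page = fetch_page(token)
        logs.extend(page.get("logs", []))
        token = page.get("nextToken")
        if not token:  # an absent or empty token means no more pages
            return logs
```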
Polling the logs
To continuously poll the logs for a workflow step, you can use the `--follow` flag. This command will keep polling the logs until the execution is completed or all logs are fetched.

```shell
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --follow
```
Save the logs locally to a file
Workflow step execution logs can also be saved locally to a file using the `--save` option. We can also pass a `--file-name` option, which should include the file name and path to use when saving the logs. If the `--file-name` option isn't passed, the logs are saved to a file named `workflow_execution_logs_<workflow_id>_<execution_id>_<step_name>.log`.

```shell
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --save --file-name <file_name>
```
Note: The `--save` option takes the highest priority when both `--follow` and `--save` options are passed.