Workflows
Providing Instance Type and Storage in a Workflow
When defining a workflow step, we have the option to set the instance type and storage by including them under the resources key in the following YAML format:
resources:
  instanceTypeId: 23
  storage: 20GB
To obtain a list of all available instances along with their corresponding instanceTypeId, we can use the following command:
peak tenants list-instance-options --entity-type workflow
Adding the resources section is optional. If we don’t specify it for a particular step, the default values will be used. We can retrieve the default values through the following command:
peak workflows list-default-resources
Creating a workflow
We can create a workflow by providing a payload containing its configuration.
# workflow.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
watchers:
- user: abc@peak.ai
events:
success: false
fail: true
- webhook:
name: info
url: "https://abc.com/post"
payload: |
{
"system": "external_system",
"action": "update",
"data": {
"field": "value",
"timestamp": "2024-05-20T12:00:00Z"
}
}
events:
success: false
fail: true
runtimeExceeded: 10
- email:
name: email-watcher-1
recipients:
to:
- user1@peak.ai
- user2@peak.ai
events:
success: false
fail: true
runtimeExceeded: 10
retryOptions:
duration: 5
exitCodes:
- 1
- 2
exponentialBackoff: true
numberOfRetries: 3
tags:
- name: foo
- name: bar
steps:
stepName:
type: standard
imageId: 100
imageVersionId: 100
command: python script.py
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
parameters:
env:
key: value
secrets:
- secret-1
- secret-2
repository:
branch: main
token: random_token
url: "https://github.com/org/repo"
stepName2:
type: http
method: post
url: "https://peak.ai"
payload: "{}"
auth:
type: no-auth
headers:
absolute:
Content-Type: application/json
stepName3:
type: sql
sqlQueryPath: path/to/sql
parameters:
env:
key1: value1
key2: value2
inherit:
key3: value3
key4: value4
stepName4:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
We can use the following command to create a workflow:
peak workflows create path/to/create_workflow.yaml -v path/to/workflow_params.yaml
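The -v option points to a separate params file. As a hedged illustration only: assuming the template supports placeholder substitution (for example, a name: {{ workflow_name }} placeholder inside workflow.yaml), the params file could be a simple key-value mapping. The keys below are hypothetical and must match whatever placeholders your template actually defines.
# workflow_params.yaml (illustrative keys)
workflow_name: new-workflow
default_branch: main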
Updating a workflow
We can update a workflow by providing a payload containing its configuration.
# workflow.yaml
body:
name: updated-workflow
triggers:
- webhook: true
watchers:
- user: abc@peak.ai
events:
success: false
fail: true
- webhook:
name: info
url: "https://abc.com/post"
payload: |
{
"system": "external_system",
"action": "update",
"data": {
"field": "value",
"timestamp": "2024-05-20T12:00:00Z"
}
}
events:
success: false
fail: true
runtimeExceeded: 10
- email:
name: email-watcher-1
recipients:
to:
- user1@peak.ai
- user2@peak.ai
events:
success: false
fail: true
runtimeExceeded: 10
retryOptions:
duration: 5
exitCodes:
- 1
- 2
exponentialBackoff: true
numberOfRetries: 3
tags:
- name: CLI
steps:
step1:
command: echo hello world
type: standard
repository:
branch: main
token: random_token
url: "https://github.com/org/repo"
imageId: 100
imageVersionId: 100
resources:
instanceTypeId: 21
storage: 10GB
step2:
command: echo world
type: standard
imageId: 200
imageVersionId: 200
resources:
instanceTypeId: 21
storage: 40GB
step3:
type: http
method: post
url: "https://peak.ai"
payload: "{}"
auth:
type: no-auth
headers:
absolute:
Content-Type: application/json
step4:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
key3: value3
key4: value4
step5:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
We can use the following command to update a workflow:
peak workflows update -v path/to/workflow_params.yaml <workflow-id> path/to/update_workflow.yaml
Using the create-or-update operation
We can create or update a workflow by providing a payload containing its configuration. The search operation will be based on the workflow name.
# workflow.yaml
body:
name: my-new-workflow
triggers:
- webhook: true
watchers:
- user: abc@peak.ai
events:
success: false
fail: true
- webhook:
name: info
url: "https://abc.com/post"
payload: |
{
"system": "external_system",
"action": "update",
"data": {
"field": "value",
"timestamp": "2024-05-20T12:00:00Z"
}
}
events:
success: false
fail: true
runtimeExceeded: 10
- email:
name: email-watcher-1
recipients:
to:
- user1@peak.ai
- user2@peak.ai
events:
success: false
fail: true
runtimeExceeded: 10
retryOptions:
duration: 5
exitCodes:
- 1
- 2
exponentialBackoff: true
numberOfRetries: 3
tags:
- name: CLI
steps:
step1:
command: echo hello world
type: standard
repository:
branch: main
token: random_token
url: "https://example.com"
imageId: 100
imageVersionId: 100
resources:
instanceTypeId: 21
storage: 10GB
step2:
command: echo world
type: standard
imageId: 200
imageVersionId: 200
resources:
instanceTypeId: 21
storage: 40GB
step3:
type: http
method: post
url: "https://peak.ai"
payload: "{}"
auth:
type: no-auth
headers:
absolute:
Content-Type: application/json
step4:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
key3: value3
key4: value4
step5:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
We can use the following command to create or update a workflow:
peak workflows create-or-update -v path/to/workflow_params.yaml path/to/create_or_update_workflow.yaml
Partial Workflow Update using patch workflow
Consider an existing workflow with the following step details:
"name": "existing-workflow",
"triggers": [],
"steps": {
"step1": {
"imageId": 1,
"imageVersionId": 1,
"command": "python test1.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
}
},
"step2": {
"imageId": 2,
"imageVersionId": 2,
"command": "python test2.py",
"resources": {
"instanceTypeId": 22,
"storage": "10GB",
}
},
}
We can partially update a workflow in the following ways.
Command Line Arguments Approach
We can pass the parameters to be updated as command line arguments. Step names can be provided through --step-names, in which case only those steps are updated with the provided parameters. If step names are not provided, all the steps of the workflow are updated.
peak workflows patch <workflow_id> --name updated-workflow --image-id 100 --image-version-id 100 --step-names step1
This would update the name of the workflow along with the imageId and imageVersionId of step1, while step2 remains unchanged:
"name": "updated-workflow",
"steps": {
"step1": {
"imageId": 100,
"imageVersionId": 100,
"command": "python test1.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
},
"step2": {
"imageId": 2,
"imageVersionId": 2,
"command": "python test2.py",
"resources": {
"instanceTypeId": 22,
"storage": "10GB",
},
},
}
YAML File Approach
We can update parameters via a YAML file:
# patch-workflow.yaml
body:
name: updated-workflow
triggers:
- {}
watchers:
- user: abc@peak.ai
events:
success: false
fail: true
- webhook:
name: info
url: "https://abc.com/post"
payload: |
{
"system": "external_system",
"action": "update",
"data": {
"field": "value",
"timestamp": "2024-05-20T12:00:00Z"
}
}
events:
success: false
fail: true
runtimeExceeded: 10
- email:
name: email-watcher-1
recipients:
to:
- user1@peak.ai
- user2@peak.ai
events:
success: false
fail: true
runtimeExceeded: 10
retryOptions:
duration: 5
exitCodes:
- 1
- 2
exponentialBackoff: true
numberOfRetries: 3
steps:
step1:
imageId: 100
imageVersionId: 100
step2:
type: http
method: post
url: "https://peak.ai"
payload: "{}"
auth:
type: no-auth
headers:
absolute:
Content-Type: application/json
step3:
imageId: 400
imageVersionId: 400
command: python test4.py
resources:
instanceTypeId: 24
repository:
url: "https://github.com/org/repo"
branch: main
token: random_token
Use the following command to update the workflow:
peak workflows patch <workflow_id> path/to/patch_workflow.yaml
The updated workflow would look like the following:
"name": "updated-workflow",
"triggers": [],
"steps": {
"step1": {
"imageId": 100,
"imageVersionId": 100,
"command": "python test1.py",
"resources": {
"instanceTypeId": 21,
"storage": "10GB",
},
},
"step2": {
"imageId": 2,
"imageVersionId": 2,
"command": "python test2.py",
"resources": {
"instanceTypeId": 22,
"storage": "10GB",
},
},
"step3": {
"imageId": 400,
"imageVersionId": 400,
"command": "python test4.py",
"resources": {
"instanceTypeId": 24,
"storage": "10GB",
},
"repository": {
"branch": "main",
"token": "github",
"url": "https://github.com/org/repo",
},
},
}
Through the above approach, specific keys of step1 are updated, while step2 remains unchanged, and step3 is added as a new step.
We can also combine the YAML file and command line options, with command line options taking precedence.
Note that the keys and values in the above examples are for illustrative purposes.
Setting up advanced workflow configurations
You can customize workflows using options such as Auto retry, Skippable steps, Output Parameters, and Conditional step execution by including the relevant payload in the workflow creation or update process.
More information about these options can be found in the Peak Platform Knowledge Base.
Auto retry configuration
Auto retry is a powerful feature that automates the retrying of failed steps within a workflow. This configuration can be applied at either the workflow level or the individual step level, with the latter taking precedence over the former.
To implement auto retry at the workflow level, use the following payload example:
# workflow-auto-retry.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
retryOptions:
duration: 5
exitCodes:
- 1
- 2
exponentialBackoff: true
numberOfRetries: 3
steps:
step-1:
type: standard
imageId: 100
imageVersionId: 100
command: echo hello
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
runConfiguration:
retryOptions:
duration: 5
exitCodes:
- 1
- 137
- 143
exponentialBackoff: true
numberOfRetries: 3
step-2:
type: http
parents: []
url: "https://peak.ai"
payload: "{}"
auth:
type: bearer-token
bearerToken: token
headers:
absolute:
Content-Type: application/json
runConfiguration:
retryOptions:
duration: 2
exitCodes:
- 1
exponentialBackoff: true
numberOfRetries: 3
step-3:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
key3: value3
key4: value4
runConfiguration:
retryOptions:
duration: 2
exitCodes:
- 1
exponentialBackoff: true
numberOfRetries: 3
step-4:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
runConfiguration:
retryOptions:
duration: 2
exitCodes:
- 1
exponentialBackoff: true
numberOfRetries: 3
In the provided example, auto retry is configured at the workflow level, affecting all steps within the workflow.
Auto retry configuration set at the step level will override the workflow-level settings.
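For quick reference, here is a trimmed sketch that isolates the override pattern, reusing only keys already shown in the example above (names and values are illustrative):
# retry-override-sketch.yaml (illustrative)
body:
  name: retry-example
  triggers:
    - cron: 0 0 * * *
  retryOptions:               # workflow-level defaults applied to every step
    duration: 5
    exitCodes:
      - 1
    exponentialBackoff: true
    numberOfRetries: 3
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo hello
      runConfiguration:
        retryOptions:         # step-level settings that override the workflow-level defaults
          duration: 2
          exitCodes:
            - 1
            - 137
          exponentialBackoff: true
          numberOfRetries: 5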
Skippable steps configuration
Configure individual workflow steps or their dependencies to be skipped during execution by providing the necessary payload during workflow creation or update.
Use the payload example below to create a workflow with skippable steps configuration:
# workflow-skippable-steps.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
steps:
step-1:
type: standard
imageId: 100
imageVersionId: 100
command: echo hello
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
runConfiguration:
skipConfiguration:
skip: true
skipDAG: false
step-2:
type: http
parents: []
url: "https://peak.ai"
payload: "{}"
auth:
type: bearer-token
bearerToken: token
headers:
absolute:
Content-Type: application/json
runConfiguration:
skipConfiguration:
skip: false
skipDAG: true
step4:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
key3: value3
key4: value4
runConfiguration:
skipConfiguration:
skip: false
skipDAG: true
step5:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
runConfiguration:
skipConfiguration:
skip: false
skipDAG: true
In this example, step-1 is configured to be skippable, with the skipDAG property set to false; this will skip only step-1. For step-2, skip is set to false but skipDAG is set to true; this will skip all the children of step-2, but step-2 itself won’t be skipped.
If skipDAG is true for a parent step, all of its child steps will be skipped, regardless of their individual configuration.
Input parameters configuration
Input parameters enable you to pass dynamic values as environment variables, which your scripts can utilize at runtime. These values can be used to control the behavior of the step or to provide input to the step.
Input parameters can be employed in three ways:
Using Key-Value pairs
In this method, input parameters are provided as key-value pairs. The key denotes the name of the environment variable, while the corresponding value sets the variable’s runtime value.
Using Secrets or External Credentials
Alternatively, input parameters can be passed as a list of predefined secrets or external credentials, enhancing security and usability.
To create a workflow with input parameters, you can refer to the following payload example:
# workflow_input_parameters.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
steps:
step-1:
type: standard
imageId: 100
imageVersionId: 100
command: echo hello
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
parameters:
env:
key: value
secrets:
- secret-1
- secret-2
step-2:
type: http
parents: []
url: "https://peak.ai"
payload: |
{
"data": "{{data}}",
"secret": "{{SECRET_CUSTOM}}"
}
auth:
type: bearer-token
bearerToken: token
headers:
absolute:
Content-Type: application/json
parameters:
env:
data: data
secrets:
- secret
step-3:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
output: scriptOutput
step-4:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
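As a minimal sketch of how a step script could consume these values, assuming the key-value pairs are injected as environment variables as described above, and that secrets are surfaced in a similar way (the exact variable naming for secrets may differ):
# script.py (illustrative) - reads input parameters inside a standard step
import os

value = os.environ.get("key")          # "key" mirrors parameters.env in the example above
secret_1 = os.environ.get("secret-1")  # assumption: the secret is exposed under its own name

print(f"key={value}, secret-1 is {'set' if secret_1 else 'not set'}")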
Using Output parameters from parents
You can also pass output parameters from parents as input parameters in children. This is done through the inherit property, which is an object where each key is the name of the input parameter and the value is the name of the output parameter it inherits from.
Here’s an example that uses the inherit property to get the output parameters of step-1 as input in step-2:
# workflow_output_parameters.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
steps:
step-1:
type: standard
imageId: 100
imageVersionId: 100
command: python script1.py > output.txt
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
repository:
branch: main
token: random_token
url: "https://github.com/org/repo"
outputParameters:
scriptOutput: output.txt
step-2:
type: http
parents: []
url: "https://peak.ai"
payload: |
{
"data": "{{output}}",
"secret": "{{SECRET_CUSTOM}}"
}
auth:
type: bearer-token
bearerToken: token
headers:
absolute:
Content-Type: application/json
outputParameters:
http_response: response.txt
parameters:
inherit:
output: scriptOutput
step-3:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
output: scriptOutput
step-4:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
As we can see in the example, input parameters can also be used in HTTP steps and we can configure the payload to use these parameters using the {{}} syntax.
Output parameters configuration
Output parameters allow you to capture and share the output generated by a parent step in a workflow. This captured output can then be used as input or reference in subsequent steps within the same workflow.
Output parameters can be defined by providing a name and the path to the file that contains the data to be exported. The file path specified in the output parameter should exist in the step; otherwise an error will be thrown when the step is run. Once a parent has declared an output parameter, the child steps can use it as an input parameter.
To create a workflow with output parameters, use the following payload example:
# workflow_output_parameters.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
steps:
step-1:
type: standard
imageId: 100
imageVersionId: 100
command: python script1.py > output.txt
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
repository:
branch: main
token: random_token
url: "https://github.com/org/repo"
outputParameters:
scriptOutput: output.txt
step-2:
type: http
parents: []
url: "https://peak.ai"
payload: |
{
"data": "{{data}}",
"secret": "{{SECRET_CUSTOM}}"
}
auth:
type: bearer-token
bearerToken: token
headers:
absolute:
Content-Type: application/json
outputParameters:
http_response: response.txt
In the provided example, an output parameter named scriptOutput is defined for step-1, and this parameter can be utilized in any of its child steps.
For an HTTP step, one thing to remember is that it can only have one output parameter, whose value must be response.txt. This is because the HTTP step writes the response of the API call to a file called response.txt in its working directory.
An output parameter from one step can be added as an input parameter in its child steps using the parameters.inherit property. You can find more details in the input parameters section.
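As a minimal sketch of the script side, mirroring step-1 above where python script1.py > output.txt backs the scriptOutput parameter, the script only needs to write the desired value to stdout (which the step command redirects into output.txt):
# script1.py (illustrative)
value = "output-value"  # whatever should be exposed as the scriptOutput parameter
print(value)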
Execution condition configuration
Configure individual workflow steps to execute based on the output parameters or status of parent steps. This feature allows you to control the flow of your workflow dynamically.
To create a workflow with execution condition configuration, use the following payload example:
# workflow-execution-parameters.yaml
body:
name: new-workflow
triggers:
- cron: 0 0 * * *
steps:
step-1:
type: standard
imageId: 100
imageVersionId: 100
command: python script1.py > output.txt
resources:
instanceTypeId: 21
storage: 10GB
parents: []
stepTimeout: 30
clearImageCache: true
repository:
branch: main
token: random_token
url: "https://github.com/org/repo"
outputParameters:
scriptOutput: output.txt
step-2:
type: standard
imageId: 100
imageVersionId: 100
command: python script2.py
resources:
instanceTypeId: 21
storage: 10GB
parents:
- step-1
stepTimeout: 30
clearImageCache: true
repository:
branch: main
token: random_token
url: "https://github.com/org/repo"
executionParameters:
conditional:
- condition: equals
paramName: scriptOutput
stepName: step-1
value: output-value
parentStatus:
- condition: one-of
parents:
- step-1
status:
- success
step-3:
type: http
parents:
- step-1
url: "https://peak.ai"
payload: "{}"
auth:
type: bearer-token
bearerToken: token
headers:
absolute:
Content-Type: application/json
executionParameters:
conditional:
- condition: equals
paramName: scriptOutput
stepName: step-1
value: output-value
parentStatus:
- condition: all-of
parents:
- step-1
status:
- success
step-4:
type: sql
repository:
branch: main
token: random_token
url: "https://example.com"
filePath: "path/to/file.sql"
parameters:
env:
key1: value1
key2: value2
inherit:
key3: value3
key4: value4
executionParameters:
conditional:
- condition: equals
paramName: scriptOutput
stepName: step-1
value: output-value
parentStatus:
- condition: all-of
parents:
- step-1
status:
- success
step-5:
type: export
schema: schema_name
table: table_name
sortBy: column_name
sortOrder: asc
compression: false
executionParameters:
parentStatus:
- condition: all-of
parents:
- step-4
status:
- success
In this example, step-2 is configured to execute only if the output parameter scriptOutput of step-1 is equal to output-value and if the status of step-1 is success.
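Assuming the contents of the declared output file become the parameter’s value, as described in the output parameters section, step-1 can drive this condition dynamically; a hypothetical script1.py might look like:
# script1.py (illustrative) - stdout is redirected to output.txt, which backs scriptOutput
should_continue = True  # placeholder for real business logic
print("output-value" if should_continue else "skip")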
Executing a workflow
After creating a workflow, the next step is to execute it using the provided workflow ID. Dynamic parameters, defined as environment variables, can be passed to the steps during execution to enhance flexibility.
# execute_workflow.yaml
body:
params:
global:
param1: value1
stepWise:
step1:
param2: value2
step2:
param3: value3
Use the following command to execute the workflow:
peak workflows execute <workflow-id> path/to/execute_workflow.yaml
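Inside a step’s script, these execution parameters can then be read as ordinary environment variables. A minimal sketch for step1, assuming the injected variable names match the parameter names in the payload above:
# step1 script (illustrative)
import os

param1 = os.environ.get("param1")  # global parameter, available to every step
param2 = os.environ.get("param2")  # step-wise parameter for step1

print(f"param1={param1}, param2={param2}")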
Partial Workflow Execution
If you don’t want to execute the entire workflow, you can selectively run specific steps or their dependencies. Provide the steps to be executed in the YAML configuration file, as shown below:
# execute_workflow.yaml
body:
params:
global:
param1: value1
stepWise:
step1:
param2: value2
step2:
param3: value3
stepsToRun:
- stepName: step1
runDAG: true
- stepName: step2
In this example, step1 and step2 are specified for execution. Note that runDAG is set to true for step1, which triggers the execution of all steps dependent on step1.
Retrieving Workflow Execution Details
When you run a workflow, you might want to get details about the run: which steps ran, which ones failed, and which ones succeeded. To get these details, run the following command:
peak workflows get-execution-details --workflow-id=<workflow-id> --execution-id=<execution-id>
This returns the following details about the execution:
{
"executedAt": "2023-10-25T11:39:24.365Z",
"finishedAt": "2023-10-25T11:39:37.342Z",
"runId": "a0ac50ea-23ee-4256-bdd9-6a01a9bbe4f4",
"status": "Failed",
"steps": [
{
"childern": ["step-1", "step-2"],
"finishedAt": "2023-10-25T11:39:28.000Z",
"name": "stepName",
"startedAt": "2023-10-25T11:39:24.000Z",
"status": "Failed",
"stepType": "standard",
"command": "echo Done!",
"image": "<image-url>",
"output": {
"exitCode": "1",
"message": "Error (exit code 1)"
},
"resources": {
"instanceTypeId": "21",
"storage": "40 GB"
}
},
{
"childern": [],
"finishedAt": "2023-10-25T11:38:08.000Z",
"name": "step-1",
"startedAt": "2023-10-25T11:37:54.000Z",
"status": "Failed",
"stepType": "sqlQuery",
"output": {
"exitCode": "0"
}
},
{
"children": [],
"finishedAt": "2023-10-25T11:40:08.000Z",
"name": "step-1",
"startedAt": "2023-10-25T11:38:10.000Z",
"status": "Failed",
"stepType": "http",
"output": {
"exitCode": "0"
},
"authType": "api-key",
"endpoint": "https://service.dev.peak.ai/workflows/api/v1/workflows",
"method": "get"
}
]
}
As is clear from the above example, the details returned for a Standard step differ from those returned for other step types (a SQL Query step in the above case). Steps other than Standard omit a few keys, such as image, resources, and command, whereas HTTP steps contain keys that aren’t present in other step types: authType, endpoint, and method.
Retrieving Workflow Step Execution Logs
To obtain execution logs for a specific step within a workflow, you’ll need to provide the workflow ID, execution ID, and the name of the step.
Using Next Token
To fetch execution logs, use the following command:
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name>
This command returns a set of logs along with a nextToken that can be used to fetch subsequent logs. The response looks like the following:
{
"logs": [
{
"timestamp": "1697089188119",
"message": "log message 1"
},
{
"timestamp": "1697089188119",
"message": "log message 2"
}
],
"nextToken": "next_token",
"stepRunStatus": "Success"
}
To retrieve the next set of logs, use the nextToken received in the response as follows:
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --next-token <next_token>
Polling the logs
To continuously poll the logs for a workflow step, you can use the --follow flag. This command will keep polling the logs until the execution is completed or all logs are fetched.
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --follow
Save the logs locally to a file
Workflow step execution logs can also be saved locally to a file using the --save option. We can also pass a --file-name option, which should include the file name and the path to be used when saving the logs. If the --file-name option isn’t passed, the logs are saved to a file named workflow_execution_logs_<workflow_id>_<execution_id>_<step_name>.log.
peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --save --file-name <file_name>
Note: The --save option takes the highest priority when both --follow and --save options are passed.