Workflows

Note: Only workflows made up entirely of standard steps are supported.

Providing Instance Type and Storage in Workflow

When defining a workflow step, we have the option to set the instance type and storage by including them under the resources key in the following YAML format:

resources:
    instanceTypeId: 23
    storage: 20GB

To obtain a list of all available instances along with their corresponding instanceTypeId, we can use the following command:

peak tenants list-instance-options --entity-type workflow

Adding the resources section is optional. If we don’t specify it for a particular step, the default values will be used. We can retrieve the default values through the following command:

peak workflows list-default-resources
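
The fallback to default values can be pictured as a simple merge. The sketch below is illustrative only: `resolve_resources` is a hypothetical helper, and the values in `ASSUMED_DEFAULTS` are placeholders rather than what `peak workflows list-default-resources` actually returns. It also assumes that defaults are filled in key by key when a step sets only part of the resources section.

```python
from typing import Optional

# Placeholder defaults for illustration; the real values come from
# `peak workflows list-default-resources`.
ASSUMED_DEFAULTS = {"instanceTypeId": 21, "storage": "10GB"}


def resolve_resources(step_resources: Optional[dict], defaults: Optional[dict] = None) -> dict:
    """Return the resources a step would run with, filling gaps from defaults."""
    resolved = dict(ASSUMED_DEFAULTS if defaults is None else defaults)
    resolved.update(step_resources or {})
    return resolved


# A step that sets both keys keeps its own values.
print(resolve_resources({"instanceTypeId": 23, "storage": "20GB"}))
# A step without a resources section falls back to the defaults.
print(resolve_resources(None))
```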

Creating a workflow

We can create a workflow by providing a payload containing its configuration.

# workflow.yaml

body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: '{ "pingback-url": "https:/workflow/123" }'
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  tags:
    - name: foo
    - name: bar
  steps:
    stepName:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script.py"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      parameters:
        env:
          key: value
        secrets:
          - secret-1
          - secret-2
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"

We can use the following command to create a workflow:

peak workflows create path/to/create_workflow.yaml -v path/to/workflow_params.yaml

Updating a workflow

We can update a workflow by providing a payload containing its configuration.

# workflow.yaml

body:
  name: updated-workflow
  triggers:
    - webhook: true
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: '{ "pingback-url": "https:/workflow/123" }'
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  tags:
    - name: CLI
  steps:
    step1:
      command: echo hello world
      type: standard
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      imageId: 100
      imageVersionId: 100
      resources:
        instanceTypeId: 21
        storage: 10GB
    step2:
      command: echo world
      type: standard
      imageId: 200
      imageVersionId: 200
      resources:
        instanceTypeId: 21
        storage: 40GB

We can use the following command to update a workflow:

peak workflows update -v path/to/workflow_params.yaml <workflow-id> path/to/update_workflow.yaml

Using the create-or-update operation

We can create or update a workflow by providing a payload containing its configuration. The existing workflow is looked up by name: if a workflow with the given name is found, it is updated; otherwise, a new workflow is created.

# workflow.yaml

body:
  name: my-new-workflow
  triggers:
    - webhook: true
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: '{ "pingback-url": "https:/workflow/123" }'
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  tags:
    - name: CLI
  steps:
    step1:
      command: echo hello world
      type: standard
      repository:
        branch: main
        token: random_token
        url: https://example.com
      imageId: 100
      imageVersionId: 100
      resources:
        instanceTypeId: 21
        storage: 10GB
    step2:
      command: echo world
      type: standard
      imageId: 200
      imageVersionId: 200
      resources:
        instanceTypeId: 21
        storage: 40GB

We can use the following command to create or update a workflow:

peak workflows create-or-update -v path/to/workflow_params.yaml path/to/create_or_update_workflow.yaml

Partial Workflow Update using patch workflow

Consider an existing workflow with step details:

"name": "existing-workflow",
"triggers": [],
"steps": {
    "step1": {
        "imageId": 1,
        "imageVersionId": 1,
        "command": "python test1.py",
        "resources": {
            "instanceTypeId": 21,
            "storage": "10GB",
        }
    },
    "step2": {
        "imageId": 2,
        "imageVersionId": 2,
        "command": "python test2.py",
         "resources": {
            "instanceTypeId": 22,
            "storage": "10GB",
        }
    },
}

We can partially update a workflow in the following ways.

  1. Command Line Arguments Approach

We can pass the parameters to be updated as command line arguments. The --step-names option restricts the update to the named steps. If no step names are provided, all the steps of the workflow are updated.

peak workflows patch <workflow_id> --name updated-workflow --image-id 100 --image-version-id 100 --step-names step1

This would update the workflow name, and the imageId and imageVersionId of step1 only. step2 is left unchanged because it is not listed in --step-names.

"name": "updated-workflow",
"steps": {
    "step1": {
        "imageId": 100,
        "imageVersionId": 100,
        "command": "python test1.py",
        "resources": {
            "instanceTypeId": 21,
            "storage": "10GB",
        },
    },
    "step2": {
        "imageId": 2,
        "imageVersionId": 2,
        "command": "python test2.py",
        "resources": {
            "instanceTypeId": 22,
            "storage": "10GB",
        },
    },
}

  2. YAML File Approach

We can update parameters via a YAML file:

# patch-workflow.yaml

body:
  name: "updated-workflow"
  triggers:
    - {}
  watchers:
    - user: abc@peak.ai
      events:
        success: false
        fail: true
    - webhook:
        name: info
        url: "https://abc.com/post"
        payload: '{ "pingback-url": "https:/workflow/123" }'
      events:
        success: false
        fail: true
        runtimeExceeded: 10
    - email:
        name: "email-watcher-1"
        recipients:
          to:
            - user1@peak.ai
            - user2@peak.ai
      events:
        success: false
        fail: true
        runtimeExceeded: 10
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  steps:
    step1:
      imageId: 100
      imageVersionId: 100
    step3:
      imageId: 400
      imageVersionId: 400
      command: "python test4.py"
      resources:
        instanceTypeId: 24
      repository:
        url: "https://github.com/org/repo"
        branch: main
        token: random_token

Use the following command to update the workflow:

peak workflows patch <workflow_id> path/to/patch_workflow.yaml

The updated workflow would look like the following:

"name": "updated-workflow",
"triggers": [],
"steps": {
    "step1": {
        "imageId": 100,
        "imageVersionId": 100,
        "command": "python test1.py",
        "resources": {
            "instanceTypeId": 21,
            "storage": "10GB",
        },
    },
    "step2": {
        "imageId": 2,
        "imageVersionId": 2,
        "command": "python test2.py",
        "resources": {
            "instanceTypeId": 22,
            "storage": "10GB",
        },
    },
    "step3": {
        "imageId": 400,
        "imageVersionId": 400,
        "command": "python test4.py",
        "resources": {
            "instanceTypeId": 24,
            "storage": "10GB",
        },
        "repository": {
            "branch": "main",
            "token": "github",
            "url": "https://github.com/org/repo",
        },
    },
}

Through the above approach, specific keys of step1 are updated, while step2 remains unchanged, and step3 is added as a new step.

We can also combine the YAML file and command line options, with command line options taking precedence.
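
The merge behaviour described in this section can be sketched in a few lines of Python. This is not the CLI's implementation: `apply_patch` and the shape of `cli_options` are hypothetical, and the sketch assumes a shallow, key-by-key merge within each step. It only illustrates the three rules stated above: listed keys of existing steps are updated, unknown step names are added as new steps, and command line options override values from the file.

```python
import copy


def apply_patch(workflow: dict, file_body: dict, cli_options: dict) -> dict:
    """Merge a patch into a workflow: file values first, then CLI overrides."""
    patched = copy.deepcopy(workflow)
    steps = patched.setdefault("steps", {})

    # Top-level keys from the YAML body replace existing ones.
    for key, value in file_body.items():
        if key != "steps":
            patched[key] = value
    # Existing steps are updated key by key; unknown names become new steps.
    for name, step_patch in file_body.get("steps", {}).items():
        steps.setdefault(name, {}).update(step_patch)

    # CLI options win over the file. Step-level options go to the named
    # steps, or to every step when no names are given.
    if "name" in cli_options:
        patched["name"] = cli_options["name"]
    targets = cli_options.get("step_names") or list(steps)
    for name in targets:
        steps[name].update(cli_options.get("step_options", {}))
    return patched


existing = {
    "name": "existing-workflow",
    "steps": {
        "step1": {"imageId": 1, "imageVersionId": 1, "command": "python test1.py"},
        "step2": {"imageId": 2, "imageVersionId": 2, "command": "python test2.py"},
    },
}
result = apply_patch(
    existing,
    file_body={"steps": {"step1": {"imageId": 100}, "step3": {"imageId": 400}}},
    cli_options={
        "name": "updated-workflow",
        "step_names": ["step1"],
        "step_options": {"imageVersionId": 100},
    },
)
print(result["name"], sorted(result["steps"]))
```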

Note that the keys and values in the above examples are for illustrative purposes.

Setting up advanced workflow configurations

You can customize workflows using options such as Auto retry, Skippable steps, Output Parameters, and Conditional step execution by including the relevant payload in the workflow creation or update process.

More information about these options can be found in the Peak Platform Knowledge Base.

Auto retry configuration

Auto retry automatically retries failed steps within a workflow. It can be configured at the workflow level or at the individual step level, with the step-level configuration taking precedence.

The following payload example configures auto retry at the workflow level and also overrides it for step-1:

# workflow-auto-retry.yaml

body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  retryOptions:
    duration: 5
    exitCodes: [1, 2]
    exponentialBackoff: true
    numberOfRetries: 3
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo hello
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      runConfiguration:
        retryOptions:
          duration: 5
          exitCodes: [1, 137, 143]
          exponentialBackoff: true
          numberOfRetries: 3
    step-2:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo world
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true

In the provided example, auto retry is configured at the workflow level, so it applies to every step that doesn’t define its own settings (step-2 here).

step-1 also defines retryOptions under runConfiguration. These step-level settings override the workflow-level settings for that step.
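
The precedence rule can be sketched as a small lookup. `effective_retry_options` is a hypothetical helper, not part of the CLI or SDK, and it assumes that a step-level retryOptions block replaces the workflow-level block as a whole rather than being merged key by key.

```python
from typing import Optional


def effective_retry_options(workflow_body: dict, step_name: str) -> Optional[dict]:
    """Return the retry options that apply to one step."""
    step = workflow_body["steps"][step_name]
    step_level = step.get("runConfiguration", {}).get("retryOptions")
    if step_level is not None:
        return step_level  # step-level settings take precedence
    return workflow_body.get("retryOptions")  # otherwise fall back to the workflow


body = {
    "retryOptions": {
        "duration": 5, "exitCodes": [1, 2],
        "exponentialBackoff": True, "numberOfRetries": 3,
    },
    "steps": {
        "step-1": {
            "runConfiguration": {
                "retryOptions": {
                    "duration": 5, "exitCodes": [1, 137, 143],
                    "exponentialBackoff": True, "numberOfRetries": 3,
                }
            }
        },
        "step-2": {},
    },
}
print(effective_retry_options(body, "step-1")["exitCodes"])  # [1, 137, 143]
print(effective_retry_options(body, "step-2")["exitCodes"])  # [1, 2]
```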

Skippable steps configuration

Configure individual workflow steps or their dependencies to be skipped during execution by providing the necessary payload during workflow creation or update.

Use the payload example below to create a workflow with skippable steps configuration:

# workflow-skippable-steps.yaml

body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo hello
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      runConfiguration:
        skipConfiguration:
          skip: true
          skipDAG: true
    step-2:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: echo world
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: ["step-1"]
      stepTimeout: 30
      clearImageCache: true

In this example, step-1 is configured to be skippable, and the skipDAG property is set to true, skipping all steps dependent on step-1.

If skipDAG is true for a parent step, all of its child steps will be skipped, regardless of their individual configuration.
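
To make the propagation concrete, the sketch below computes which steps end up skipped for a given steps mapping. `skipped_steps` is a hypothetical helper written for illustration. It assumes that skipDAG only has an effect when skip is also true, and that "dependent" steps means all descendants reachable through the parents lists.

```python
def skipped_steps(steps: dict) -> set:
    """Return the names of steps that would be skipped."""
    # Map each step to its direct children using the `parents` lists.
    children = {name: [] for name in steps}
    for name, step in steps.items():
        for parent in step.get("parents", []):
            children[parent].append(name)

    skipped = set()
    pending = []  # steps whose descendants must be skipped as well
    for name, step in steps.items():
        cfg = step.get("runConfiguration", {}).get("skipConfiguration", {})
        if cfg.get("skip"):
            skipped.add(name)
            if cfg.get("skipDAG"):
                pending.append(name)

    visited = set()
    while pending:
        current = pending.pop()
        if current in visited:
            continue
        visited.add(current)
        for child in children[current]:
            skipped.add(child)  # skipped regardless of the child's own config
            pending.append(child)
    return skipped


example = {
    "step-1": {
        "parents": [],
        "runConfiguration": {"skipConfiguration": {"skip": True, "skipDAG": True}},
    },
    "step-2": {"parents": ["step-1"]},
}
print(sorted(skipped_steps(example)))  # ['step-1', 'step-2']
```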

Input parameters configuration

Input parameters enable you to pass dynamic values as environment variables, which your scripts can utilize at runtime. These values can be used to control the behavior of the step or to provide input to the step.

Input parameters can be employed in two ways:

  1. Using Key-Value pairs

In this method, input parameters are provided as key-value pairs. The key denotes the name of the environment variable, while the corresponding value sets the variable’s runtime value.

  2. Using Secrets or External Credentials

Alternatively, input parameters can be passed as a list of predefined secrets or external credentials, enhancing security and usability.

To create a workflow with input parameters, refer to the following payload example:

# workflow_input_parameters.yaml

body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "echo hello"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      parameters:
        env:
          key: value
        secrets:
          - secret-1
          - secret-2

Output parameters configuration

Output parameters allow you to capture and share the output generated by a parent step in a workflow. This captured output can then be used as input or reference in subsequent steps within the same workflow.

Output parameters are defined by providing a name and the path to the file that contains the data to be exported. The file path specified in the output parameter must exist in the step when it finishes; otherwise, an error is thrown when the step is run. Once a parent step has declared an output parameter, its child steps can use it as an input parameter.

To create a workflow with output parameters, use the following payload example:

# workflow_output_parameters.yaml

body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script1.py > output.txt"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      outputParameters:
        scriptOutput: "output.txt"

In the provided example, an output parameter named scriptOutput is defined for step-1, and this parameter can be used in any of its child steps.
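
For reference, a script such as script1.py only needs to write the value to standard output, since the command in the example redirects stdout to output.txt. The sketch below is hypothetical: `compute_result` and the value it returns are placeholders. It writes the value without a trailing newline because this document does not say whether the platform trims whitespace from the file contents.

```python
import sys


def compute_result() -> str:
    """Placeholder for the step's real work; the returned value is made up."""
    return "output-value"


def main() -> None:
    # Write only the value, so the redirected file contains nothing else.
    sys.stdout.write(compute_result())


main()
```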

Execution condition configuration

Configure individual workflow steps to execute based on the output parameters or status of parent steps. This feature allows you to control the flow of your workflow dynamically.

To create a workflow with execution condition configuration, use the following payload example:

# workflow-execution-parameters.yaml

body:
  name: new-workflow
  triggers:
    - cron: "0 0 * * *"
  steps:
    step-1:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script1.py > output.txt"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: []
      stepTimeout: 30
      clearImageCache: true
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      outputParameters:
        scriptOutput: "output.txt"
    step-2:
      type: standard
      imageId: 100
      imageVersionId: 100
      command: "python script2.py"
      resources:
        instanceTypeId: 21
        storage: 10GB
      parents: ["step-1"]
      stepTimeout: 30
      clearImageCache: true
      repository:
        branch: main
        token: random_token
        url: "https://github.com/org/repo"
      executionParameters:
        conditional:
          - condition: "equals"
            paramName: "scriptOutput"
            stepName: "step-1"
            value: "output-value"
        parentStatus:
          - condition: "one-of"
            parents:
              - step-1
            status:
              - success

In this example, step-2 is configured to execute only if the output parameter scriptOutput of step-1 is equal to output-value and if the status of step-1 is success.
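
The decision for step-2 can be sketched as a small predicate. `should_run` is a hypothetical function for illustration, not the platform's logic. It covers only the two condition names that appear in the example (equals and one-of), assumes that all rules must hold together, and assumes that one-of means every listed parent finished with one of the listed statuses.

```python
def should_run(execution_parameters: dict, outputs: dict, statuses: dict) -> bool:
    """Decide whether a step runs, given parent outputs and parent statuses."""
    for rule in execution_parameters.get("conditional", []):
        if rule["condition"] != "equals":
            raise ValueError(f"condition not covered by this sketch: {rule['condition']}")
        actual = outputs.get(rule["stepName"], {}).get(rule["paramName"])
        if actual != rule["value"]:
            return False

    for rule in execution_parameters.get("parentStatus", []):
        if rule["condition"] != "one-of":
            raise ValueError(f"condition not covered by this sketch: {rule['condition']}")
        # Every listed parent must have one of the allowed statuses.
        if any(statuses.get(parent) not in rule["status"] for parent in rule["parents"]):
            return False
    return True


step_2_params = {
    "conditional": [
        {"condition": "equals", "paramName": "scriptOutput",
         "stepName": "step-1", "value": "output-value"}
    ],
    "parentStatus": [
        {"condition": "one-of", "parents": ["step-1"], "status": ["success"]}
    ],
}
print(should_run(step_2_params, {"step-1": {"scriptOutput": "output-value"}}, {"step-1": "success"}))  # True
print(should_run(step_2_params, {"step-1": {"scriptOutput": "other"}}, {"step-1": "success"}))  # False
```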

Executing a workflow

After creating a workflow, the next step is to execute it using the provided workflow ID. Dynamic parameters, defined as environment variables, can be passed to the steps during execution to enhance flexibility.

# execute_workflow.yaml

body:
  params:
    global:
      param1: value1
    stepWise:
      step1:
        param2: value2
      step2:
        param3: value3

Use the following command to execute the workflow:

peak workflows execute <workflow-id> path/to/execute_workflow.yaml
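
One way to read the params block is that every step receives the global parameters plus its own stepWise entries. The sketch below illustrates that reading. `resolve_step_params` is a hypothetical helper, and the rule that a stepWise value wins over a global value with the same name is an assumption, since this document does not state what happens on a conflict.

```python
def resolve_step_params(params: dict, step_name: str) -> dict:
    """Return the parameters one step would see as environment variables."""
    resolved = dict(params.get("global", {}))
    # Assumption: step-specific values override global ones on a name clash.
    resolved.update(params.get("stepWise", {}).get(step_name, {}))
    return resolved


params = {
    "global": {"param1": "value1"},
    "stepWise": {
        "step1": {"param2": "value2"},
        "step2": {"param3": "value3"},
    },
}
print(resolve_step_params(params, "step1"))  # {'param1': 'value1', 'param2': 'value2'}
print(resolve_step_params(params, "step3"))  # {'param1': 'value1'}
```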

Partial Workflow Execution

If you don’t want to execute the entire workflow, you can selectively run specific steps or their dependencies. Provide the steps to be executed in the YAML configuration file, as shown below:

# execute_workflow.yaml

body:
  params:
    global:
      param1: value1
    stepWise:
      step1:
        param2: value2
      step2:
        param3: value3
  stepsToRun:
    - stepName: step1
      runDAG: true
    - stepName: step2

In this example, step1 and step2 are specified for execution. Note that runDAG is set to true for step1, which triggers the execution of all steps dependent on step1.
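
The effect of runDAG can be sketched by expanding the stepsToRun list against a dependency graph. `steps_to_execute` is a hypothetical helper, and the five-step graph below is made up for illustration; the sketch assumes that "dependent" steps means all descendants, not just direct children.

```python
def steps_to_execute(parents: dict, steps_to_run: list) -> set:
    """Expand a stepsToRun list into the full set of steps that would run."""
    # Invert the parents mapping to get each step's direct children.
    children = {name: [] for name in parents}
    for name, step_parents in parents.items():
        for parent in step_parents:
            children[parent].append(name)

    selected = set()
    for entry in steps_to_run:
        selected.add(entry["stepName"])
        if not entry.get("runDAG"):
            continue
        # Walk every descendant of a step that has runDAG enabled.
        pending = list(children[entry["stepName"]])
        seen = set()
        while pending:
            step = pending.pop()
            if step in seen:
                continue
            seen.add(step)
            selected.add(step)
            pending.extend(children[step])
    return selected


# Hypothetical graph: step3 and step4 depend on step1, step5 depends on step2.
graph = {"step1": [], "step2": [], "step3": ["step1"], "step4": ["step3"], "step5": ["step2"]}
to_run = [{"stepName": "step1", "runDAG": True}, {"stepName": "step2"}]
print(sorted(steps_to_execute(graph, to_run)))  # ['step1', 'step2', 'step3', 'step4']
```

Here step5 is left out because step2 is listed without runDAG.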

Retrieving Workflow Execution Details

After running a workflow, you may want details about the run, such as which steps ran and which ones succeeded or failed. Use the following command to get the details of an execution:

peak workflows get-execution-details --workflow-id=<workflow-id> --execution-id=<execution-id>

This returns the following details about the execution:

{
    "executedAt": "2023-10-25T11:39:24.365Z",
    "finishedAt": "2023-10-25T11:39:37.342Z",
    "runId": "a0ac50ea-23ee-4256-bdd9-6a01a9bbe4f4",
    "status": "Failed",
    "steps": [
        {
            "childern": ["step-1"],
            "finishedAt": "2023-10-25T11:39:28.000Z",
            "name": "stepName",
            "startedAt": "2023-10-25T11:39:24.000Z",
            "status": "Failed",
            "stepType": "standard",
            "command": "echo Done!",
            "image": "<image-url>",
            "output": {
                "exitCode": "1",
                "message": "Error (exit code 1)"
            },
            "resources": {
                "instanceTypeId": "21",
                "storage": "40 GB"
            }
        },
        {
            "childern": [],
            "finishedAt": "2023-10-25T11:38:08.000Z",
            "name": "step-1",
            "startedAt": "2023-10-25T11:37:54.000Z",
            "status": "Failed",
            "stepType": "sqlQuery",
            "output": {
                "exitCode": "0"
            }
        }
    ]
}

As the above example shows, the details returned for a standard step differ from those returned for other step types (SQL Query in this case). Steps other than standard steps omit a few keys, such as image, resources, and command.
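
When processing this response in a script, it is safer to treat the standard-only keys as optional. The sketch below is one way to do that: `failed_steps` is a hypothetical helper, and the input is a trimmed copy of the response shown above.

```python
import json

# Keys that the example shows only for standard steps.
STANDARD_ONLY_KEYS = ("image", "resources", "command")


def failed_steps(details: dict) -> list:
    """Collect a short summary for every step whose status is "Failed"."""
    summary = []
    for step in details.get("steps", []):
        if step.get("status") != "Failed":
            continue
        entry = {
            "name": step["name"],
            "stepType": step["stepType"],
            "exitCode": step.get("output", {}).get("exitCode"),
        }
        # Copy the standard-only keys when present instead of assuming them.
        entry.update({key: step[key] for key in STANDARD_ONLY_KEYS if key in step})
        summary.append(entry)
    return summary


# Trimmed copy of the response shown above.
raw = """
{
    "status": "Failed",
    "steps": [
        {"name": "stepName", "status": "Failed", "stepType": "standard",
         "command": "echo Done!", "output": {"exitCode": "1"},
         "resources": {"instanceTypeId": "21", "storage": "40 GB"}},
        {"name": "step-1", "status": "Failed", "stepType": "sqlQuery",
         "output": {"exitCode": "0"}}
    ]
}
"""
for entry in failed_steps(json.loads(raw)):
    print(entry)
```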

Retrieving Workflow Step Execution Logs

To obtain execution logs for a specific step within a workflow, you’ll need to provide the workflow ID, execution ID, and the name of the step.

  1. Using Next Token

To fetch execution logs, use the following command:

peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name>

This command returns a set of logs along with a nextToken that can be used to fetch subsequent logs. The response looks like the following:

{
    "logs": [
        {
            "timestamp": "1697089188119",
            "message": "log message 1"
        },
        {
            "timestamp": "1697089188119",
            "message": "log message 2"
        }
    ],
    "nextToken": "next_token",
    "stepRunStatus": "Success"
}

To retrieve the next set of logs, use the nextToken received in the response as follows:

peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --next-token <next_token>

  2. Polling the logs

To continuously poll the logs for a workflow step, you can use the --follow flag. This command will keep polling the logs until the execution is completed or all logs are fetched.

peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --follow

  3. Save the logs locally to a file

Workflow step execution logs can also be saved locally to a file using the --save option. We can also pass a --file-name option, which should include the file name and the path to use when saving the logs. If the --file-name option isn’t passed, the logs are saved to a file named workflow_execution_logs_<workflow_id>_<execution_id>_<step_name>.log.

peak workflows get-execution-logs --workflow-id <workflow_id> --execution-id <execution_id> --step-name <step_name> --save --file-name <file_name>

Note: The --save option takes the highest priority when both the --follow and --save options are passed.
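
The nextToken approach described earlier in this section amounts to a simple pagination loop. The sketch below shows that loop in Python under several assumptions: `iter_logs` and `format_entry` are hypothetical helpers, `fetch_page` stands in for whatever actually calls the CLI or API, the last page is assumed to carry no nextToken, and the timestamp values are assumed to be epoch milliseconds, as the sample values suggest.

```python
from datetime import datetime, timezone
from typing import Callable, Iterator, Optional


def iter_logs(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield log entries page by page until no nextToken is returned."""
    token = None
    while True:
        page = fetch_page(token)
        yield from page.get("logs", [])
        token = page.get("nextToken")
        if not token:  # assumption: a missing or empty token marks the last page
            break


def format_entry(entry: dict) -> str:
    # Assumes `timestamp` is epoch milliseconds.
    when = datetime.fromtimestamp(int(entry["timestamp"]) / 1000, tz=timezone.utc)
    return f"{when.isoformat()} {entry['message']}"


# Stand-in for the real call: two canned pages keyed by the token.
PAGES = {
    None: {
        "logs": [{"timestamp": "1697089188119", "message": "log message 1"}],
        "nextToken": "next_token",
    },
    "next_token": {
        "logs": [{"timestamp": "1697089188119", "message": "log message 2"}],
    },
}

for entry in iter_logs(lambda token: PAGES[token]):
    print(format_entry(entry))
```

Passing the fetch function in as an argument keeps the loop independent of how the logs are retrieved, so the same sketch works whether the pages come from the CLI, an HTTP client, or canned test data.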