Workflow

Workflow client module.

peak.resources.workflows.get_client(session=None)

Returns a Workflow client. If no session is provided, a default session is used.

Parameters:

session (Optional[Session]) – A Session object. Default is None.

Returns:

The workflow client object.

Return type:

Workflow
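
A minimal sketch of obtaining a client, assuming the peak package is installed. The wrapper name open_workflow_client is hypothetical; the import is deferred into the function so the module still loads where the SDK is unavailable.

```python
from typing import Any, Optional


def open_workflow_client(session: Optional[Any] = None) -> Any:
    """Return a Workflow client; a default session is used when none is given."""
    # Deferred import: the peak package is only required at call time.
    from peak.resources import workflows

    return workflows.get_client(session=session)
```

Calling open_workflow_client() with no argument mirrors get_client(session=None) as documented above.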

Workflow Client

class peak.resources.workflows.Workflow(session=None)

Client class for interacting with the workflows resource.

Parameters:

session (Session) – A Session object. If not provided, a default session is used.

create_or_update_workflow(body)

Creates a new workflow or updates an existing workflow based on workflow name.

REFERENCE:

🔗 API Documentation

Parameters:
  • body (Dict[str, Any]) – A dictionary containing the workflow config. Schema can be found below.

  • self (Workflow) –

Returns:

ID of the newly created or updated workflow.

Return type:

Dict[str, int]

SCHEMA:
{
    "name": "string(required)",
    "triggers": [
        {
            "cron": "string"
        },
        {
            "webhook": "boolean",
            "webhookPolicy": "string"
        }
    ],
    "watchers": [
        {
            "events": {
                "success": "boolean",
                "fail": "boolean",
                "runtimeExceeded": "number"
            },
            "user": "string",
            "webhook": {
                "name": "string(required)",
                "url": "string(required)",
                "payload": "string(required)",
            }
        }
    ],
    "retryOptions": {
        "duration": "number",
        "exitCodes": [],
        "exponentialBackoff": "boolean",
        "numberOfRetries": "number"
    },
    "tags": [
        {
            "name": "string"
        },
        {
            "name": "string"
        }
    ],
    "steps": {
        "stepName": {
            "type": "string",
            "imageId": "number",
            "imageVersionId": "number",
            "command": "string",
            "resources": {
                "instanceTypeId": "number",
                "storage": "string"
            },
            "parents": [],
            "stepTimeout": "number",
            "clearImageCache": "boolean",
            "repository": {
                "branch": "string",
                "token": "string",
                "url": "string"
            },
            "parameters": {
                "env": {
                    "key (string)": "value (string)"
                },
                "secrets": [],
            },
            "outputParameters": {
                "key (string)": "value (string)"
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "string",
                        "paramName": "string",
                        "stepName": "string",
                        "value": "string"
                    }
                ],
                "parentStatus": [
                    {
                        "condition": "string",
                        "parents": [],
                        "status": []
                    }
                ]
            },
            "runConfiguration": {
                "retryOptions": {
                    "duration": "number",
                    "exitCodes": [],
                    "exponentialBackoff": "boolean",
                    "numberOfRetries": "number"
                },
                "skipConfiguration": {
                    "skip": "boolean",
                    "skipDAG": "boolean",
                }
            }
        }
    }
}
Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • InternalServerErrorException – The server failed to process the request.
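
To illustrate the schema, the following sketch builds a minimal single-step body and submits it. The helper names, the step name "main", the "standard" type value, and the "id" response key are assumptions made for illustration; client is expected to be a Workflow client.

```python
from typing import Any, Dict, Optional


def build_workflow_body(
    name: str,
    image_id: int,
    image_version_id: int,
    command: str,
    cron: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a one-step workflow body that follows the schema."""
    body: Dict[str, Any] = {
        "name": name,
        "steps": {
            # "main" is a hypothetical step name; "standard" is an assumed type value.
            "main": {
                "type": "standard",
                "imageId": image_id,
                "imageVersionId": image_version_id,
                "command": command,
            }
        },
    }
    if cron is not None:
        # Only add a trigger when a cron expression was supplied.
        body["triggers"] = [{"cron": cron}]
    return body


def upsert_workflow(client: Any, body: Dict[str, Any]) -> int:
    """Create or update the workflow by name and return its ID."""
    response = client.create_or_update_workflow(body)
    # ASSUMPTION: the returned Dict[str, int] stores the ID under "id".
    return response["id"]
```

Because the method matches on the workflow name, submitting the same body twice should update the existing workflow rather than create a second one.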

create_workflow(body)

Create a new workflow. Workflows with only standard steps are supported.

REFERENCE:

🔗 API Documentation

Parameters:
  • body (Dict[str, Any]) – A dictionary containing the workflow config. Schema can be found below.

  • self (Workflow) –

Returns:

ID of the newly created workflow.

Return type:

Dict[str, int]

SCHEMA:
{
    "name": "string(required)",
    "triggers": [
        {
            "cron": "string"
        },
        {
            "webhook": "boolean",
            "webhookPolicy": "string"
        }
    ],
    "watchers": [
        {
            "events": {
                "success": "boolean",
                "fail": "boolean",
                "runtimeExceeded": "number"
            },
            "user": "string",
            "webhook": {
                "name": "string(required)",
                "url": "string(required)",
                "payload": "string(required)",
            }
        }
    ],
    "retryOptions": {
        "duration": "number",
        "exitCodes": [],
        "exponentialBackoff": "boolean",
        "numberOfRetries": "number"
    },
    "tags": [
        {
            "name": "string"
        },
        {
            "name": "string"
        }
    ],
    "steps": {
        "stepName": {
            "type": "string",
            "imageId": "number",
            "imageVersionId": "number",
            "command": "string",
            "resources": {
                "instanceTypeId": "number",
                "storage": "string"
            },
            "parents": [],
            "stepTimeout": "number",
            "clearImageCache": "boolean",
            "repository": {
                "branch": "string",
                "token": "string",
                "url": "string"
            },
            "parameters": {
                "env": {
                    "key (string)": "value (string)"
                },
                "secrets": [],
            },
            "outputParameters": {
                "key (string)": "value (string)"
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "string",
                        "paramName": "string",
                        "stepName": "string",
                        "value": "string"
                    }
                ],
                "parentStatus": [
                    {
                        "condition": "string",
                        "parents": [],
                        "status": []
                    }
                ]
            },
            "runConfiguration": {
                "retryOptions": {
                    "duration": "number",
                    "exitCodes": [],
                    "exponentialBackoff": "boolean",
                    "numberOfRetries": "number"
                },
                "skipConfiguration": {
                    "skip": "boolean",
                    "skipDAG": "boolean",
                }
            }
        }
    }
}
Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • InternalServerErrorException – The server failed to process the request.
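
The parents key in the schema suggests that steps can depend on one another. The sketch below chains steps so that each one runs after the previous one. It assumes that parents holds the names of parent steps and that "standard" is a valid type value; chain_steps and create_chained_workflow are hypothetical helpers.

```python
from typing import Any, Dict, List


def chain_steps(
    commands: Dict[str, str], image_id: int, image_version_id: int
) -> Dict[str, Any]:
    """Build a `steps` mapping in which each step depends on the previous one."""
    steps: Dict[str, Any] = {}
    previous: List[str] = []
    for step_name, command in commands.items():
        steps[step_name] = {
            "type": "standard",  # assumed type value
            "imageId": image_id,
            "imageVersionId": image_version_id,
            "command": command,
            "parents": list(previous),  # assumed to hold parent step names
        }
        previous = [step_name]
    return steps


def create_chained_workflow(
    client: Any, name: str, steps: Dict[str, Any]
) -> Dict[str, int]:
    """Submit the chained steps through create_workflow and return its response."""
    return client.create_workflow({"name": name, "steps": steps})
```

The order of the commands mapping determines the chain, since Python dictionaries preserve insertion order.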

delete_workflow(workflow_id)

Delete a workflow.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_id (int) – The ID of the workflow to delete.

  • self (Workflow) –

Returns:

Empty dictionary object.

Return type:

dict

Raises:
  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • ConflictException – The workflow is in a conflicting state and cannot be deleted.

  • InternalServerErrorException – The server failed to process the request.

describe_workflow(workflow_id)

Retrieve details of a specific workflow. Workflows with only standard steps can be described.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_id (int) – The ID of the workflow to retrieve.

  • self (Workflow) –

Returns:

A dictionary containing the details of the workflow.

Return type:

Dict[str, Any]

Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • UnprocessableEntityException – The server was unable to process the request.

  • InternalServerErrorException – The server failed to process the request.

execute_workflow(workflow_id, body=None)

Start a workflow run.

REFERENCE:

🔗 API Documentation

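Parameters:
  • workflow_id (int) – The ID of the workflow to execute.

  • body (Dict[str, Any] | None) – A dictionary containing the parameters for the run. Schema can be found below. Defaults to None.
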
Returns:

Execution ID of the run.

Return type:

Dict[str, str]

SCHEMA:
{
    "params": {
        "global": {
            "key (string)": "value (string)"
        },
        "stepWise": {
            "stepName (string)": {
                "key (string)": "value (string)"
            }
        }
    },
    "stepsToRun": [
        {
            "stepName": "string",
            "runDAG": "boolean"
        }
    ],
}
Raises:
  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • ConflictException – The workflow is in a conflicting state and a new run cannot be started.

  • InternalServerErrorException – The server failed to process the request.
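
As a sketch of how the execution schema might be assembled, the helper below builds the optional body from plain Python values and omits every key that was not supplied. build_execution_body and start_run are hypothetical names; only the keys shown in the schema are used.

```python
from typing import Any, Dict, Mapping, Optional, Sequence


def build_execution_body(
    global_params: Optional[Mapping[str, str]] = None,
    step_params: Optional[Mapping[str, Mapping[str, str]]] = None,
    steps_to_run: Optional[Sequence[str]] = None,
    run_dag: bool = False,
) -> Dict[str, Any]:
    """Assemble the execute_workflow body, leaving out empty sections."""
    body: Dict[str, Any] = {}
    params: Dict[str, Any] = {}
    if global_params:
        params["global"] = dict(global_params)
    if step_params:
        params["stepWise"] = {
            step: dict(values) for step, values in step_params.items()
        }
    if params:
        body["params"] = params
    if steps_to_run:
        # The same runDAG flag is applied to every listed step.
        body["stepsToRun"] = [
            {"stepName": step, "runDAG": run_dag} for step in steps_to_run
        ]
    return body


def start_run(
    client: Any, workflow_id: int, body: Optional[Dict[str, Any]] = None
) -> Dict[str, str]:
    """Start a run; an empty body is sent as None, matching the default."""
    return client.execute_workflow(workflow_id, body=body or None)
```

The response is returned unchanged because the key under which the execution ID is stored is not stated here.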

Parameters:
  • self (Workflow) –

  • workflow_id (int) –

  • body (Dict[str, Any] | None) –

Return type:

Dict[str, str]

get_default_resource()

Retrieve the default resource values that are used when the resource key is not provided for a workflow.

REFERENCE:

🔗 API Documentation

Returns:

Default resource values

Return type:

Dict[str, Any]

Raises:
  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • InternalServerErrorException – The server failed to process the request.

get_execution_details(workflow_id, execution_id)

Get details about an execution.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_id (int) – ID of the workflow to which the execution belongs.

  • execution_id (str) – ID of the execution to get details for.

  • self (Workflow) –

Returns:

A dictionary containing the details of the given execution.

Return type:

Dict[str, Any]

Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • InternalServerErrorException – The server failed to process the request.

get_execution_logs(workflow_id, execution_id, step_name, next_token=None, save=False, file_name=None)

Get workflow execution logs.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_id (int) – ID of the workflow to which the execution belongs.

  • execution_id (str) – ID of the execution to fetch logs for.

  • step_name (str) – Name of the step to fetch logs for.

  • next_token (str | None) – The token to retrieve the next set of logs. Defaults to None.

  • save (bool) – Whether to save the logs to a file. Defaults to False.

  • file_name (str | None) – File name or path where the contents should be saved. Default file name is workflow_execution_logs_<workflow_id>_<execution_id>_<step_name>.log.

  • self (Workflow) –

Returns:

A dictionary containing the logs for the given execution.

Return type:

Dict[str, Any]

Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • InternalServerErrorException – The server failed to process the request.
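
The next_token parameter implies that logs arrive in pages. The sketch below keeps requesting pages until no continuation token is returned. The response key "nextToken" is an assumption, as the response shape is not documented here, and iter_log_pages is a hypothetical helper. The max_pages guard stops a runaway loop if the assumption is wrong.

```python
from typing import Any, Dict, Iterator, Optional


def iter_log_pages(
    client: Any,
    workflow_id: int,
    execution_id: str,
    step_name: str,
    max_pages: int = 50,
) -> Iterator[Dict[str, Any]]:
    """Yield successive log responses for one step of an execution."""
    next_token: Optional[str] = None
    for _ in range(max_pages):
        page = client.get_execution_logs(
            workflow_id, execution_id, step_name, next_token=next_token
        )
        yield page
        # ASSUMPTION: the continuation token is returned under "nextToken".
        next_token = page.get("nextToken")
        if not next_token:
            break
```

When the logs only need to end up on disk, passing save=True (optionally with file_name) to get_execution_logs may be simpler than paging manually.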

list_executions(workflow_id: int, date_from: str | None = None, date_to: str | None = None, page_size: int | None = None, page_number: int | None = None, *, return_iterator: Literal[False]) → Dict[str, Any]
list_executions(workflow_id: int, date_from: str | None = None, date_to: str | None = None, page_size: int | None = None, page_number: int | None = None, *, return_iterator: Literal[True] = True) → Iterator[Dict[str, Any]]

Lists executions for the given workflow.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_id (int) – ID of the workflow to fetch executions for.

  • date_from (str | None) – The date after which the executions should be included (in ISO format). Defaults to None.

  • date_to (str | None) – The date until which the executions should be included (in ISO format). Defaults to None.

  • page_size (int | None) – Number of executions per page.

  • page_number (int | None) – Page number to fetch. Only used when return_iterator is False.

  • return_iterator (bool) – Whether to return an iterator object or a list of executions for a specified page number. Defaults to True.

Returns:

An iterator object which returns an element per iteration, until there are no more elements to return. If return_iterator is set to False, a dictionary containing the list and pagination details is returned instead.

Set return_iterator to True if you want automatic client-side pagination, or False if you want server-side pagination.

Return type:

Iterator[Dict[str, Any]] | Dict[str, Any]

Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • InternalServerErrorException – The server failed to process the request.

  • StopIteration – There are no more pages to list.
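
A short sketch of the iterator mode: with return_iterator=True the client handles pagination, so the caller only decides how many elements to consume. first_executions is a hypothetical helper, and itertools.islice stops cleanly when the iterator is exhausted.

```python
from itertools import islice
from typing import Any, Dict, List, Optional


def first_executions(
    client: Any,
    workflow_id: int,
    limit: int = 20,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Collect at most `limit` executions from the paginating iterator."""
    executions = client.list_executions(
        workflow_id,
        date_from=date_from,
        date_to=date_to,
        return_iterator=True,
    )
    # islice consumes lazily, so no more elements are requested than needed.
    return list(islice(executions, limit))
```

For explicit control over pages, pass return_iterator=False together with page_number and page_size instead.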

list_resources()

Lists all available resources for the workflows.

REFERENCE:

🔗 API Documentation

Returns:

A dictionary containing the list of available resources.

Return type:

Dict[str, Any]

Raises:
  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • ConflictException – The workflow is in a conflicting state and new run cannot be started.

  • InternalServerErrorException – The server failed to process the request.

list_workflows(workflow_status: List[str] | None = None, last_execution_status: List[str] | None = None, last_modified_by: List[str] | None = None, page_size: int | None = None, page_number: int | None = None, name: str | None = None, *, return_iterator: Literal[False]) → Dict[str, Any]
list_workflows(workflow_status: List[str] | None = None, last_execution_status: List[str] | None = None, last_modified_by: List[str] | None = None, page_size: int | None = None, page_number: int | None = None, name: str | None = None, *, return_iterator: Literal[True] = True) → Iterator[Dict[str, Any]]

Retrieve the list of workflows.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_status (List[str] | None) – List of statuses to filter workflows by. Default is None. Valid values are Draft, Running, Available, Paused.

  • last_execution_status (List[str] | None) – List of last execution statuses to filter workflows by. Default is None. Valid values are Success, Running, Stopped, Stopping, Failed.

  • last_modified_by (List[str] | None) – List of users who last modified the workflow, used as a filter. Default is None.

  • page_size (int | None) – The number of workflows per page.

  • page_number (int | None) – The page number to retrieve. Only used when return_iterator is False.

  • name (str | None) – Search workflows by name.

  • return_iterator (bool) – Whether to return an iterator object or a list of workflows for a specified page number. Defaults to True.

Returns:

An iterator object which returns an element per iteration, until there are no more elements to return. If return_iterator is set to False, a dictionary containing the list and pagination details is returned instead.

Set return_iterator to True if you want automatic client-side pagination, or False if you want server-side pagination.

Return type:

Iterator[Dict[str, Any]] | Dict[str, Any]

Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • InternalServerErrorException – The server failed to process the request.

  • StopIteration – There are no more pages to list.
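
The following sketch requests a single page with return_iterator=False and checks the status filter against the valid values listed above before calling the client. fetch_workflow_page is a hypothetical helper. The response is returned as-is because the keys of the pagination dictionary are not documented here.

```python
from typing import Any, Dict, Optional, Sequence

VALID_WORKFLOW_STATUSES = {"Draft", "Running", "Available", "Paused"}


def fetch_workflow_page(
    client: Any,
    page_number: int,
    page_size: Optional[int] = None,
    workflow_status: Optional[Sequence[str]] = None,
    name: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch one page of workflows using server-side pagination."""
    statuses = list(workflow_status or [])
    unknown = sorted(set(statuses) - VALID_WORKFLOW_STATUSES)
    if unknown:
        # Fail early instead of sending a request that would be rejected.
        raise ValueError(f"Unknown workflow status values: {unknown}")
    return client.list_workflows(
        workflow_status=statuses or None,
        page_size=page_size,
        page_number=page_number,
        name=name,
        return_iterator=False,
    )
```

Validating locally is a design choice, not something the client requires; it gives a clearer error than a BadRequestException from the server.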

patch_workflow(workflow_id, body=None, name=None, repository=None, branch=None, token=None, command=None, image_id=None, image_version_id=None, instance_type_id=None, storage=None, step_timeout=None, clear_image_cache=None, step_names=None)

Update an existing workflow. Workflows with only standard steps are supported.

  • This method modifies trigger details, watchers, the workflow name, and specific step attributes such as repository URL, branch, token, image ID, and image version ID.

  • If step_names is specified, the provided parameters are applied to those steps only. If step_names is not provided, all steps of the workflow are updated.

  • Alternatively, use the body parameter to modify individual attributes of different steps. The body can also add new steps to the workflow, provided it includes the parameters those steps require.

  • If both body and specific parameters are used, the latter take precedence.

Parameters:
  • workflow_id (int) – The ID of the workflow to patch.

  • body (Dict[str, Any] | None) – A dictionary containing the updated workflow details.

  • name (str | None) – The name of the workflow.

  • repository (str | None) – URL of the repository containing the required files.

  • branch (str | None) – The branch of the repository to use.

  • token (str | None) – The token to be used to access the repository.

  • command (str | None) – The command to run when workflow step is executed.

  • image_id (int | None) – The ID of the image to use for the workflow step.

  • image_version_id (int | None) – The ID of the image version to use for the workflow step.

  • instance_type_id (int | None) – The ID of the instance type to use for the workflow step.

  • storage (str | None) – The storage to use for the workflow step in GB. For example, “10GB”.

  • step_timeout (int | None) – Time after which the step times out.

  • clear_image_cache (bool | None) – Whether to clear image cache on workflow execution.

  • step_names (List[str] | None) – The workflow steps to update. If not provided, all steps will be updated.

  • self (Workflow) –

SCHEMA:
{
    "name": "string",
    "triggers": [
        {
            "cron": "string"
        },
        {
            "webhook": "boolean",
            "webhookPolicy": "string"
        }
    ],
    "watchers": [
        {
            "events": {
                "success": "boolean",
                "fail": "boolean",
                "runtimeExceeded": "number"
            },
            "user": "string",
            "webhook": {
                "name": "string(required)",
                "url": "string(required)",
                "payload": "string(required)",
            }
        }
    ],
    "retryOptions": {
        "duration": "number",
        "exitCodes": [],
        "exponentialBackoff": "boolean",
        "numberOfRetries": "number"
    },
    "tags": [
        {
            "name": "string"
        },
        {
            "name": "string"
        }
    ],
    "steps": {
        "stepName": {
            "type": "string",
            "imageId": "number",
            "imageVersionId": "number",
            "command": "string",
            "resources": {
                "instanceTypeId": "number",
                "storage": "string"
            },
            "parents": [],
            "stepTimeout": "number",
            "clearImageCache": "boolean",
            "repository": {
                "branch": "string",
                "token": "string",
                "url": "string"
            },
            "runConfiguration": {
                "retryOptions": {
                    "duration": "number",
                    "exitCodes": [],
                    "exponentialBackoff": "boolean",
                    "numberOfRetries": "number"
                },
                "skipConfiguration": {
                    "skip": "boolean",
                    "skipDAG": "boolean",
                }
            },
            "parameters": {
                "env": {
                    "key (string)": "value (string)"
                },
                "secrets": [],
            },
            "outputParameters": {
                "key (string)": "value (string)"
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "string",
                        "paramName": "string",
                        "stepName": "string",
                        "value": "string"
                    }
                ],
                "parentStatus": [
                    {
                        "condition": "string",
                        "parents": [],
                        "status": []
                    }
                ]
            }
        }
    }
}
Returns:

A dictionary containing the workflow ID.

Return type:

Dict[str, Any]

Raises:
  • InvalidParameterException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • ConflictException – The workflow is in a conflicting state and cannot be updated.

  • InternalServerErrorException – The server failed to process the request.
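
The two update styles described for patch_workflow can be sketched as follows. The first helper applies one value to the selected steps through keyword parameters; the second gives each step its own value through body. Both helper names are hypothetical, and the second assumes that a step entry containing only the changed attribute is accepted.

```python
from typing import Any, Dict, List, Optional


def retarget_steps(
    client: Any,
    workflow_id: int,
    branch: str,
    step_names: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Point the selected steps (all steps when step_names is None) at a branch."""
    return client.patch_workflow(workflow_id, branch=branch, step_names=step_names)


def patch_step_commands(
    client: Any, workflow_id: int, commands: Dict[str, str]
) -> Dict[str, Any]:
    """Give each named step its own command through the body parameter."""
    # ASSUMPTION: partial step entries are merged into the existing steps.
    body = {
        "steps": {step: {"command": command} for step, command in commands.items()}
    }
    return client.patch_workflow(workflow_id, body=body)
```

Since keyword parameters take precedence over body, combining both in one call would overwrite the per-step values for any attribute that is also passed as a keyword.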

update_workflow(workflow_id, body)

Update an existing workflow. Workflows with only standard steps are supported.

REFERENCE:

🔗 API Documentation

Parameters:
  • workflow_id (int) – The ID of the workflow to update.

  • body (dict) – A dictionary containing the updated workflow details. Schema can be found below.

  • self (Workflow) –

Returns:

ID of the updated workflow.

Return type:

Dict[str, int]

SCHEMA:
{
    "name": "string",
    "triggers": [
        {
            "cron": "string"
        },
        {
            "webhook": "boolean",
            "webhookPolicy": "string"
        }
    ],
    "watchers": [
        {
            "events": {
                "success": "boolean",
                "fail": "boolean",
                "runtimeExceeded": "number"
            },
            "user": "string",
            "webhook": {
                "name": "string(required)",
                "url": "string(required)",
                "payload": "string(required)",
            }
        }
    ],
    "retryOptions": {
        "duration": "number",
        "exitCodes": [],
        "exponentialBackoff": "boolean",
        "numberOfRetries": "number"
    },
    "tags": [
        {
            "name": "string"
        },
        {
            "name": "string"
        }
    ],
    "steps": {
        "stepName": {
            "type": "string",
            "imageId": "number",
            "imageVersionId": "number",
            "command": "string",
            "resources": {
                "instanceTypeId": "number",
                "storage": "string"
            },
            "parents": [],
            "stepTimeout": "number",
            "clearImageCache": "boolean",
            "repository": {
                "branch": "string",
                "token": "string",
                "url": "string"
            },
            "parameters": {
                "env": {
                    "key (string)": "value (string)"
                },
                "secrets": [],
            },
            "outputParameters": {
                "key (string)": "value (string)"
            },
            "executionParameters": {
                "conditional": [
                    {
                        "condition": "string",
                        "paramName": "string",
                        "stepName": "string",
                        "value": "string"
                    }
                ],
                "parentStatus": [
                    {
                        "condition": "string",
                        "parents": [],
                        "status": []
                    }
                ]
            },
            "runConfiguration": {
                "retryOptions": {
                    "duration": "number",
                    "exitCodes": [],
                    "exponentialBackoff": "boolean",
                    "numberOfRetries": "number"
                },
                "skipConfiguration": {
                    "skip": "boolean",
                    "skipDAG": "boolean",
                }
            }
        }
    }
}
Raises:
  • BadRequestException – The given request parameters are invalid.

  • UnauthorizedException – The credentials are invalid.

  • ForbiddenException – The user does not have permission to perform the operation.

  • NotFoundException – The given workflow does not exist.

  • ConflictException – The workflow is in a conflicting state and cannot be updated.

  • InternalServerErrorException – The server failed to process the request.
