Workflow
Workflow client module.
- peak.resources.workflows.get_client(session=None)
Returns a Workflow client. If no session is provided, a default session is used.
Workflow Client
- class peak.resources.workflows.Workflow(session=None)
Client class for interacting with workflows resource.
- Parameters:
session (Session) – The session to use. If not provided, a default session is used.
- create_or_update_workflow(body)
Creates a new workflow or updates an existing workflow based on workflow name.
- REFERENCE:
- Parameters:
body (Dict[str, Any]) – A dictionary containing the workflow config. Schema can be found below.
- Returns:
ID of the newly created or updated workflow.
- Return type:
Dict[str, int]
- SCHEMA:
{
  "name": "string(required)",
  "triggers": [
    { "cron": "string" },
    { "webhook": "boolean", "webhookPolicy": "string" }
  ],
  "watchers": [
    {
      "events": { "success": "boolean", "fail": "boolean", "runtimeExceeded": "number" },
      "user": "string",
      "webhook": { "name": "string(required)", "url": "string(required)", "payload": "string(required)" }
    }
  ],
  "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
  "tags": [ { "name": "string" }, { "name": "string" } ],
  "steps": {
    "stepName": {
      "type": "string",
      "imageId": "number",
      "imageVersionId": "number",
      "command": "string",
      "resources": { "instanceTypeId": "number", "storage": "string" },
      "parents": [],
      "stepTimeout": "number",
      "clearImageCache": "boolean",
      "repository": { "branch": "string", "token": "string", "url": "string" },
      "parameters": {
        "env": { "key (string)": "value (string)" },
        "secrets": []
      },
      "outputParameters": { "key (string)": "value (string)" },
      "executionParameters": {
        "conditional": [
          { "condition": "string", "paramName": "string", "stepName": "string", "value": "string" }
        ],
        "parentStatus": [
          { "condition": "string", "parents": [], "status": [] }
        ]
      },
      "runConfiguration": {
        "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
        "skipConfiguration": { "skip": "boolean", "skipDAG": "boolean" }
      }
    }
  }
}
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
InternalServerErrorException – The server failed to process the request.
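As a minimal sketch of how a body for create_or_update_workflow might be assembled from the schema above: the helper name build_workflow_body, the step type value "standard", the image IDs, the command, and the cron expression are all placeholders chosen for illustration, not values confirmed by this reference.

```python
from typing import Any, Dict, Optional


def build_workflow_body(
    name: str,
    steps: Dict[str, Dict[str, Any]],
    cron: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble a minimal workflow body using keys from the SCHEMA above."""
    if not name:
        # "name" is the only top-level key the schema marks as required.
        raise ValueError("'name' is required by the schema")
    body: Dict[str, Any] = {"name": name, "steps": steps}
    if cron is not None:
        # A cron trigger is one entry in the "triggers" list.
        body["triggers"] = [{"cron": cron}]
    return body


# Placeholder values: IDs, command and cron expression are hypothetical.
body = build_workflow_body(
    name="nightly-report",
    steps={
        "extract": {
            "type": "standard",  # assumed value for a standard step
            "imageId": 100,
            "imageVersionId": 1,
            "command": "python extract.py",
        }
    },
    cron="0 2 * * *",
)

# With the SDK installed and a session configured, the call would look like:
#   from peak.resources import workflows
#   client = workflows.get_client()
#   client.create_or_update_workflow(body)
```

Because the workflow is matched by name, sending the same body a second time would be expected to update the existing workflow rather than create a duplicate.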
- create_workflow(body)
Create a new workflow. Workflows with only standard steps are supported.
- REFERENCE:
- Parameters:
body (Dict[str, Any]) – A dictionary containing the workflow config. Schema can be found below.
- Returns:
ID of the newly created workflow.
- Return type:
Dict[str, int]
- SCHEMA:
{
  "name": "string(required)",
  "triggers": [
    { "cron": "string" },
    { "webhook": "boolean", "webhookPolicy": "string" }
  ],
  "watchers": [
    {
      "events": { "success": "boolean", "fail": "boolean", "runtimeExceeded": "number" },
      "user": "string",
      "webhook": { "name": "string(required)", "url": "string(required)", "payload": "string(required)" }
    }
  ],
  "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
  "tags": [ { "name": "string" }, { "name": "string" } ],
  "steps": {
    "stepName": {
      "type": "string",
      "imageId": "number",
      "imageVersionId": "number",
      "command": "string",
      "resources": { "instanceTypeId": "number", "storage": "string" },
      "parents": [],
      "stepTimeout": "number",
      "clearImageCache": "boolean",
      "repository": { "branch": "string", "token": "string", "url": "string" },
      "parameters": {
        "env": { "key (string)": "value (string)" },
        "secrets": []
      },
      "outputParameters": { "key (string)": "value (string)" },
      "executionParameters": {
        "conditional": [
          { "condition": "string", "paramName": "string", "stepName": "string", "value": "string" }
        ],
        "parentStatus": [
          { "condition": "string", "parents": [], "status": [] }
        ]
      },
      "runConfiguration": {
        "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
        "skipConfiguration": { "skip": "boolean", "skipDAG": "boolean" }
      }
    }
  }
}
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
InternalServerErrorException – The server failed to process the request.
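When a body defines several steps, it can help to check step dependencies locally before calling create_workflow. The sketch below assumes that each entry in a step's "parents" list is the name of another step in the same body; the schema does not state this explicitly, and find_missing_parents is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Dict, List


def find_missing_parents(body: Dict[str, Any]) -> List[str]:
    """Return 'step -> parent' strings for parents not defined as steps."""
    steps = body.get("steps", {})
    missing = []
    for step_name, step in steps.items():
        # Assumption: "parents" holds the names of other steps.
        for parent in step.get("parents", []):
            if parent not in steps:
                missing.append(f"{step_name} -> {parent}")
    return missing


# Placeholder two-step workflow: "load" runs after "extract".
body = {
    "name": "two-step-example",
    "steps": {
        "extract": {"command": "python extract.py", "parents": []},
        "load": {"command": "python load.py", "parents": ["extract"]},
    },
}

# A body whose "load" step points at a step that is never defined.
broken = {
    "name": "broken-example",
    "steps": {"load": {"command": "python load.py", "parents": ["transform"]}},
}

ok_result = find_missing_parents(body)
broken_result = find_missing_parents(broken)
```

An empty result suggests the dependency references are internally consistent; the service may still reject the body for other reasons.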
- delete_workflow(workflow_id)
Delete a workflow.
- REFERENCE:
- Parameters:
workflow_id (int) – The ID of the workflow to delete.
- Returns:
Empty dictionary object.
- Return type:
dict
- Raises:
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
ConflictException – The workflow is in a conflicting state and cannot be deleted.
InternalServerErrorException – The server failed to process the request.
- describe_workflow(workflow_id)
Retrieve details of a specific workflow. Workflows with only standard steps can be described.
- REFERENCE:
- Parameters:
workflow_id (int) – The ID of the workflow to retrieve.
- Returns:
A dictionary containing the details of the workflow.
- Return type:
Dict[str, Any]
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
UnprocessableEntityException – The server was unable to process the request.
InternalServerErrorException – The server failed to process the request.
- execute_workflow(workflow_id, body=None)
Start a workflow run.
- REFERENCE:
- Parameters:
workflow_id (int) – ID of the workflow to execute.
body (Dict[str, Any] | None) – The parameters to be passed while running the workflow. More details can be found in the API docs: https://service.peak.ai/workflows/api-docs/index.htm#/Workflows/execute-workflow
- Returns:
Execution ID of the run.
- Return type:
Dict[str, str]
- SCHEMA:
{
  "params": {
    "global": { "key (string)": "value (string)" },
    "stepWise": {
      "stepName (string)": { "key (string)": "value (string)" }
    }
  },
  "stepsToRun": [
    { "stepName": "string", "runDAG": "boolean" }
  ]
}
- Raises:
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
ConflictException – The workflow is in a conflicting state and a new run cannot be started.
InternalServerErrorException – The server failed to process the request.
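The execution body can be built up from the schema above. The following is a sketch only: build_execution_body is a hypothetical helper, and the parameter names and values in the example are placeholders.

```python
from typing import Any, Dict, Iterable, Optional


def build_execution_body(
    global_params: Optional[Dict[str, str]] = None,
    step_params: Optional[Dict[str, Dict[str, str]]] = None,
    steps_to_run: Optional[Iterable[str]] = None,
    run_dag: bool = False,
) -> Dict[str, Any]:
    """Build a body for execute_workflow using keys from the SCHEMA above."""
    body: Dict[str, Any] = {}
    params: Dict[str, Any] = {}
    if global_params:
        # Parameters under "global" are not tied to a single step.
        params["global"] = dict(global_params)
    if step_params:
        # Parameters under "stepWise" are keyed by step name.
        params["stepWise"] = {name: dict(p) for name, p in step_params.items()}
    if params:
        body["params"] = params
    if steps_to_run:
        body["stepsToRun"] = [
            {"stepName": name, "runDAG": run_dag} for name in steps_to_run
        ]
    return body


# Placeholder parameter names and values.
body = build_execution_body(
    global_params={"ENV": "prod"},
    steps_to_run=["load"],
)

# With a client from get_client(), the run would be started with:
#   client.execute_workflow(workflow_id=123, body=body)
```

Omitting every argument yields an empty dictionary, which corresponds to leaving the optional body unset.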
- get_default_resource()
Retrieves the default resource values that are used when the resources key is not provided for a workflow.
- REFERENCE:
- Returns:
Default resource values
- Return type:
Dict[str, Any]
- Raises:
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
InternalServerErrorException – The server failed to process the request.
- get_execution_details(workflow_id, execution_id)
Get details about an execution.
- REFERENCE:
- Parameters:
workflow_id (int) – ID of the workflow to which the execution belongs.
execution_id (str) – ID of the execution to get details for.
- Returns:
A dictionary containing the details of the given execution.
- Return type:
Dict[str, Any]
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
InternalServerErrorException – The server failed to process the request.
- get_execution_logs(workflow_id, execution_id, step_name, next_token=None, save=False, file_name=None)
Get workflow execution logs.
- REFERENCE:
- Parameters:
workflow_id (int) – ID of the workflow to which the execution belongs.
execution_id (str) – ID of the execution to fetch logs for.
step_name (str) – Name of the step to fetch logs for.
next_token (str | None) – The token to retrieve the next set of logs. Defaults to None.
save (bool) – Whether to save the logs to a file. Defaults to False.
file_name (str | None) – File name or path where the contents should be saved. Default file name is workflow_execution_logs_<workflow_id>_<execution_id>_<step_name>.log.
- Returns:
A dictionary containing the logs for the given execution.
- Return type:
Dict[str, Any]
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
InternalServerErrorException – The server failed to process the request.
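Since next_token retrieves the next set of logs, a full log can be gathered by calling get_execution_logs repeatedly. The sketch below assumes the response dictionary exposes the log entries under "logs" and the continuation token under "nextToken"; neither key name is confirmed by this reference. FakeClient is a stand-in so the example runs without the SDK.

```python
from typing import Any, List, Optional


def collect_logs(
    client: Any,
    workflow_id: int,
    execution_id: str,
    step_name: str,
    max_pages: int = 100,
) -> List[Any]:
    """Follow next_token until no token is returned or max_pages is reached."""
    entries: List[Any] = []
    token: Optional[str] = None
    for _ in range(max_pages):
        page = client.get_execution_logs(
            workflow_id, execution_id, step_name, next_token=token
        )
        entries.extend(page.get("logs", []))  # assumed response key
        token = page.get("nextToken")  # assumed response key
        if not token:
            break
    return entries


class FakeClient:
    """Stand-in for the Workflow client, returning two canned pages."""

    def __init__(self) -> None:
        self.pages = {
            None: {"logs": ["line 1", "line 2"], "nextToken": "t1"},
            "t1": {"logs": ["line 3"], "nextToken": None},
        }

    def get_execution_logs(self, workflow_id, execution_id, step_name,
                           next_token=None, save=False, file_name=None):
        return self.pages[next_token]


logs = collect_logs(FakeClient(), 1, "exec-1", "extract")
```

The max_pages cap is a defensive choice so that a token that never clears cannot loop forever.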
- list_executions(workflow_id: int, date_from: str | None = None, date_to: str | None = None, page_size: int | None = None, page_number: int | None = None, *, return_iterator: Literal[False]) -> Dict[str, Any]
- list_executions(workflow_id: int, date_from: str | None = None, date_to: str | None = None, page_size: int | None = None, page_number: int | None = None, *, return_iterator: Literal[True] = True) -> Iterator[Dict[str, Any]]
Lists executions for the given workflow.
- REFERENCE:
- Parameters:
workflow_id (int) – ID of the workflow whose executions are listed.
date_from (str | None) – The date after which the executions should be included (in ISO format). Defaults to None.
date_to (str | None) – The date up to which the executions should be included (in ISO format). Defaults to None.
page_size (int | None) – Number of executions per page.
page_number (int | None) – Page number to fetch. Only used when return_iterator is False.
return_iterator (bool) – Whether to return an iterator object or the list of executions for a specified page number. Defaults to True.
- Returns:
An iterator object which returns an element per iteration, until there are no more elements to return. If return_iterator is set to False, a dictionary containing the list and pagination details is returned instead.
Set return_iterator to True if you want automatic client-side pagination, or False if you want server-side pagination.
- Return type:
Iterator[Dict[str, Any]] | Dict[str, Any]
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
InternalServerErrorException – The server failed to process the request.
StopIteration – There are no more pages to list.
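The date_from and date_to arguments take ISO-formatted strings. One way to produce them, sketched below with the standard library, is to derive a look-back window from a reference time. execution_window is a hypothetical helper, and whether the service expects this exact ISO variant (with a UTC offset) is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def execution_window(days: int, now: Optional[datetime] = None) -> Dict[str, str]:
    """Return date_from/date_to ISO strings covering the last `days` days."""
    end = now if now is not None else datetime.now(timezone.utc)
    start = end - timedelta(days=days)
    return {"date_from": start.isoformat(), "date_to": end.isoformat()}


# A fixed reference time keeps the example deterministic.
window = execution_window(7, now=datetime(2024, 1, 8, tzinfo=timezone.utc))

# With a client from get_client(), one page could then be requested with:
#   client.list_executions(123, page_size=25, page_number=1,
#                          return_iterator=False, **window)
```

The keys of the returned dictionary match the method's parameter names, so it can be unpacked directly into the call.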
- list_resources()
Lists all available resources for the workflows.
- REFERENCE:
- Returns:
A dictionary containing the list of available resources.
- Return type:
Dict[str, Any]
- Raises:
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
ConflictException – The workflow is in a conflicting state and a new run cannot be started.
InternalServerErrorException – The server failed to process the request.
- list_workflows(workflow_status: List[str] | None = None, last_execution_status: List[str] | None = None, last_modified_by: List[str] | None = None, page_size: int | None = None, page_number: int | None = None, name: str | None = None, *, return_iterator: Literal[False]) -> Dict[str, Any]
- list_workflows(workflow_status: List[str] | None = None, last_execution_status: List[str] | None = None, last_modified_by: List[str] | None = None, page_size: int | None = None, page_number: int | None = None, name: str | None = None, *, return_iterator: Literal[True] = True) -> Iterator[Dict[str, Any]]
Retrieve the list of workflows.
- REFERENCE:
- Parameters:
workflow_status (List[str] | None) – List of statuses to filter workflows by. Default is None. Valid values are Draft, Running, Available, Paused.
last_execution_status (List[str] | None) – List of last execution statuses to filter workflows by. Default is None. Valid values are Success, Running, Stopped, Stopping, Failed.
last_modified_by (List[str] | None) – List of users who last modified the workflow. Default is None.
page_size (int | None) – The number of workflows per page.
page_number (int | None) – The page number to retrieve. Only used when return_iterator is False.
name (str | None) – Search workflows by name.
return_iterator (bool) – Whether to return an iterator object or the list of workflows for a specified page number. Defaults to True.
- Returns:
An iterator object which returns an element per iteration, until there are no more elements to return. If return_iterator is set to False, a dictionary containing the list and pagination details is returned instead.
Set return_iterator to True if you want automatic client-side pagination, or False if you want server-side pagination.
- Return type:
Iterator[Dict[str, Any]] | Dict[str, Any]
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
InternalServerErrorException – The server failed to process the request.
StopIteration – There are no more pages to list.
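Because the status filters accept only the values listed above, they can be checked locally before the request is sent. This is a sketch: build_list_filters is a hypothetical helper, and treating the documented values as case-sensitive is an assumption.

```python
from typing import Any, Dict, List, Optional

# Valid values as listed in the parameter descriptions above.
VALID_WORKFLOW_STATUS = {"Draft", "Running", "Available", "Paused"}
VALID_LAST_EXECUTION_STATUS = {"Success", "Running", "Stopped", "Stopping", "Failed"}


def _check(values: List[str], allowed: set, label: str) -> List[str]:
    """Raise ValueError if any value is outside the documented set."""
    unknown = sorted(set(values) - allowed)
    if unknown:
        raise ValueError(f"Unknown {label} value(s): {unknown}")
    return list(values)


def build_list_filters(
    workflow_status: Optional[List[str]] = None,
    last_execution_status: Optional[List[str]] = None,
    name: Optional[str] = None,
) -> Dict[str, Any]:
    """Return keyword arguments for list_workflows, validating status filters."""
    filters: Dict[str, Any] = {}
    if workflow_status:
        filters["workflow_status"] = _check(
            workflow_status, VALID_WORKFLOW_STATUS, "workflow_status"
        )
    if last_execution_status:
        filters["last_execution_status"] = _check(
            last_execution_status, VALID_LAST_EXECUTION_STATUS, "last_execution_status"
        )
    if name:
        filters["name"] = name
    return filters


filters = build_list_filters(workflow_status=["Draft"], name="report")

# With a client from get_client(), the filters could be passed as:
#   for item in client.list_workflows(**filters):
#       ...
```

Validating up front surfaces a typo as a local ValueError instead of a BadRequestException from the service.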
- patch_workflow(workflow_id, body=None, name=None, repository=None, branch=None, token=None, command=None, image_id=None, image_version_id=None, instance_type_id=None, storage=None, step_timeout=None, clear_image_cache=None, step_names=None)
Update an existing workflow. Workflows with only standard steps are supported.
This function lets you efficiently modify trigger details, watchers, the workflow name, and specific step attributes such as the repository URL, branch, token, image ID, and version ID.
By specifying step_names, you can apply the provided parameters to the named steps in a single call. If step_names is not provided, all steps of the workflow are updated.
Alternatively, use the body parameter to modify individual attributes of different steps selectively. The body can also add new steps to the workflow, provided it includes the parameters those steps require.
If both body and specific parameters are used, the specific parameters take precedence.
- Parameters:
workflow_id (int) – The ID of the workflow to patch.
body (Dict[str, Any] | None) – A dictionary containing the updated workflow details.
name (str | None) – The name of the workflow.
repository (str | None) – URL of the repository containing the required files.
branch (str | None) – The branch of the repository to use.
token (str | None) – The token to be used to access the repository.
command (str | None) – The command to run when workflow step is executed.
image_id (int | None) – The ID of the image to use for the workflow step.
image_version_id (int | None) – The ID of the image version to use for the workflow step.
instance_type_id (int | None) – The ID of the instance type to use for the workflow step.
storage (str | None) – The storage to use for the workflow step in GB. For example, “10GB”.
step_timeout (int | None) – Time after which the step times out.
clear_image_cache (bool | None) – Whether to clear image cache on workflow execution.
step_names (List[str] | None) – The workflow steps to update. If not provided, all steps will be updated.
- SCHEMA:
{
  "name": "string",
  "triggers": [
    { "cron": "string" },
    { "webhook": "boolean", "webhookPolicy": "string" }
  ],
  "watchers": [
    {
      "events": { "success": "boolean", "fail": "boolean", "runtimeExceeded": "number" },
      "user": "string",
      "webhook": { "name": "string(required)", "url": "string(required)", "payload": "string(required)" }
    }
  ],
  "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
  "tags": [ { "name": "string" }, { "name": "string" } ],
  "steps": {
    "stepName": {
      "type": "string",
      "imageId": "number",
      "imageVersionId": "number",
      "command": "string",
      "resources": { "instanceTypeId": "number", "storage": "string" },
      "parents": [],
      "stepTimeout": "number",
      "clearImageCache": "boolean",
      "repository": { "branch": "string", "token": "string", "url": "string" },
      "runConfiguration": {
        "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
        "skipConfiguration": { "skip": "boolean", "skipDAG": "boolean" }
      },
      "parameters": {
        "env": { "key (string)": "value (string)" },
        "secrets": []
      },
      "outputParameters": { "key (string)": "value (string)" },
      "executionParameters": {
        "conditional": [
          { "condition": "string", "paramName": "string", "stepName": "string", "value": "string" }
        ],
        "parentStatus": [
          { "condition": "string", "parents": [], "status": [] }
        ]
      }
    }
  }
}
- Returns:
A dictionary containing the workflow ID.
- Return type:
Dict[str, Any]
- Raises:
InvalidParameterException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
ConflictException – The workflow is in a conflicting state and cannot be updated.
InternalServerErrorException – The server failed to process the request.
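The precedence and step_names rules described for patch_workflow can be illustrated locally. The function below is not the SDK's implementation; it is a hypothetical stand-in that mimics the documented behavior on a plain dictionary, using the camelCase step keys from the schema.

```python
import copy
from typing import Any, Dict, List, Optional


def apply_step_overrides(
    body: Dict[str, Any],
    overrides: Dict[str, Any],
    step_names: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Return a copy of body with overrides applied to the selected steps.

    Overrides win over values already present in body, mirroring the rule
    that specific parameters take precedence over body.
    """
    patched = copy.deepcopy(body)
    steps = patched.get("steps", {})
    # No step_names means every step is targeted; unknown names are skipped.
    if step_names is None:
        targets = list(steps)
    else:
        targets = [name for name in step_names if name in steps]
    for name in targets:
        for key, value in overrides.items():
            if value is not None:  # None means "leave unchanged"
                steps[name][key] = value
    return patched


# Placeholder body with two steps.
body = {
    "steps": {
        "extract": {"command": "python old.py", "imageId": 1},
        "load": {"command": "python old.py", "imageId": 1},
    }
}

only_extract = apply_step_overrides(body, {"command": "python new.py"}, ["extract"])
all_steps = apply_step_overrides(body, {"imageId": 2, "command": None})
```

In actual use the merge happens inside patch_workflow; a call such as `client.patch_workflow(workflow_id=123, command="python new.py", step_names=["extract"])` relies on the arguments documented above.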
- update_workflow(workflow_id, body)
Update an existing workflow. Workflows with only standard steps are supported.
- REFERENCE:
- Parameters:
workflow_id (int) – The ID of the workflow to update.
body (Dict[str, Any]) – A dictionary containing the updated workflow details. Schema can be found below.
- Returns:
ID of the updated workflow.
- Return type:
Dict[str, int]
- SCHEMA:
{
  "name": "string",
  "triggers": [
    { "cron": "string" },
    { "webhook": "boolean", "webhookPolicy": "string" }
  ],
  "watchers": [
    {
      "events": { "success": "boolean", "fail": "boolean", "runtimeExceeded": "number" },
      "user": "string",
      "webhook": { "name": "string(required)", "url": "string(required)", "payload": "string(required)" }
    }
  ],
  "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
  "tags": [ { "name": "string" }, { "name": "string" } ],
  "steps": {
    "stepName": {
      "type": "string",
      "imageId": "number",
      "imageVersionId": "number",
      "command": "string",
      "resources": { "instanceTypeId": "number", "storage": "string" },
      "parents": [],
      "stepTimeout": "number",
      "clearImageCache": "boolean",
      "repository": { "branch": "string", "token": "string", "url": "string" },
      "parameters": {
        "env": { "key (string)": "value (string)" },
        "secrets": []
      },
      "outputParameters": { "key (string)": "value (string)" },
      "executionParameters": {
        "conditional": [
          { "condition": "string", "paramName": "string", "stepName": "string", "value": "string" }
        ],
        "parentStatus": [
          { "condition": "string", "parents": [], "status": [] }
        ]
      },
      "runConfiguration": {
        "retryOptions": { "duration": "number", "exitCodes": [], "exponentialBackoff": "boolean", "numberOfRetries": "number" },
        "skipConfiguration": { "skip": "boolean", "skipDAG": "boolean" }
      }
    }
  }
}
- Raises:
BadRequestException – The given request parameters are invalid.
UnauthorizedException – The credentials are invalid.
ForbiddenException – The user does not have permission to perform the operation.
NotFoundException – The given workflow does not exist.
ConflictException – The workflow is in a conflicting state and cannot be updated.
InternalServerErrorException – The server failed to process the request.