Parallel Workflow Execution
Workflows can be configured to support parallel execution, allowing multiple workflow runs to execute simultaneously.
This is particularly useful when processing multiple batches of data in parallel, improving overall throughput and reducing total processing time.
When executing workflows in parallel, it’s important to handle queue full scenarios (HTTP 429) appropriately.
Before diving into the examples, let’s see how to configure a workflow for parallel execution and how to handle queue-full responses when starting runs.
Parallel Execution Configuration
To enable parallel execution for a workflow, you need to set the allowConcurrentExecution metadata flag to true:
body:
  name: Parallel Data Processing Pipeline
  metadata:
    allowConcurrentExecution: true
  triggers:
    - webhook: true
  steps:
    data_processing:
      type: standard
      imageId: 1
      imageVersionId: 1
      command: "python process_data.py"
      resources:
        instanceTypeId: 1
        storage: 10GB
      parents: []
      parameters:
        env:
          BATCH_SIZE: "1000"
          PROCESSING_TIMEOUT: "3600"
Creating Parallel Workflows via Press
When creating a workflow block spec using Press, you can enable parallel execution by setting the allowConcurrentExecution metadata flag to true in the config.metadata section of the block spec.
Here’s an example of how to create a parallel workflow block spec using the Peak SDK:
from typing import Any
from peak.press import blocks

client: blocks.Block = blocks.get_client()

spec: dict[str, Any] = {
    "version": "1",
    "kind": "workflow",
    "metadata": {
        "name": "workflow-parallel-execution",
        "title": "Workflow Parallel Execution",
        "summary": "A workflow block that supports parallel execution",
        "description": "Workflow to test parallel execution capabilities",
        "descriptionContentType": "text/markdown",
        "imageUrl": "https://my-block-pics.com/image-0.jpg",
        "tags": [
            {
                "name": "parallel",
            },
        ],
    },
    "release": {
        "version": "1.0.0",
        "notes": "This is the original release",
    },
    "config": {
        "metadata": {
            "allowConcurrentExecution": True,
        },
        "steps": {
            "step-1": {
                "type": "standard",
                "resources": {
                    "instanceTypeId": 21,
                    "storage": "10GB",
                },
                "imageDetails": {
                    "id": 2942,
                },
                "command": "echo Done!",
            },
        },
    },
}

workflow_spec: dict[str, str] = client.create_spec(
    body=spec,
    featured=True,
    scope="private",
)

print(f"Created workflow block spec with ID: {workflow_spec['id']}")
Key points:

- The allowConcurrentExecution flag must be set in config.metadata, not in the top-level metadata section.
- The top-level metadata section contains block spec metadata (name, title, description, etc.).
- The config.metadata section contains workflow-specific metadata, including the parallel execution flag.
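To make the placement concrete, here is a trimmed fragment of the spec above, showing only the two metadata sections:

spec = {
    "metadata": {
        # Block spec metadata: name, title, summary, description, ...
        "name": "workflow-parallel-execution",
    },
    "config": {
        "metadata": {
            # Workflow metadata: the parallel execution flag belongs here
            "allowConcurrentExecution": True,
        },
        # steps defined as in the full example above
    },
}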
Executing Parallel Workflow Runs
Once a workflow is configured for parallel execution, you can execute multiple runs simultaneously. Each run can have its own set of parameters, allowing you to process different batches of data in parallel.
Basic Parallel Execution
Here’s an example of how to execute multiple workflow runs in parallel using the Peak SDK:
from peak.resources.workflows import get_client
from peak.exceptions import BadRequestException


def execute_parallel_runs(workflow_id, batch_data):
    """Execute multiple parallel workflow runs."""
    workflow_client = get_client()
    execution_ids = []

    for i, batch in enumerate(batch_data):
        try:
            # Execute workflow with batch-specific parameters
            execution = workflow_client.execute_workflow(
                workflow_id=workflow_id,
                body={
                    "params": {
                        "global": {
                            "batch_id": batch["id"],
                            "input_path": batch["input_path"],
                            "output_path": batch["output_path"],
                        }
                    }
                },
            )
            execution_ids.append(execution["executionId"])
            print(f"Started execution {i+1}: {execution['executionId']}")
        except BadRequestException as e:
            print(f"Failed to start execution {i+1}: {str(e)}")
            # Handle queue full scenarios
            if "429" in str(e) or "queue" in str(e).lower():
                print(
                    "Queue is full - SDK will automatically retry. If retries fail, retry later."
                )
            else:
                print(f"Other error occurred: {str(e)}")

    return execution_ids
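Each entry in batch_data supplies the parameters for one run, so calling the helper might look like this (the batch IDs and paths are illustrative values, matching the complete example further below):

batch_data = [
    {"id": "batch-001", "input_path": "/data/batch-001", "output_path": "/output/batch-001"},
    {"id": "batch-002", "input_path": "/data/batch-002", "output_path": "/output/batch-002"},
]
execution_ids = execute_parallel_runs(workflow_id, batch_data)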
Handling Queue Full Scenarios
When executing workflows in parallel, you may encounter HTTP 429 (Too Many Requests) errors if the queue is full. The response status tells you whether a run was created:

- HTTP 200: The workflow was accepted and submitted. You receive an executionId in the response.
- HTTP 429: The workflow was not accepted because the queue is full. No workflow execution was created.
The SDK automatically retries 429 errors with exponential backoff. This is safe because a 429 response means the workflow wasn’t accepted, so retrying won’t create duplicate executions.
If the SDK’s automatic retries fail, you can implement additional retry logic in your application based on your workflow’s expected duration (e.g., short-running vs. long-running workflows); a sketch of such a loop follows the error-handling example below.
Here’s an example of proper error handling:
from peak.resources.workflows import get_client
from peak.exceptions import BadRequestException

workflow_client = get_client()

try:
    execution = workflow_client.execute_workflow(
        workflow_id=workflow_id,
        body={
            "params": {
                "global": {
                    "batch_id": "batch-001",
                    "input_path": "/data/batch-001",
                    "output_path": "/output/batch-001",
                }
            }
        },
    )
    print(f"Started execution: {execution['executionId']}")
except BadRequestException as e:
    if "429" in str(e) or "queue" in str(e).lower():
        print(
            "Queue is full - SDK will automatically retry. If retries fail, retry later."
        )
    else:
        print(f"Other error occurred: {str(e)}")
Complete Example
Here’s a complete example demonstrating parallel workflow execution:
from peak.resources.workflows import get_client
from peak.exceptions import BadRequestException


def create_parallel_workflow():
    """Create a workflow that supports parallel execution."""
    workflow_client = get_client()

    # Define a workflow with parallel execution metadata
    workflow_config = {
        "name": "Parallel Data Processing Pipeline",
        "metadata": {"allowConcurrentExecution": True},
        "triggers": [{"webhook": True}],
        "steps": {
            "data_processing": {
                "type": "standard",
                "imageId": 1,
                "imageVersionId": 1,
                "command": "python process_data.py",
                "resources": {"instanceTypeId": 1, "storage": "10GB"},
                "parents": [],
                "parameters": {
                    "env": {"BATCH_SIZE": "1000", "PROCESSING_TIMEOUT": "3600"}
                },
            }
        },
    }

    # Create the workflow
    workflow = workflow_client.create_workflow(workflow_config)
    print(f"Created workflow with ID: {workflow['id']}")
    return workflow["id"]


def execute_parallel_runs(workflow_id, batch_data):
    """Execute multiple parallel workflow runs."""
    workflow_client = get_client()
    execution_ids = []

    for i, batch in enumerate(batch_data):
        try:
            # Execute workflow with batch-specific parameters
            execution = workflow_client.execute_workflow(
                workflow_id=workflow_id,
                body={
                    "params": {
                        "global": {
                            "batch_id": batch["id"],
                            "input_path": batch["input_path"],
                            "output_path": batch["output_path"],
                        }
                    }
                },
            )
            execution_ids.append(execution["executionId"])
            print(f"Started execution {i+1}: {execution['executionId']}")
        except BadRequestException as e:
            print(f"Failed to start execution {i+1}: {str(e)}")
            # Handle queue full scenarios
            if "429" in str(e) or "queue" in str(e).lower():
                print(
                    "Queue is full - SDK will automatically retry. If retries fail, retry later."
                )
            else:
                print(f"Other error occurred: {str(e)}")

    return execution_ids


def monitor_executions(workflow_id, execution_ids):
    """Monitor the status of workflow executions."""
    workflow_client = get_client()

    print("\nMonitoring executions...")
    for execution_id in execution_ids:
        try:
            # Get execution details
            execution_details = workflow_client.get_execution_details(
                workflow_id, execution_id
            )
            if execution_details:
                status = execution_details.get("status", "Unknown")
                print(f"Execution {execution_id}: {status}")
        except Exception as e:
            print(f"Error monitoring execution {execution_id}: {str(e)}")


# Main execution
if __name__ == "__main__":
    print("=== Peak SDK Parallel Workflow Example ===\n")

    # Step 1: Create a parallel workflow
    print("1. Creating parallel workflow...")
    workflow_id = create_parallel_workflow()

    # Step 2: Prepare batch data for parallel processing
    batch_data = [
        {
            "id": "batch-001",
            "input_path": "/data/batch-001",
            "output_path": "/output/batch-001",
        },
        {
            "id": "batch-002",
            "input_path": "/data/batch-002",
            "output_path": "/output/batch-002",
        },
        {
            "id": "batch-003",
            "input_path": "/data/batch-003",
            "output_path": "/output/batch-003",
        },
        {
            "id": "batch-004",
            "input_path": "/data/batch-004",
            "output_path": "/output/batch-004",
        },
        {
            "id": "batch-005",
            "input_path": "/data/batch-005",
            "output_path": "/output/batch-005",
        },
    ]

    # Step 3: Execute parallel runs
    print(f"\n2. Executing {len(batch_data)} parallel workflow runs...")
    execution_ids = execute_parallel_runs(workflow_id, batch_data)

    # Step 4: Monitor executions
    print(f"\n3. Monitoring {len(execution_ids)} executions...")
    monitor_executions(workflow_id, execution_ids)

    print("\n=== Example completed ===")
    print(f"Workflow ID: {workflow_id}")
    print(f"Execution IDs: {execution_ids}")
Important Considerations
- Resource Management: Ensure your infrastructure can handle the concurrent load when executing workflows in parallel. One option is to throttle submissions from the client side, as sketched after this list.
- Rate Limiting: The SDK automatically retries HTTP 429 errors with exponential backoff. If all retries fail, implement additional retry logic based on your workflow’s expected duration.
- Error Handling:
  - HTTP 200: Workflow accepted and submitted. You receive an executionId.
  - HTTP 429: Workflow not accepted because the queue is full. The SDK automatically retries.
- Monitoring: Use get_execution_details to monitor the status of parallel executions.
- Parameter Isolation: Ensure each parallel execution has its own set of parameters to avoid conflicts.
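To keep the concurrent load bounded, one option is to throttle submissions from the client side, as mentioned in the Resource Management point above. The sketch below keeps at most a fixed number of runs in flight by polling get_execution_details before submitting the next batch; MAX_IN_FLIGHT, the polling interval, and the terminal status names are assumptions made for illustration, not SDK constants, so verify them against the statuses your executions actually report.

import time

from peak.resources.workflows import get_client

# Assumed values for illustration; adjust to your infrastructure and verify
# the terminal status names against what your executions actually return.
MAX_IN_FLIGHT = 3
POLL_INTERVAL_SECONDS = 30
TERMINAL_STATUSES = {"Success", "Failed", "Stopped"}


def execute_throttled(workflow_id, batch_data):
    """Submit runs one by one, pausing while MAX_IN_FLIGHT runs are still active."""
    workflow_client = get_client()
    in_flight = []
    all_execution_ids = []

    for batch in batch_data:
        # Wait until a slot frees up before submitting the next run
        while len(in_flight) >= MAX_IN_FLIGHT:
            in_flight = [
                execution_id
                for execution_id in in_flight
                if workflow_client.get_execution_details(workflow_id, execution_id).get("status")
                not in TERMINAL_STATUSES
            ]
            if len(in_flight) >= MAX_IN_FLIGHT:
                time.sleep(POLL_INTERVAL_SECONDS)

        execution = workflow_client.execute_workflow(
            workflow_id=workflow_id,
            body={"params": {"global": {"batch_id": batch["id"]}}},
        )
        in_flight.append(execution["executionId"])
        all_execution_ids.append(execution["executionId"])

    return all_execution_ids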