Parallel Workflow Execution

  • Workflows can be configured to support parallel execution, allowing multiple workflow runs to execute simultaneously.

  • This is particularly useful when processing multiple batches of data in parallel, improving overall throughput and reducing total processing time.

  • When executing workflows in parallel, it’s important to handle queue full scenarios (HTTP 429) appropriately.

Before we look into examples, let’s see how to configure a workflow for parallel execution and handle parallel execution scenarios.

Parallel Execution Configuration

To enable parallel execution for a workflow, you need to set the allowConcurrentExecution metadata flag to true:

body:
    name: Parallel Data Processing Pipeline
    metadata:
        allowConcurrentExecution: true
    triggers:
        - webhook: true
    steps:
        data_processing:
            type: standard
            imageId: 1
            imageVersionId: 1
            command: "python process_data.py"
            resources:
                instanceTypeId: 1
                storage: 10GB
            parents: []
            parameters:
                env:
                    BATCH_SIZE: "1000"
                    PROCESSING_TIMEOUT: "3600"

Creating Parallel Workflows via Press

When creating a workflow block spec using Press, you can enable parallel execution by setting the allowConcurrentExecution metadata flag to true in the config.metadata section of the block spec.

Here’s an example of how to create a parallel workflow block spec using the Peak SDK:

from collections.abc import Iterator
from typing import Any

from peak.press import blocks

client: blocks.Block = blocks.get_client()

spec: dict[str, Any] = {
    "version": "1",
    "kind": "workflow",
    "metadata": {
        "name": "workflow-parallel-execution",
        "title": "Workflow Parallel Execution",
        "summary": "A workflow block that supports parallel execution",
        "description": "Workflow to test parallel execution capabilities",
        "descriptionContentType": "text/markdown",
        "imageUrl": "https://my-block-pics.com/image-0.jpg",
        "tags": [
            {
                "name": "parallel",
            },
        ],
    },
    "release": {
        "version": "1.0.0",
        "notes": "This is the original release",
    },
    "config": {
        "metadata": {
            "allowConcurrentExecution": True,
        },
        "steps": {
            "step-1": {
                "type": "standard",
                "resources": {
                    "instanceTypeId": 21,
                    "storage": "10GB",
                },
                "imageDetails": {
                    "id": 2942,
                },
                "command": "echo Done!",
            },
        },
    },
}

workflow_spec: dict[str, str] = client.create_spec(
    body=spec,
    featured=True,
    scope="private",
)

print(f"Created workflow block spec with ID: {workflow_spec['id']}")

Key points:

  • The allowConcurrentExecution flag must be set in config.metadata, not in the top-level metadata section.

  • The top-level metadata section contains block spec metadata (name, title, description, etc.).

  • The config.metadata section contains workflow-specific metadata, including the parallel execution flag.

Executing Parallel Workflow Runs

Once a workflow is configured for parallel execution, you can execute multiple runs simultaneously. Each run can have its own set of parameters, allowing you to process different batches of data in parallel.

Basic Parallel Execution

Here’s an example of how to execute multiple workflow runs in parallel using the Peak SDK:

from peak.resources.workflows import get_client
from peak.exceptions import BadRequestException


def execute_parallel_runs(workflow_id, batch_data):
    """Execute multiple parallel workflow runs."""
    workflow_client = get_client()

    execution_ids = []

    for i, batch in enumerate(batch_data):
        try:
            # Execute workflow with batch-specific parameters
            execution = workflow_client.execute_workflow(
                workflow_id=workflow_id,
                body={
                    "params": {
                        "global": {
                            "batch_id": batch["id"],
                            "input_path": batch["input_path"],
                            "output_path": batch["output_path"],
                        }
                    }
                },
            )
            execution_ids.append(execution["executionId"])
            print(f"Started execution {i+1}: {execution['executionId']}")

        except BadRequestException as e:
            print(f"Failed to start execution {i+1}: {str(e)}")
            # Handle queue full scenarios
            if "429" in str(e) or "queue" in str(e).lower():
                print(
                    "Queue is full - SDK will automatically retry. If retries fail, retry later."
                )
            else:
                print(f"Other error occurred: {str(e)}")

    return execution_ids

Handling Queue Full Scenarios

When executing workflows in parallel, you may encounter HTTP 429 (Too Many Requests) errors if the queue is full.

  • HTTP 200: The workflow was accepted and submitted. You receive an executionId in the response.

  • HTTP 429: The workflow was not accepted because the queue is full. No workflow execution was created.

The SDK automatically retries 429 errors with exponential backoff. This is safe because a 429 response means the workflow wasn’t accepted, so retrying won’t create duplicate executions.

If the SDK’s automatic retries fail, you can implement additional retry logic in your application based on your workflow’s expected duration (e.g., short-running vs. long-running workflows).

Here’s an example of proper error handling:

from peak.resources.workflows import get_client
from peak.exceptions import BadRequestException

workflow_client = get_client()

try:
    execution = workflow_client.execute_workflow(
        workflow_id=workflow_id,
        body={
            "params": {
                "global": {
                    "batch_id": "batch-001",
                    "input_path": "/data/batch-001",
                    "output_path": "/output/batch-001",
                }
            }
        },
    )
    print(f"Started execution: {execution['executionId']}")

except BadRequestException as e:
    if "429" in str(e) or "queue" in str(e).lower():
        print(
            "Queue is full - SDK will automatically retry. If retries fail, retry later."
        )
    else:
        print(f"Other error occurred: {str(e)}")

Complete Example

Here’s a complete example demonstrating parallel workflow execution:

from peak.resources.workflows import get_client
from peak.exceptions import BadRequestException


def create_parallel_workflow():
    """Create a workflow that supports parallel execution."""
    workflow_client = get_client()

    # Define a workflow with parallel execution metadata
    workflow_config = {
        "name": "Parallel Data Processing Pipeline",
        "metadata": {"allowConcurrentExecution": True},
        "triggers": [{"webhook": True}],
        "steps": {
            "data_processing": {
                "type": "standard",
                "imageId": 1,
                "imageVersionId": 1,
                "command": "python process_data.py",
                "resources": {"instanceTypeId": 1, "storage": "10GB"},
                "parents": [],
                "parameters": {
                    "env": {"BATCH_SIZE": "1000", "PROCESSING_TIMEOUT": "3600"}
                },
            }
        },
    }

    # Create the workflow
    workflow = workflow_client.create_workflow(workflow_config)
    print(f"Created workflow with ID: {workflow['id']}")
    return workflow["id"]


def execute_parallel_runs(workflow_id, batch_data):
    """Execute multiple parallel workflow runs."""
    workflow_client = get_client()

    execution_ids = []

    for i, batch in enumerate(batch_data):
        try:
            # Execute workflow with batch-specific parameters
            execution = workflow_client.execute_workflow(
                workflow_id=workflow_id,
                body={
                    "params": {
                        "global": {
                            "batch_id": batch["id"],
                            "input_path": batch["input_path"],
                            "output_path": batch["output_path"],
                        }
                    }
                },
            )
            execution_ids.append(execution["executionId"])
            print(f"Started execution {i+1}: {execution['executionId']}")

        except BadRequestException as e:
            print(f"Failed to start execution {i+1}: {str(e)}")
            # Handle queue full scenarios
            if "429" in str(e) or "queue" in str(e).lower():
                print(
                    "Queue is full - SDK will automatically retry. If retries fail, retry later."
                )
            else:
                print(f"Other error occurred: {str(e)}")

    return execution_ids


def monitor_executions(workflow_id, execution_ids):
    """Monitor the status of workflow executions."""
    workflow_client = get_client()

    print("\nMonitoring executions...")
    for execution_id in execution_ids:
        try:
            # Get execution details
            execution_details = workflow_client.get_execution_details(
                workflow_id, execution_id
            )
            if execution_details:
                status = execution_details.get("status", "Unknown")
                print(f"Execution {execution_id}: {status}")
        except Exception as e:
            print(f"Error monitoring execution {execution_id}: {str(e)}")


# Main execution
if __name__ == "__main__":
    print("=== Peak SDK Parallel Workflow Example ===\n")

    # Step 1: Create a parallel workflow
    print("1. Creating parallel workflow...")
    workflow_id = create_parallel_workflow()

    # Step 2: Prepare batch data for parallel processing
    batch_data = [
        {
            "id": "batch-001",
            "input_path": "/data/batch-001",
            "output_path": "/output/batch-001",
        },
        {
            "id": "batch-002",
            "input_path": "/data/batch-002",
            "output_path": "/output/batch-002",
        },
        {
            "id": "batch-003",
            "input_path": "/data/batch-003",
            "output_path": "/output/batch-003",
        },
        {
            "id": "batch-004",
            "input_path": "/data/batch-004",
            "output_path": "/output/batch-004",
        },
        {
            "id": "batch-005",
            "input_path": "/data/batch-005",
            "output_path": "/output/batch-005",
        },
    ]

    # Step 3: Execute parallel runs
    print(f"\n2. Executing {len(batch_data)} parallel workflow runs...")
    execution_ids = execute_parallel_runs(workflow_id, batch_data)

    # Step 4: Monitor executions
    print(f"\n3. Monitoring {len(execution_ids)} executions...")
    monitor_executions(workflow_id, execution_ids)

    print(f"\n=== Example completed ===")
    print(f"Workflow ID: {workflow_id}")
    print(f"Execution IDs: {execution_ids}")

Important Considerations

  • Resource Management: Ensure your infrastructure can handle the concurrent load when executing workflows in parallel.

  • Rate Limiting: The SDK automatically retries HTTP 429 errors with exponential backoff. If all retries fail, implement additional retry logic based on your workflow’s expected duration.

  • Error Handling:

    • HTTP 200: Workflow accepted and submitted. You receive an executionId.

    • HTTP 429: Workflow not accepted because queue is full. The SDK automatically retries.

  • Monitoring: Use get_execution_details to monitor the status of parallel executions.

  • Parameter Isolation: Ensure each parallel execution has its own set of parameters to avoid conflicts.