MaClient API Reference

This page provides detailed API documentation for the Maze Client SDK.

MaClient

class MaClient(server_url='http://localhost:8000')

The main client class for connecting to the Maze server and managing workflows.

Parameters:

server_url (str) – The URL of the Maze server. Defaults to "http://localhost:8000".

Example:

from maze import MaClient

# Connect to local server
client = MaClient()

# Connect to remote server
client = MaClient("http://192.168.1.100:8000")

create_workflow()

Create a new workflow on the server.

Returns:

A new workflow instance

Return type:

MaWorkflow

Raises:

Exception – If the workflow creation fails

Example:

workflow = client.create_workflow()
print(f"Created workflow: {workflow.workflow_id}")

MaWorkflow

class MaWorkflow(workflow_id, server_url)

Represents a workflow that manages tasks and their execution.

Note

Workflows are typically created using MaClient.create_workflow() rather than direct instantiation.

workflow_id

The unique identifier of the workflow.

Type:

str

server_url

The URL of the Maze server.

Type:

str

add_task(task_func=None, inputs=None, task_type='code', task_name=None, **kwargs)

Add a task to the workflow.

Parameters:
  • task_func (callable) – A function decorated with @task. When provided, the task is automatically configured from the decorator metadata.

  • inputs (dict) –

    Dictionary of input parameters. Values can be:

    • Direct values: {"key": "value"}

    • Task output references: {"key": other_task.outputs["output_key"]}

  • task_type (str) – Type of task (default: "code")

  • task_name (str) – Custom name for the task. If not provided, uses the function name.

Returns:

The created task instance

Return type:

MaTask

Raises:

Exception – If task creation or configuration fails

Example 1 - Using decorated function (recommended):

from maze import task

@task(inputs=["text"], outputs=["result"])
def process_text(params):
    text = params.get("text")
    return {"result": text.upper()}

task1 = workflow.add_task(
    process_text,
    inputs={"text": "hello"}
)

Example 2 - Referencing other task outputs:

task1 = workflow.add_task(func1, inputs={"input": "value"})
task2 = workflow.add_task(
    func2,
    inputs={"input": task1.outputs["output"]}
)
# Automatically creates edge: task1 → task2

Example 3 - Custom task name:

task = workflow.add_task(
    my_function,
    inputs={"data": "value"},
    task_name="Custom Task Name"
)

Note

When inputs contains references to other task outputs (TaskOutput objects), Maze automatically creates dependency edges between tasks.
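The automatic edge creation described in this note can be pictured with a small standalone sketch. It does not import Maze: OutputRef is a hypothetical stand-in for TaskOutput, and infer_edges is an illustrative helper, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass(frozen=True)
class OutputRef:
    """Hypothetical stand-in for Maze's TaskOutput (task_id plus output_key)."""
    task_id: str
    output_key: str


def infer_edges(target_task_id: str, inputs: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return one (source, target) edge per distinct upstream task found in inputs."""
    edges: List[Tuple[str, str]] = []
    for value in inputs.values():
        if isinstance(value, OutputRef):
            edge = (value.task_id, target_task_id)
            if edge not in edges:  # two outputs of one task still mean one edge
                edges.append(edge)
    return edges


inputs = {
    "text": OutputRef("task_a", "result"),    # reference to another task's output
    "length": OutputRef("task_a", "length"),  # same upstream task, different key
    "suffix": "!",                            # direct value, creates no edge
}
print(infer_edges("task_b", inputs))
```

Direct values contribute nothing to the graph; only output references imply a dependency.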

get_tasks()

Retrieve all tasks in the workflow.

Returns:

List of task information dictionaries

Return type:

List[dict]

Raises:

Exception – If the request fails

Example:

tasks = workflow.get_tasks()
for task in tasks:
    print(f"Task ID: {task['id']}, Name: {task['name']}")
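Because each entry carries an id and a name, a name-based lookup is easy to build on top of the returned list. The helper below is a sketch that works on sample data in the same shape; index_tasks_by_name is a hypothetical name and is not part of the SDK.

```python
from typing import Dict, List


def index_tasks_by_name(tasks: List[dict]) -> Dict[str, List[str]]:
    """Map each task name to the IDs that carry it (names may repeat)."""
    index: Dict[str, List[str]] = {}
    for entry in tasks:
        index.setdefault(entry["name"], []).append(entry["id"])
    return index


# Sample data in the documented shape; real entries come from workflow.get_tasks().
sample_tasks = [
    {"id": "task_id_1", "name": "load"},
    {"id": "task_id_2", "name": "clean"},
    {"id": "task_id_3", "name": "load"},
]
by_name = index_tasks_by_name(sample_tasks)
print(by_name["load"])
```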

Return Format:

[
    {"id": "task_id_1", "name": "task_name_1"},
    {"id": "task_id_2", "name": "task_name_2"},
    ...
]

add_edge(source_task, target_task)

Manually add a dependency edge between two tasks.

Parameters:
  • source_task (MaTask) – The source (predecessor) task

  • target_task (MaTask) – The target (successor) task

Raises:

Exception – If edge creation fails

Example:

workflow.add_edge(task1, task2)  # task1 → task2

Note

You typically don’t need to call this method manually. When you reference a task’s output in another task’s input, the edge is created automatically.

Warning

Adding an edge that would create a cycle in the workflow DAG will cause an error.
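If you track edges on the client side, a cycle can be detected before calling add_edge(). The following is a minimal sketch over plain task-ID pairs; would_create_cycle is a hypothetical helper, and how the server itself detects cycles is not documented here.

```python
from typing import Dict, Iterable, List, Set, Tuple


def would_create_cycle(edges: Iterable[Tuple[str, str]], source: str, target: str) -> bool:
    """True if adding source -> target would close a cycle.

    That happens exactly when source is already reachable from target.
    """
    if source == target:
        return True
    successors: Dict[str, List[str]] = {}
    for src, dst in edges:
        successors.setdefault(src, []).append(dst)

    stack: List[str] = [target]
    seen: Set[str] = set()
    while stack:  # iterative depth-first search starting at target
        node = stack.pop()
        if node == source:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(successors.get(node, []))
    return False


existing = [("a", "b"), ("b", "c")]
print(would_create_cycle(existing, "c", "a"))  # c -> a would close a -> b -> c -> a
print(would_create_cycle(existing, "a", "c"))  # a -> c is only a shortcut
```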

del_edge(source_task, target_task)

Delete a dependency edge between two tasks.

Parameters:
  • source_task (MaTask) – The source task

  • target_task (MaTask) – The target task

Raises:

Exception – If edge deletion fails

Example:

workflow.del_edge(task1, task2)

run()

Submit the workflow for execution.

Returns:

Unique run ID for this execution

Return type:

str

Raises:

Exception – If the submission fails

Example:

run_id = workflow.run()
print(f"Workflow started with run_id: {run_id}")

Note

This method submits the workflow and returns a run_id. The same workflow can be run multiple times, with each execution having a unique run_id. Use this run_id with get_results() or show_results() to retrieve execution results.

get_results(run_id, verbose=True)

Retrieve execution results for a specific workflow run. Results are cached client-side for efficient repeated queries.

Parameters:
  • run_id (str) – The run ID returned by run()

  • verbose (bool) – If True, print raw messages to console. Default is True.

Returns:

List of all execution messages

Return type:

List[dict]

Raises:

Message Format:

Each message is a dictionary with the following structure:

{
    "type": "message_type",
    "data": {
        # Type-specific data
    }
}

Message Types:

  • start_task: A task has started execution

    {
        "type": "start_task",
        "data": {
            "workflow_id": "workflow_123",
            "task_id": "task_12345",
            "node_ip": "127.0.0.1"
        }
    }
    
  • finish_task: A task has completed successfully

    {
        "type": "finish_task",
        "data": {
            "workflow_id": "workflow_123",
            "task_id": "task_12345",
            "result": {"output_key": "output_value"}
        }
    }
    
  • task_exception: A task failed with an exception

    {
        "type": "task_exception",
        "data": {
            "workflow_id": "workflow_123",
            "task_id": "task_12345",
            "result": "Traceback (most recent call last):\\n..."
        }
    }
    
  • finish_workflow: The workflow has completed

    {
        "type": "finish_workflow",
        "data": {}
    }
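The four message types can be folded into a per-task status table. This sketch runs on hand-written sample messages in the documented shape; task_statuses and the status labels ("running", "finished", "failed") are illustrative choices, not SDK names.

```python
from typing import Dict, List

# Maps a message type to the status label used in this sketch.
STATUS_FOR_TYPE = {
    "start_task": "running",
    "finish_task": "finished",
    "task_exception": "failed",
}


def task_statuses(messages: List[dict]) -> Dict[str, str]:
    """Return the latest known status of every task mentioned in messages."""
    statuses: Dict[str, str] = {}
    for msg in messages:
        status = STATUS_FOR_TYPE.get(msg["type"])
        if status is None:
            continue  # finish_workflow has empty data and no task_id
        statuses[msg["data"]["task_id"]] = status
    return statuses


sample_messages = [
    {"type": "start_task", "data": {"workflow_id": "workflow_123", "task_id": "task_1", "node_ip": "127.0.0.1"}},
    {"type": "start_task", "data": {"workflow_id": "workflow_123", "task_id": "task_2", "node_ip": "127.0.0.1"}},
    {"type": "finish_task", "data": {"workflow_id": "workflow_123", "task_id": "task_1", "result": {"out": 1}}},
    {"type": "task_exception", "data": {"workflow_id": "workflow_123", "task_id": "task_2", "result": "Traceback ..."}},
    {"type": "finish_workflow", "data": {}},
]
print(task_statuses(sample_messages))
```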
    

Example 1 - Basic usage (raw messages):

run_id = workflow.run()

messages = workflow.get_results(run_id, verbose=True)
# Prints raw messages like:
# {'type': 'start_task', 'data': {...}}
# {'type': 'finish_task', 'data': {...}}

for msg in messages:
    if msg["type"] == "finish_task":
        print(f"Task result: {msg['data']['result']}")

Example 2 - Silent mode (no printing):

run_id = workflow.run()

messages = workflow.get_results(run_id, verbose=False)
# No console output, just returns messages

task_results = {}
for msg in messages:
    if msg["type"] == "finish_task":
        task_id = msg["data"]["task_id"]
        task_results[task_id] = msg["data"]["result"]

Example 3 - Multiple queries (uses cache):

run_id = workflow.run()

# First query - fetches from server
messages1 = workflow.get_results(run_id, verbose=False)

# Second query - returns from cache (no server connection)
messages2 = workflow.get_results(run_id, verbose=False)

assert messages1 == messages2  # Same data
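The caching behaviour shown above can be approximated with a small wrapper around any fetch function. This is a sketch of the idea only: RunResultCache and fetch_once are hypothetical names, and the real client may store its cache differently.

```python
from typing import Callable, Dict, List, Optional


class RunResultCache:
    """Fetch each run's messages at most once and serve repeats from memory."""

    def __init__(self, fetch: Callable[[str], List[dict]]) -> None:
        self._fetch = fetch
        self._cache: Dict[str, List[dict]] = {}

    def get(self, run_id: str) -> List[dict]:
        if run_id not in self._cache:
            self._cache[run_id] = self._fetch(run_id)  # the only server round trip
        return self._cache[run_id]

    def clear(self, run_id: Optional[str] = None) -> None:
        if run_id is None:
            self._cache.clear()
        else:
            self._cache.pop(run_id, None)


calls: List[str] = []


def fetch_once(run_id: str) -> List[dict]:
    """Stand-in for a server call; records how often it is invoked."""
    calls.append(run_id)
    return [{"type": "finish_workflow", "data": {}}]


cache = RunResultCache(fetch_once)
first = cache.get("run_1")
second = cache.get("run_1")
print(len(calls))  # 1: the second get() never reached fetch_once
```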

Note

Results are cached client-side after the first query. Subsequent calls with the same run_id return cached data without reconnecting to the server. The cache exists because of the server’s one-time consumption design: once a run’s messages have been fetched, they cannot be fetched from the server again.

Tip

For user-friendly formatted output, use show_results() instead. Use get_results when you need the raw message data for custom processing.

show_results(run_id, output_dir='workflow_results')

Display workflow execution results with formatted, user-friendly output.

This is a high-level method that fetches execution messages and presents them in a readable format. It also handles file downloads for the client.front module.

Parameters:
  • run_id (str) – The run ID returned by run()

  • output_dir (str) – Directory for downloading result files (default: "workflow_results", only used in client.front)

Returns:

Dictionary with execution summary and task results

Return type:

dict

Raises:

Exception – If an error occurs during execution

Return Value:

For client.maze module:

{
    "task_results": {
        "task_id_1": {"output_key": "value"},
        "task_id_2": {"output_key": "value"}
    },
    "workflow_completed": True,
    "has_exception": False,
    "exception_tasks": []
}
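To see how a summary of this shape relates to the raw messages from get_results(), the sketch below rebuilds one from a message list. summarize is a hypothetical helper written for illustration; it is not how show_results() is necessarily implemented.

```python
from typing import Any, Dict, List


def summarize(messages: List[dict]) -> Dict[str, Any]:
    """Fold raw execution messages into a summary dictionary."""
    summary: Dict[str, Any] = {
        "task_results": {},
        "workflow_completed": False,
        "has_exception": False,
        "exception_tasks": [],
    }
    for msg in messages:
        kind = msg["type"]
        data = msg.get("data", {})
        if kind == "finish_task":
            summary["task_results"][data["task_id"]] = data["result"]
        elif kind == "task_exception":
            summary["has_exception"] = True
            summary["exception_tasks"].append(data["task_id"])
        elif kind == "finish_workflow":
            summary["workflow_completed"] = True
    return summary


run_messages = [
    {"type": "start_task", "data": {"workflow_id": "workflow_123", "task_id": "task_id_1", "node_ip": "127.0.0.1"}},
    {"type": "finish_task", "data": {"workflow_id": "workflow_123", "task_id": "task_id_1", "result": {"output_key": "value"}}},
    {"type": "finish_workflow", "data": {}},
]
print(summarize(run_messages))
```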

Example 1 - Basic usage:

run_id = workflow.run()
result = workflow.show_results(run_id)
# Prints formatted output:
# 🔗 Connected to server, starting workflow execution...
# ▶ Task started: task_12345678
# ✓ Task completed: task_12345678
#   result: HELLO WORLD
# ✅ Workflow execution completed!

print(result["task_results"])

Example 2 - Complete workflow:

from maze import MaClient, task

@task(inputs=["text"], outputs=["result"])
def process(params):
    return {"result": params["text"].upper()}

client = MaClient()
workflow = client.create_workflow()

task1 = workflow.add_task(process, inputs={"text": "hello"})

run_id = workflow.run()
result = workflow.show_results(run_id)

if result["workflow_completed"] and not result["has_exception"]:
    print("Workflow succeeded!")
    print(f"Results: {result['task_results']}")

Example 3 - Error handling:

run_id = workflow.run()
result = workflow.show_results(run_id)

if result["has_exception"]:
    print("Some tasks failed:")
    for task_id in result["exception_tasks"]:
        print(f"  - {task_id}")

Example 4 - Multiple runs:

# Run the same workflow multiple times
run_id1 = workflow.run()
result1 = workflow.show_results(run_id1)

run_id2 = workflow.run()
result2 = workflow.show_results(run_id2)

# Each run has independent results

Note

show_results() is designed for human-readable output. For raw message data, use get_results() instead.

Tip

This method automatically:

  • Fetches results using get_results() (with caching)

  • Prints formatted progress to console

  • Simplifies exception messages for readability

  • Returns a structured summary dictionary

print_graph()

Print an ASCII visualization of the workflow structure to console.

Example:

workflow.print_graph()
# Output:
# ╔══════════════════════════════════════════════════════════════╗
# ║              Workflow Structure Visualization               ║
# ╚══════════════════════════════════════════════════════════════╝
#
# Nodes (Tasks):
#   • task_12345678... [process_data]
#   • task_87654321... [format_output]
#
# Dependencies (Edges):
#   task_12345678... → task_87654321...

draw_graph(output_path='workflow.png', method='auto', figsize=(12, 8), dpi=300)

Export the workflow graph as an image file.

Parameters:
  • output_path (str) – Path for the output image file. Supported formats: PNG, PDF, SVG, JPG

  • method (str) – Rendering method - "auto" (default), "graphviz", or "matplotlib"

  • figsize (tuple) – Figure size for matplotlib method (width, height in inches)

  • dpi (int) – Image resolution in dots per inch

Returns:

Path to the generated image file

Return type:

str

Raises:

ImportError – If required libraries are not installed
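To avoid the ImportError, you can check which rendering libraries are importable before calling draw_graph(). The sketch below uses only the standard library; pick_backend is a hypothetical helper, and the preference order (graphviz first, then matplotlib with networkx) is an assumption about what "auto" might do, not documented behaviour.

```python
import importlib.util


def pick_backend(preferred: str = "auto") -> str:
    """Return a usable method name for draw_graph, or raise if none is importable."""
    available = {
        "graphviz": importlib.util.find_spec("graphviz") is not None,
        "matplotlib": all(
            importlib.util.find_spec(name) is not None
            for name in ("matplotlib", "networkx")
        ),
    }
    if preferred == "auto":
        for name in ("graphviz", "matplotlib"):  # assumed preference order
            if available[name]:
                return name
        raise ImportError("install graphviz, or matplotlib together with networkx")
    if preferred not in available:
        raise ValueError(f"unknown method: {preferred!r}")
    if not available[preferred]:
        raise ImportError(f"libraries for {preferred!r} are not installed")
    return preferred


try:
    print(pick_backend())
except ImportError as exc:
    print(f"No rendering backend available: {exc}")
```

Note that the Python graphviz package also needs the Graphviz executables on the system, which this check does not cover.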

Example 1 - Basic usage:

workflow.draw_graph("my_workflow.png")
# Creates my_workflow.png using available library

Example 2 - High-resolution PDF:

workflow.draw_graph(
    output_path="workflow.pdf",
    method="graphviz",
    dpi=600
)

Example 3 - Custom size with matplotlib:

workflow.draw_graph(
    output_path="workflow.png",
    method="matplotlib",
    figsize=(16, 10),
    dpi=150
)

Note

Requires either graphviz (recommended) or matplotlib + networkx:

# Option 1: Graphviz (recommended)
pip install graphviz

# Option 2: Matplotlib + NetworkX
pip install matplotlib networkx

get_graph_mermaid()

Generate Mermaid diagram syntax for the workflow.

Returns:

Mermaid diagram code as string

Return type:

str

Example:

mermaid_code = workflow.get_graph_mermaid()
print(mermaid_code)
# Output:
# graph TD
#     task_12345678["process_data"]
#     task_87654321["format_output"]
#     task_12345678 --> task_87654321

Tip

Copy the output to mermaid.live for online visualization.
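The output format shown above is simple enough to reproduce by hand, which can help when post-processing or testing diagrams. This sketch rebuilds the same text from a name mapping and an edge list; to_mermaid is a hypothetical helper and not the library's own generator.

```python
from typing import Dict, Iterable, Tuple


def to_mermaid(names: Dict[str, str], edges: Iterable[Tuple[str, str]]) -> str:
    """Render task names and dependency edges as a top-down Mermaid graph."""
    lines = ["graph TD"]
    for task_id, name in names.items():
        lines.append(f'    {task_id}["{name}"]')  # node with a quoted label
    for source, target in edges:
        lines.append(f"    {source} --> {target}")  # dependency arrow
    return "\n".join(lines)


names = {"task_12345678": "process_data", "task_87654321": "format_output"}
edges = [("task_12345678", "task_87654321")]
print(to_mermaid(names, edges))
```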

get_graph_info()

Get detailed structural information about the workflow graph.

Returns:

Dictionary with nodes, edges, and statistics

Return type:

dict

Return Format:

{
    "nodes": {
        "task_id_1": {
            "name": "task_name",
            "func_name": "function_name",
            "inputs": ["input1", "input2"],
            "outputs": ["output1", "output2"],
            "resources": {"cpu": 1, "cpu_mem": 0, "gpu": 0, "gpu_mem": 0}
        },
        ...
    },
    "edges": [
        ["task_id_1", "task_id_2"],
        ...
    ],
    "stats": {
        "total_nodes": 5,
        "total_edges": 4,
        "max_depth": 3
    }
}
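The nodes and edges entries are enough to answer simple structural questions, such as which tasks start the workflow and which ones end it. The sketch below works on a hand-written dictionary in the documented shape; roots_and_leaves is a hypothetical helper.

```python
from typing import Any, Dict, List, Tuple


def roots_and_leaves(info: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Return (tasks with no predecessors, tasks with no successors), both sorted."""
    node_ids = set(info["nodes"])
    has_incoming = {target for _, target in info["edges"]}
    has_outgoing = {source for source, _ in info["edges"]}
    return sorted(node_ids - has_incoming), sorted(node_ids - has_outgoing)


# Trimmed sample; real node entries also carry func_name, inputs, outputs, resources.
sample_info = {
    "nodes": {"t1": {"name": "load"}, "t2": {"name": "clean"}, "t3": {"name": "report"}},
    "edges": [["t1", "t2"], ["t2", "t3"]],
    "stats": {"total_nodes": 3, "total_edges": 2, "max_depth": 3},
}
roots, leaves = roots_and_leaves(sample_info)
print(roots, leaves)
```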

Example:

info = workflow.get_graph_info()
print(f"Total tasks: {info['stats']['total_nodes']}")
print(f"Total dependencies: {info['stats']['total_edges']}")

get_task_result(run_id, task_id)

Look up a single task’s result in the cached results of a workflow run.

Parameters:
  • run_id (str) – The run ID of the workflow execution

  • task_id (str) – The task ID to query

Returns:

Task result dictionary, or None if not found

Return type:

Optional[dict]

Raises:

ValueError – If run_id is not in cache (must call get_results() or show_results() first)

Example 1 - Query specific task:

run_id = workflow.run()
workflow.get_results(run_id, verbose=False)  # Cache results

# Query specific task
task_result = workflow.get_task_result(run_id, task1.task_id)
print(f"Task1 output: {task_result}")

Example 2 - Check multiple tasks:

run_id = workflow.run()
workflow.show_results(run_id)  # Cache results

for task in [task1, task2, task3]:
    result = workflow.get_task_result(run_id, task.task_id)
    if result:
        print(f"{task.task_name}: {result}")
    else:
        print(f"{task.task_name}: No result found")

Note

This method requires that results for run_id are already cached. Call get_results() or show_results() first to populate the cache.
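One way to handle the cache requirement is a wrapper that populates the cache on demand. The sketch below is self-contained: FakeWorkflow is a hypothetical stub that imitates only the behaviour documented here (the ValueError for uncached runs and None for unknown tasks), and task_result_or_fetch is an illustrative helper.

```python
from typing import Dict, List, Optional


class FakeWorkflow:
    """Hypothetical stub that mimics only the documented caching behaviour."""

    def __init__(self, messages_by_run: Dict[str, List[dict]]) -> None:
        self._server = messages_by_run
        self._cache: Dict[str, List[dict]] = {}

    def get_results(self, run_id: str, verbose: bool = True) -> List[dict]:
        if run_id not in self._cache:
            self._cache[run_id] = self._server[run_id]
        return self._cache[run_id]

    def get_task_result(self, run_id: str, task_id: str) -> Optional[dict]:
        if run_id not in self._cache:
            raise ValueError(f"run {run_id!r} is not cached")
        for msg in self._cache[run_id]:
            if msg["type"] == "finish_task" and msg["data"]["task_id"] == task_id:
                return msg["data"]["result"]
        return None


def task_result_or_fetch(workflow, run_id: str, task_id: str) -> Optional[dict]:
    """Return a task result, caching the run first if it was not cached yet."""
    try:
        return workflow.get_task_result(run_id, task_id)
    except ValueError:
        workflow.get_results(run_id, verbose=False)  # populate the cache silently
        return workflow.get_task_result(run_id, task_id)


fake = FakeWorkflow({
    "run_1": [{"type": "finish_task", "data": {"workflow_id": "w", "task_id": "t1", "result": {"y": 2}}}],
})
print(task_result_or_fetch(fake, "run_1", "t1"))
```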

list_cached_runs()

List all run IDs that have cached results.

Returns:

List of cached run IDs

Return type:

List[str]

Example:

# Run workflow multiple times
run_id1 = workflow.run()
workflow.get_results(run_id1, verbose=False)

run_id2 = workflow.run()
workflow.get_results(run_id2, verbose=False)

# List cached runs
cached = workflow.list_cached_runs()
print(f"Cached runs: {cached}")
# Prints the two run IDs, e.g.: Cached runs: ['<run_id1>', '<run_id2>']

clear_cache(run_id=None)

Clear cached workflow results.

Parameters:

run_id (str) – Specific run ID to clear. If None, clears all cached results.

Example 1 - Clear specific run:

workflow.clear_cache(run_id)
print(f"Cleared cache for {run_id}")

Example 2 - Clear all cache:

workflow.clear_cache()
print("All cache cleared")

MaTask

class MaTask(task_id, workflow_id, server_url, task_name=None, output_keys=None)

Represents a task in a workflow.

Note

Tasks are typically created using MaWorkflow.add_task() rather than direct instantiation.

task_id

The unique identifier of the task.

Type:

str

workflow_id

The ID of the workflow this task belongs to.

Type:

str

task_name

The name of the task.

Type:

str

outputs

Collection of task outputs that can be referenced by other tasks.

Type:

TaskOutputs

Example:

task1 = workflow.add_task(func1, inputs={"in": "value"})

# Reference task1's output
output_ref = task1.outputs["output_key"]

# Use in another task
task2 = workflow.add_task(
    func2,
    inputs={"in": task1.outputs["output_key"]}
)

delete()

Delete the task from the workflow.

Raises:

Exception – If deletion fails

Example:

task.delete()

TaskOutput

class TaskOutput(task_id, output_key)

Represents a reference to a task’s output parameter.

Note

TaskOutput objects are created automatically when accessing task.outputs["key"].

task_id

The ID of the task that produces this output.

Type:

str

output_key

The key of the output parameter.

Type:

str

to_reference_string()

Convert to the server’s reference string format.

Returns:

Reference string in format "{task_id}.output.{output_key}"

Return type:

str

Example:

output = task1.outputs["result"]
ref_str = output.to_reference_string()
# Returns: "task_id_123.output.result"
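Given the "{task_id}.output.{output_key}" format, a reference string can be split back into its parts. This is a sketch: parse_reference is a hypothetical helper, and it assumes the task ID itself never contains the text ".output.".

```python
from typing import Tuple


def parse_reference(ref: str) -> Tuple[str, str]:
    """Split '{task_id}.output.{output_key}' into (task_id, output_key)."""
    task_id, separator, output_key = ref.partition(".output.")
    if not separator or not task_id or not output_key:
        raise ValueError(f"not a task output reference: {ref!r}")
    return task_id, output_key


print(parse_reference("task_id_123.output.result"))
```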

TaskOutputs

class TaskOutputs(task_id, output_keys)

A collection of task outputs supporting dictionary-style access.

__getitem__(key)

Get an output reference by key.

Parameters:

key (str) – The output parameter name

Returns:

Reference to the output

Return type:

TaskOutput

Raises:

KeyError – If the output key doesn’t exist

Example:

output = task.outputs["result"]

keys()

Get all output keys.

Returns:

List of output parameter names

Return type:

list

Example:

for key in task.outputs.keys():
    print(f"Output: {key}")

Decorators

@task

@task(inputs, outputs, resources=None, data_types=None)

Decorator for defining task functions with metadata.

Parameters:
  • inputs (list) – List of input parameter names

  • outputs (list) – List of output parameter names

  • resources (dict) – Resource requirements (optional)

  • data_types (dict) – Data type mapping for parameters (optional)

Returns:

Decorated function with task metadata

Return type:

callable

Parameter Details:

  • inputs (List[str]): Names of input parameters that the task expects. These correspond to keys in the params dictionary.

  • outputs (List[str]): Names of output parameters that the task returns. These must match the keys in the returned dictionary.

  • resources (Dict[str, Any], optional): Resource requirements for the task. If not specified or partially specified, missing values are filled with defaults:

    • cpu (int/float): Number of CPU cores (default: 1, minimum: 1)

    • cpu_mem (int): CPU memory in MB (default: 0)

    • gpu (int/float): Number of GPUs (default: 0, auto-set to 1 if gpu_mem > 0)

    • gpu_mem (int): GPU memory in MB (default: 0)

  • data_types (Dict[str, str], optional): Data types for parameters. Defaults to "str" for all parameters.

    • Supported types: "str", "int", "float", "bool", "list", "dict"
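The default-filling rules for resources can be written out as a short function. This is a sketch of the documented rules, not the decorator's code: fill_resources is a hypothetical name, raising cpu to the minimum (rather than rejecting it) is an assumption, and gpu is only auto-set when it was left at 0.

```python
from typing import Any, Dict, Optional

RESOURCE_DEFAULTS: Dict[str, Any] = {"cpu": 1, "cpu_mem": 0, "gpu": 0, "gpu_mem": 0}


def fill_resources(resources: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return a complete resource dictionary with the documented defaults applied."""
    filled = {**RESOURCE_DEFAULTS, **(resources or {})}
    if filled["cpu"] < 1:
        filled["cpu"] = 1  # assumption: values below the minimum are raised to 1
    if filled["gpu_mem"] > 0 and filled["gpu"] == 0:
        filled["gpu"] = 1  # GPU memory was requested, so at least one GPU is needed
    return filled


print(fill_resources({"gpu_mem": 4096}))
print(fill_resources())
```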

Function Requirements:

The decorated function must:

  1. Accept a single params dictionary argument

  2. Return a dictionary containing all declared output parameters
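The second requirement is easy to check locally before submitting a workflow. The sketch below compares a function's return value against its declared outputs; missing_outputs is a hypothetical helper and does not reflect how Maze validates results.

```python
from typing import Any, List


def missing_outputs(declared: List[str], returned: Any) -> List[str]:
    """Return the declared output names that the returned value does not provide."""
    if not isinstance(returned, dict):
        return list(declared)  # a non-dict return value provides no outputs at all
    return [name for name in declared if name not in returned]


def calculate(params):
    """Plain function following the single-params-dict convention."""
    return {"sum": params["a"] + params["b"], "product": params["a"] * params["b"]}


result = calculate({"a": 2, "b": 3})
print(missing_outputs(["sum", "product", "difference"], result))
```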

Example 1 - Basic task:

from maze import task

@task(
    inputs=["text"],
    outputs=["result"],
    resources={"cpu": 1, "cpu_mem": 128}
)
def uppercase_text(params):
    text = params.get("text")
    return {"result": text.upper()}

# Equivalent to (auto-filled):
# resources={"cpu": 1, "cpu_mem": 128, "gpu": 0, "gpu_mem": 0}

Example 2 - Multiple inputs and outputs:

@task(
    inputs=["a", "b"],
    outputs=["sum", "product", "difference"],
    resources={"cpu": 2, "cpu_mem": 256, "gpu": 0, "gpu_mem": 0}
)
def calculate(params):
    a = params.get("a")
    b = params.get("b")
    return {
        "sum": a + b,
        "product": a * b,
        "difference": a - b
    }

Example 3 - Using external libraries:

@task(
    inputs=["numbers"],
    outputs=["mean", "std"],
    resources={"cpu": 2, "cpu_mem": 512, "gpu": 0, "gpu_mem": 0}
)
def statistics(params):
    import numpy as np

    numbers = params.get("numbers")
    arr = np.array(numbers)

    return {
        "mean": float(np.mean(arr)),
        "std": float(np.std(arr))
    }

Example 4 - GPU task:

@task(
    inputs=["data"],
    outputs=["result"],
    resources={"cpu": 4, "cpu_mem": 2048, "gpu": 1, "gpu_mem": 4096}
)
def gpu_processing(params):
    import torch

    data = params.get("data")
    # process_with_gpu is a placeholder for your own GPU logic; it is not defined by Maze
    result = process_with_gpu(data)

    return {"result": result}

Example 5 - Auto-filled resources:

# Only specify gpu_mem - gpu auto-set to 1, cpu auto-set to 1
@task(
    inputs=["data"],
    outputs=["result"],
    resources={"gpu_mem": 4096}
)
def auto_gpu_task(params):
    return {"result": "processed"}

# Effective resources: {"cpu": 1, "cpu_mem": 0, "gpu": 1, "gpu_mem": 4096}

# No resources specified - uses all defaults
@task(inputs=["x"], outputs=["y"])
def minimal_task(params):
    return {"y": params["x"] * 2}

# Effective resources: {"cpu": 1, "cpu_mem": 0, "gpu": 0, "gpu_mem": 0}

Example 6 - Custom data types:

@task(
    inputs=["count", "rate"],
    outputs=["total"],
    resources={"cpu": 1, "cpu_mem": 128},
    data_types={"count": "int", "rate": "float", "total": "float"}
)
def calculate_total(params):
    count = params.get("count")
    rate = params.get("rate")
    return {"total": count * rate}

Note

The @task decorator uses cloudpickle to serialize the function, including any imports and closures. This allows the function to be executed on remote workers with full access to its dependencies.

Warning

Ensure that any external libraries used in the task function are installed on the worker nodes.

Data Types

Input Schema Types

When configuring task inputs, two schema types are supported:

  • from_user: Direct user input

    {"key": "direct_value"}
    
  • from_task: Reference to another task’s output

    {"key": other_task.outputs["output_key"]}
    

Resource Configuration

Resource requirements are specified as dictionaries:

resources = {
    "cpu": 4,          # CPU cores (int or float)
    "cpu_mem": 2048,   # CPU memory in MB (int)
    "gpu": 1,          # GPU count (int or float)
    "gpu_mem": 4096    # GPU memory in MB (int)
}

Resource Units:

  • CPU cores: Number of processor cores (can be fractional, e.g., 1.5; the @task decorator lists a minimum of 1)

  • CPU memory: Megabytes (MB)

  • GPU: Number of GPUs (can be fractional for GPU sharing)

  • GPU memory: Megabytes (MB)

See Also

  • quick_start - Get started quickly with a simple example

  • maclient - Learn about workflow patterns and best practices

  • GitHub Repository - Browse example code and contribute