MaClient API Reference
This page provides detailed API documentation for the Maze Client SDK.
MaClient
- class MaClient(server_url='http://localhost:8000')
The main client class for connecting to the Maze server and managing workflows.
- Parameters:
server_url (str) – The URL of the Maze server. Defaults to "http://localhost:8000".
Example:
from maze import MaClient

# Connect to local server
client = MaClient()

# Connect to remote server
client = MaClient("http://192.168.1.100:8000")
MaWorkflow
- class MaWorkflow(workflow_id, server_url)
Represents a workflow that manages tasks and their execution.
Note
Workflows are typically created using MaClient.create_workflow() rather than direct instantiation.
- add_task(task_func=None, inputs=None, task_type='code', task_name=None, **kwargs)
Add a task to the workflow.
- Parameters:
task_func (callable) – A function decorated with @task. When provided, the task is automatically configured from the decorator metadata.
inputs (dict) – Dictionary of input parameters. Values can be:
Direct values: {"key": "value"}
Task output references: {"key": other_task.outputs["output_key"]}
task_type (str) – Type of task (default: "code").
task_name (str) – Custom name for the task. If not provided, uses the function name.
- Returns:
The created task instance
- Return type:
MaTask
- Raises:
Exception – If task creation or configuration fails
Example 1 - Using decorated function (recommended):
from maze import task

@task(inputs=["text"], outputs=["result"])
def process_text(params):
    text = params.get("text")
    return {"result": text.upper()}

task1 = workflow.add_task(
    process_text,
    inputs={"text": "hello"}
)
Example 2 - Referencing other task outputs:
task1 = workflow.add_task(func1, inputs={"input": "value"})
task2 = workflow.add_task(
    func2,
    inputs={"input": task1.outputs["output"]}
)
# Automatically creates edge: task1 → task2
Example 3 - Custom task name:
task = workflow.add_task(
    my_function,
    inputs={"data": "value"},
    task_name="Custom Task Name"
)
Note
When inputs contains references to other task outputs (TaskOutput objects), Maze automatically creates dependency edges between tasks.
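To illustrate the idea, here is a minimal sketch of how edges can be derived from an inputs dictionary. It is not Maze's implementation: the TaskOutput dataclass and the infer_edges helper are hypothetical stand-ins that only mirror the documented task_id/output_key pair.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass(frozen=True)
class TaskOutput:
    """Hypothetical stand-in for a reference to another task's output."""
    task_id: str
    output_key: str


def infer_edges(target_task_id: str, inputs: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return the (source, target) edges implied by TaskOutput values in inputs."""
    edges: List[Tuple[str, str]] = []
    for value in inputs.values():
        # Only output references create dependencies; plain values do not.
        if isinstance(value, TaskOutput):
            edge = (value.task_id, target_task_id)
            if edge not in edges:  # two inputs from the same task -> one edge
                edges.append(edge)
    return edges


inputs = {"input": TaskOutput("task_1", "output"), "mode": "fast"}
print(infer_edges("task_2", inputs))  # [('task_1', 'task_2')]
```

Collapsing several references to the same upstream task into one edge is a design choice of this sketch; a dependency graph only needs to know that one task must wait for the other.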
- get_tasks()
Retrieve all tasks in the workflow.
- Returns:
List of task information dictionaries
- Return type:
List[dict]
- Raises:
Exception – If the request fails
Example:
tasks = workflow.get_tasks()
for task in tasks:
    print(f"Task ID: {task['id']}, Name: {task['name']}")
Return Format:
[
    {"id": "task_id_1", "name": "task_name_1"},
    {"id": "task_id_2", "name": "task_name_2"},
    ...
]
- add_edge(source_task, target_task)
Manually add a dependency edge between two tasks.
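- Parameters:
source_task (MaTask) – The upstream task, which must complete before target_task starts.
target_task (MaTask) – The downstream task that depends on source_task.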
- Raises:
Exception – If edge creation fails
Example:
workflow.add_edge(task1, task2) # task1 → task2
Note
You typically don’t need to call this method manually. When you reference a task’s output in another task’s input, the edge is created automatically.
Warning
Adding an edge that would create a cycle in the workflow DAG will cause an error.
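A minimal sketch of the kind of check that rejects such an edge, assuming the graph is held as a list of (source, target) pairs. The would_create_cycle helper is hypothetical and not part of the Maze API. Adding source → target closes a cycle exactly when source is already reachable from target.

```python
from collections import defaultdict
from typing import Iterable, Tuple


def would_create_cycle(edges: Iterable[Tuple[str, str]], source: str, target: str) -> bool:
    """Return True if adding source -> target to a DAG would create a cycle."""
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)

    # Depth-first search from target; reaching source means target ~> source already exists.
    stack, seen = [target], set()
    while stack:
        node = stack.pop()
        if node == source:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(graph[node])
    return False


edges = [("task1", "task2"), ("task2", "task3")]
print(would_create_cycle(edges, "task3", "task1"))  # True: task1 -> task2 -> task3 -> task1
print(would_create_cycle(edges, "task1", "task3"))  # False: only a shortcut edge
```

A self-loop (source equal to target) is reported as a cycle, since the search starts at the node it is looking for.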
- del_edge(source_task, target_task)
Delete a dependency edge between two tasks.
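- Parameters:
source_task (MaTask) – The upstream task of the edge to remove.
target_task (MaTask) – The downstream task of the edge to remove.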
- Raises:
Exception – If edge deletion fails
Example:
workflow.del_edge(task1, task2)
- run()
Submit the workflow for execution.
- Returns:
Unique run ID for this execution
- Return type:
str
- Raises:
Exception – If the submission fails
Example:
run_id = workflow.run()
print(f"Workflow started with run_id: {run_id}")
Note
This method submits the workflow and returns a run_id. The same workflow can be run multiple times, with each execution having a unique run_id. Use this run_id with get_results() or show_results() to retrieve execution results.
- get_results(run_id, verbose=True)
Retrieve execution results for a specific workflow run. Results are cached client-side for efficient repeated queries.
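- Parameters:
run_id (str) – The run ID returned by run().
verbose (bool) – If True (default), print the raw messages to the console; if False, return them without printing.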
- Returns:
List of all execution messages
- Return type:
List[dict]
- Raises:
Exception – If an error occurs during execution
ValueError – If run_id is invalid
Message Format:
Each message is a dictionary with the following structure:
{
    "type": "message_type",
    "data": {
        # Type-specific data
    }
}
Message Types:
start_task: A task has started execution.
{
    "type": "start_task",
    "data": {"workflow_id": "workflow_123", "task_id": "task_12345", "node_ip": "127.0.0.1"}
}
finish_task: A task has completed successfully.
{
    "type": "finish_task",
    "data": {"workflow_id": "workflow_123", "task_id": "task_12345", "result": {"output_key": "output_value"}}
}
task_exception: A task failed with an exception.
{
    "type": "task_exception",
    "data": {"workflow_id": "workflow_123", "task_id": "task_12345", "result": "Traceback (most recent call last):\\n..."}
}
finish_workflow: The workflow has completed.
{"type": "finish_workflow", "data": {}}
Example 1 - Basic usage (raw messages):
run_id = workflow.run()
messages = workflow.get_results(run_id, verbose=True)
# Prints raw messages like:
# {'type': 'start_task', 'data': {...}}
# {'type': 'finish_task', 'data': {...}}
for msg in messages:
    if msg["type"] == "finish_task":
        print(f"Task result: {msg['data']['result']}")
Example 2 - Silent mode (no printing):
run_id = workflow.run()
messages = workflow.get_results(run_id, verbose=False)  # No console output, just returns messages
task_results = {}
for msg in messages:
    if msg["type"] == "finish_task":
        task_id = msg["data"]["task_id"]
        task_results[task_id] = msg["data"]["result"]
Example 3 - Multiple queries (uses cache):
run_id = workflow.run()
# First query - fetches from server
messages1 = workflow.get_results(run_id, verbose=False)
# Second query - returns from cache (no server connection)
messages2 = workflow.get_results(run_id, verbose=False)
assert messages1 == messages2  # Same data
Note
Results are cached client-side after the first query. Subsequent calls with the same run_id return cached data without reconnecting to the server. This works around the server’s one-time consumption design, under which a run’s messages can be retrieved from the server only once.
Tip
For user-friendly formatted output, use show_results() instead. Use get_results() when you need the raw message data for custom processing.
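The caching behaviour described in the note above can be sketched as a dictionary keyed by run ID. In this illustration, RunResultCache and its fetch callable are hypothetical: fetch stands in for whatever call actually retrieves a run's messages from the server, and it is invoked at most once per run_id until that entry is cleared.

```python
from typing import Callable, Dict, List, Optional


class RunResultCache:
    """Hypothetical client-side cache: fetch each run's messages once, then reuse them."""

    def __init__(self, fetch: Callable[[str], List[dict]]):
        self._fetch = fetch  # stand-in for the one-time server call
        self._cache: Dict[str, List[dict]] = {}

    def get_results(self, run_id: str) -> List[dict]:
        if run_id not in self._cache:
            # First query: contact the server and remember the messages.
            self._cache[run_id] = self._fetch(run_id)
        # Later queries are served from memory, with no server round trip.
        return self._cache[run_id]

    def list_cached_runs(self) -> List[str]:
        return list(self._cache)

    def clear_cache(self, run_id: Optional[str] = None) -> None:
        if run_id is None:
            self._cache.clear()
        else:
            self._cache.pop(run_id, None)  # this sketch silently ignores unknown IDs


calls = []


def fake_fetch(run_id: str) -> List[dict]:
    calls.append(run_id)  # record every simulated server hit
    return [{"type": "finish_workflow", "data": {}}]


cache = RunResultCache(fake_fetch)
first = cache.get_results("run-1")
second = cache.get_results("run-1")
print(first == second, calls)  # True ['run-1']
```

Because the server hands out a run's messages only once, clearing a cached run in a setup like this means the data is gone unless it was saved elsewhere.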
- show_results(run_id, output_dir='workflow_results')
Display workflow execution results with formatted, user-friendly output.
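This is a high-level method that fetches execution messages and presents them in a readable format. It also handles file downloads for the client.front module.
- Parameters:
run_id (str) – The run ID returned by run().
output_dir (str) – Directory in which downloaded files are saved (default: "workflow_results").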
- Returns:
Dictionary with execution summary and task results
- Return type:
dict
- Raises:
Exception – If an error occurs during execution
Return Value:
For the client.maze module:
{
    "task_results": {
        "task_id_1": {"output_key": "value"},
        "task_id_2": {"output_key": "value"}
    },
    "workflow_completed": True,
    "has_exception": False,
    "exception_tasks": []
}
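As an illustration of how such a summary can be assembled from the raw messages returned by get_results(), here is a minimal sketch. summarize_messages is a hypothetical helper, not part of the SDK; it assumes the four message types documented for get_results() and does not reproduce show_results()'s console output or file downloads.

```python
from typing import Any, Dict, List


def summarize_messages(messages: List[dict]) -> Dict[str, Any]:
    """Fold raw workflow messages into a show_results()-style summary dict."""
    summary: Dict[str, Any] = {
        "task_results": {},
        "workflow_completed": False,
        "has_exception": False,
        "exception_tasks": [],
    }
    for msg in messages:
        kind, data = msg.get("type"), msg.get("data", {})
        if kind == "finish_task":
            summary["task_results"][data["task_id"]] = data["result"]
        elif kind == "task_exception":
            summary["has_exception"] = True
            summary["exception_tasks"].append(data["task_id"])
        elif kind == "finish_workflow":
            summary["workflow_completed"] = True
        # start_task and any other message types carry no result and are ignored.
    return summary


messages = [
    {"type": "start_task", "data": {"workflow_id": "w", "task_id": "t1", "node_ip": "127.0.0.1"}},
    {"type": "finish_task", "data": {"workflow_id": "w", "task_id": "t1", "result": {"result": "HELLO"}}},
    {"type": "finish_workflow", "data": {}},
]
print(summarize_messages(messages))
```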
Example 1 - Basic usage:
run_id = workflow.run()
result = workflow.show_results(run_id)
# Prints formatted output:
# 🔗 Connected to server, starting workflow execution...
# ▶ Task started: task_12345678
# ✓ Task completed: task_12345678
#   result: HELLO WORLD
# ✅ Workflow execution completed!
print(result["task_results"])
Example 2 - Complete workflow:
from maze import MaClient, task

@task(inputs=["text"], outputs=["result"])
def process(params):
    return {"result": params["text"].upper()}

client = MaClient()
workflow = client.create_workflow()
task1 = workflow.add_task(process, inputs={"text": "hello"})

run_id = workflow.run()
result = workflow.show_results(run_id)

if result["workflow_completed"] and not result["has_exception"]:
    print("Workflow succeeded!")
    print(f"Results: {result['task_results']}")
Example 3 - Error handling:
run_id = workflow.run()
result = workflow.show_results(run_id)
if result["has_exception"]:
    print("Some tasks failed:")
    for task_id in result["exception_tasks"]:
        print(f"  - {task_id}")
Example 4 - Multiple runs:
# Run the same workflow multiple times
run_id1 = workflow.run()
result1 = workflow.show_results(run_id1)

run_id2 = workflow.run()
result2 = workflow.show_results(run_id2)
# Each run has independent results
Note
show_results() is designed for human-readable output. For raw message data, use get_results() instead.
Tip
This method automatically:
Fetches results using get_results() (with caching)
Prints formatted progress to the console
Simplifies exception messages for readability
Returns a structured summary dictionary
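How show_results() simplifies exception messages is not documented here. One plausible approach, sketched below with a hypothetical simplify_traceback helper, is to keep only the final non-empty line of the traceback string carried by a task_exception message, since that line normally names the exception type and message. The sketch assumes the traceback text uses real line breaks.

```python
def simplify_traceback(tb_text: str) -> str:
    """Return the last non-empty line of a traceback, e.g. 'ValueError: bad input'."""
    lines = [line.strip() for line in tb_text.strip().splitlines() if line.strip()]
    return lines[-1] if lines else tb_text


tb = (
    "Traceback (most recent call last):\n"
    '  File "worker.py", line 10, in run\n'
    "    result = func(params)\n"
    "ValueError: bad input\n"
)
print(simplify_traceback(tb))  # ValueError: bad input
```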
- print_graph()
Print an ASCII visualization of the workflow structure to console.
Example:
workflow.print_graph()
# Output:
# ╔══════════════════════════════════════════════════════════════╗
# ║ Workflow Structure Visualization ║
# ╚══════════════════════════════════════════════════════════════╝
#
# Nodes (Tasks):
#   • task_12345678... [process_data]
#   • task_87654321... [format_output]
#
# Dependencies (Edges):
#   task_12345678... → task_87654321...
- draw_graph(output_path='workflow.png', method='auto', figsize=(12, 8), dpi=300)
Export the workflow graph as an image file.
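- Parameters:
output_path (str) – Path of the image file to write (default: "workflow.png"). The examples below write both .png and .pdf files.
method (str) – Rendering backend: "auto" (default, uses whichever supported library is available), "graphviz", or "matplotlib".
figsize (tuple) – Figure size as (width, height) (default: (12, 8)).
dpi (int) – Output resolution in dots per inch (default: 300).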
- Returns:
Path to the generated image file
- Return type:
- Raises:
ImportError – If required libraries are not installed
Example 1 - Basic usage:
workflow.draw_graph("my_workflow.png")  # Creates my_workflow.png using available library
Example 2 - High-resolution PDF:
workflow.draw_graph(
    output_path="workflow.pdf",
    method="graphviz",
    dpi=600
)
Example 3 - Custom size with matplotlib:
workflow.draw_graph(
    output_path="workflow.png",
    method="matplotlib",
    figsize=(16, 10),
    dpi=150
)
Note
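Requires either graphviz (recommended) or matplotlib + networkx:
# Option 1: Graphviz (recommended)
pip install graphviz
# Option 2: Matplotlib + NetworkX
pip install matplotlib networkx
The graphviz Python package also needs the Graphviz system binaries (for example, the dot executable) installed and on the PATH.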
- get_graph_mermaid()
Generate Mermaid diagram syntax for the workflow.
- Returns:
Mermaid diagram code as string
- Return type:
str
Example:
mermaid_code = workflow.get_graph_mermaid()
print(mermaid_code)
# Output:
# graph TD
#     task_12345678["process_data"]
#     task_87654321["format_output"]
#     task_12345678 --> task_87654321
Tip
Copy the output to mermaid.live for online visualization.
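For building the same kind of diagram from your own data, the sketch below produces Mermaid text in the format shown in the example above from a task_id → name mapping and a list of edges. to_mermaid is a hypothetical helper, not part of the SDK, and it uses task IDs exactly as given (it does not shorten them).

```python
from typing import Dict, List, Tuple


def to_mermaid(nodes: Dict[str, str], edges: List[Tuple[str, str]]) -> str:
    """Render task_id -> name nodes and (source, target) edges as a Mermaid flowchart."""
    lines = ["graph TD"]
    for task_id, name in nodes.items():
        lines.append(f'    {task_id}["{name}"]')
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


print(to_mermaid(
    {"task_12345678": "process_data", "task_87654321": "format_output"},
    [("task_12345678", "task_87654321")],
))
```

Task names containing double quotes would need escaping before they are placed inside the ["..."] labels; this sketch leaves that out.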
- get_graph_info()
Get detailed structural information about the workflow graph.
- Returns:
Dictionary with nodes, edges, and statistics
- Return type:
dict
Return Format:
{
    "nodes": {
        "task_id_1": {
            "name": "task_name",
            "func_name": "function_name",
            "inputs": ["input1", "input2"],
            "outputs": ["output1", "output2"],
            "resources": {"cpu": 1, "cpu_mem": 0, "gpu": 0, "gpu_mem": 0}
        },
        ...
    },
    "edges": [
        ["task_id_1", "task_id_2"],
        ...
    ],
    "stats": {
        "total_nodes": 5,
        "total_edges": 4,
        "max_depth": 3
    }
}
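The stats block can be derived from the nodes and edges alone. The sketch below is one way to compute it. How Maze defines max_depth is not stated here, so the hypothetical graph_stats helper assumes it counts the tasks on the longest dependency chain (a single task has depth 1). It also assumes the graph is acyclic, as a workflow DAG must be.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, List


def graph_stats(nodes: Iterable[str], edges: List[List[str]]) -> Dict[str, int]:
    """Compute total_nodes, total_edges and max_depth for a DAG of task IDs."""
    node_list = list(nodes)
    children = defaultdict(list)
    indegree = {n: 0 for n in node_list}
    for source, target in edges:
        children[source].append(target)
        indegree[target] += 1

    # Kahn's algorithm: visit tasks in topological order, tracking the longest
    # chain (counted in tasks) that ends at each one.
    depth = {n: 1 for n in node_list}
    queue = deque(n for n in node_list if indegree[n] == 0)
    while queue:
        node = queue.popleft()
        for child in children[node]:
            depth[child] = max(depth[child], depth[node] + 1)
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    return {
        "total_nodes": len(node_list),
        "total_edges": len(edges),
        "max_depth": max(depth.values(), default=0),
    }


# A diamond: t1 -> t2 -> t4 and t1 -> t3 -> t4
print(graph_stats(["t1", "t2", "t3", "t4"], [["t1", "t2"], ["t1", "t3"], ["t2", "t4"], ["t3", "t4"]]))
# {'total_nodes': 4, 'total_edges': 4, 'max_depth': 3}
```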
Example:
info = workflow.get_graph_info()
print(f"Total tasks: {info['stats']['total_nodes']}")
print(f"Total dependencies: {info['stats']['total_edges']}")
- get_task_result(run_id, task_id)
Query a specific task’s result from cached workflow execution.
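- Parameters:
run_id (str) – The run ID whose results are already cached.
task_id (str) – The ID of the task to look up, for example task1.task_id.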
- Returns:
Task result dictionary, or None if not found
- Return type:
Optional[dict]
- Raises:
ValueError – If run_id is not in cache (call get_results() or show_results() first)
Example 1 - Query specific task:
run_id = workflow.run()
workflow.get_results(run_id, verbose=False)  # Cache results

# Query specific task
task_result = workflow.get_task_result(run_id, task1.task_id)
print(f"Task1 output: {task_result}")
Example 2 - Check multiple tasks:
run_id = workflow.run()
workflow.show_results(run_id)  # Cache results

for task in [task1, task2, task3]:
    result = workflow.get_task_result(run_id, task.task_id)
    if result is not None:  # an empty result dict still counts as found
        print(f"{task.task_name}: {result}")
    else:
        print(f"{task.task_name}: No result found")
Note
This method requires that results for run_id are already cached. Call get_results() or show_results() first to populate the cache.
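Conceptually, the lookup scans a run's cached messages for the finish_task message of the requested task. The sketch below is a hypothetical stand-alone version that takes the cache as a plain dictionary mapping run IDs to message lists. It mirrors the documented behaviour (ValueError for an uncached run_id, None when no result exists) but is not the SDK's code.

```python
from typing import Dict, List, Optional


def find_task_result(cache: Dict[str, List[dict]], run_id: str, task_id: str) -> Optional[dict]:
    """Return the result of task_id in a cached run, or None if it has none."""
    if run_id not in cache:
        raise ValueError(f"run_id {run_id!r} is not cached; fetch its results first")
    for msg in cache[run_id]:
        if msg.get("type") == "finish_task" and msg.get("data", {}).get("task_id") == task_id:
            return msg["data"]["result"]
    return None  # the task failed, never ran, or the ID is wrong


cache = {
    "run-1": [
        {"type": "finish_task", "data": {"workflow_id": "w", "task_id": "t1", "result": {"y": 4}}},
        {"type": "finish_workflow", "data": {}},
    ]
}
print(find_task_result(cache, "run-1", "t1"))  # {'y': 4}
print(find_task_result(cache, "run-1", "t2"))  # None
```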
- list_cached_runs()
List all run IDs that have cached results.
- Returns:
List of cached run IDs
- Return type:
List[str]
Example:
# Run workflow multiple times
run_id1 = workflow.run()
workflow.get_results(run_id1, verbose=False)
run_id2 = workflow.run()
workflow.get_results(run_id2, verbose=False)

# List cached runs
cached = workflow.list_cached_runs()
print(f"Cached runs: {cached}")
# Output: ['run_id1', 'run_id2']
- clear_cache(run_id=None)
Clear cached workflow results.
- Parameters:
run_id (str) – Specific run ID to clear. If None, clears all cached results.
Example 1 - Clear specific run:
workflow.clear_cache(run_id)
print(f"Cleared cache for {run_id}")
Example 2 - Clear all cache:
workflow.clear_cache()
print("All cache cleared")
MaTask
- class MaTask(task_id, workflow_id, server_url, task_name=None, output_keys=None)
Represents a task in a workflow.
Note
Tasks are typically created using MaWorkflow.add_task() rather than direct instantiation.
- outputs
Collection of task outputs that can be referenced by other tasks.
- Type:
TaskOutputs
Example:
task1 = workflow.add_task(func1, inputs={"in": "value"})

# Reference task1's output
output_ref = task1.outputs["output_key"]

# Use in another task
task2 = workflow.add_task(
    func2,
    inputs={"in": task1.outputs["output_key"]}
)
TaskOutput
- class TaskOutput(task_id, output_key)
Represents a reference to a task’s output parameter.
Note
TaskOutput objects are created automatically when accessing task.outputs["key"].
TaskOutputs
- class TaskOutputs(task_id, output_keys)
A collection of task outputs supporting dictionary-style access.
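A minimal sketch of how a dictionary-style collection like this can hand out output references. Both classes below are simplified, hypothetical stand-ins that reuse the documented constructor arguments (task_id, output_key, output_keys). In particular, rejecting undeclared keys with KeyError is an assumption of this sketch, not documented Maze behaviour.

```python
from typing import Iterable


class TaskOutput:
    """Simplified stand-in: a reference to one output of one task."""

    def __init__(self, task_id: str, output_key: str):
        self.task_id = task_id
        self.output_key = output_key

    def __repr__(self) -> str:
        return f"TaskOutput({self.task_id!r}, {self.output_key!r})"


class TaskOutputs:
    """Simplified stand-in: dictionary-style access to a task's outputs."""

    def __init__(self, task_id: str, output_keys: Iterable[str]):
        self.task_id = task_id
        self.output_keys = list(output_keys)

    def __getitem__(self, key: str) -> TaskOutput:
        # Assumption: only keys declared in @task(outputs=...) may be referenced.
        if key not in self.output_keys:
            raise KeyError(f"task {self.task_id} has no output {key!r}")
        return TaskOutput(self.task_id, key)


outputs = TaskOutputs("task_1", ["result"])
ref = outputs["result"]
print(ref)  # TaskOutput('task_1', 'result')
```

Returning a lightweight reference object instead of a value is what lets add_task() see, before anything runs, which upstream task an input depends on.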
Decorators
@task
- @task(inputs, outputs, resources=None, data_types=None)
Decorator for defining task functions with metadata.
- Returns:
Decorated function with task metadata
- Return type:
callable
Parameters:
inputs (List[str]): Names of input parameters that the task expects. These correspond to keys in the params dictionary.
outputs (List[str]): Names of output parameters that the task returns. These must match the keys in the returned dictionary.
resources (Dict[str, Any], optional): Resource requirements for the task. If not specified or partially specified, missing values are filled with defaults:
cpu (int/float): Number of CPU cores (default: 1, minimum: 1)
cpu_mem (int): CPU memory in MB (default: 0)
gpu (int/float): Number of GPUs (default: 0, auto-set to 1 if gpu_mem > 0)
gpu_mem (int): GPU memory in MB (default: 0)
data_types (Dict[str, str], optional): Data types for parameters. Defaults to "str" for all parameters.
Supported types: "str", "int", "float", "bool", "list", "dict"
Function Requirements:
The decorated function must:
Accept a single params dictionary argument
Return a dictionary containing all declared output parameters
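These requirements can be pictured with a simplified decorator. task_sketch below is hypothetical and much smaller than Maze's @task (no resources, data types, or cloudpickle serialization). It only records the declared inputs and outputs on the function and checks, at call time, that the returned dictionary contains every declared output.

```python
import functools
from typing import Callable, Dict, List


def task_sketch(inputs: List[str], outputs: List[str]) -> Callable:
    """Hypothetical, simplified stand-in for @task: attach metadata and check outputs."""

    def decorator(func: Callable[[Dict], Dict]) -> Callable[[Dict], Dict]:
        @functools.wraps(func)
        def wrapper(params: Dict) -> Dict:
            result = func(params)
            if not isinstance(result, dict):
                raise TypeError(f"{func.__name__} must return a dict")
            missing = [key for key in outputs if key not in result]
            if missing:
                raise KeyError(f"{func.__name__} did not return outputs: {missing}")
            return result

        # Metadata a scheduler could read without calling the function.
        wrapper.task_inputs = list(inputs)
        wrapper.task_outputs = list(outputs)
        return wrapper

    return decorator


@task_sketch(inputs=["text"], outputs=["result"])
def uppercase_text(params):
    return {"result": params["text"].upper()}


print(uppercase_text({"text": "hello"}))  # {'result': 'HELLO'}
print(uppercase_text.task_outputs)        # ['result']
```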
Example 1 - Basic task:
from maze import task

@task(
    inputs=["text"],
    outputs=["result"],
    resources={"cpu": 1, "cpu_mem": 128}
)
def uppercase_text(params):
    text = params.get("text")
    return {"result": text.upper()}

# Equivalent to (auto-filled):
# resources={"cpu": 1, "cpu_mem": 128, "gpu": 0, "gpu_mem": 0}
Example 2 - Multiple inputs and outputs:
@task(
    inputs=["a", "b"],
    outputs=["sum", "product", "difference"],
    resources={"cpu": 2, "cpu_mem": 256, "gpu": 0, "gpu_mem": 0}
)
def calculate(params):
    a = params.get("a")
    b = params.get("b")
    return {
        "sum": a + b,
        "product": a * b,
        "difference": a - b
    }
Example 3 - Using external libraries:
@task(
    inputs=["numbers"],
    outputs=["mean", "std"],
    resources={"cpu": 2, "cpu_mem": 512, "gpu": 0, "gpu_mem": 0}
)
def statistics(params):
    import numpy as np
    numbers = params.get("numbers")
    arr = np.array(numbers)
    return {
        "mean": float(np.mean(arr)),
        "std": float(np.std(arr))
    }
Example 4 - GPU task:
@task(
    inputs=["data"],
    outputs=["result"],
    resources={"cpu": 4, "cpu_mem": 2048, "gpu": 1, "gpu_mem": 4096}
)
def gpu_processing(params):
    import torch
    data = params.get("data")
    # Placeholder GPU work: copy the data to the GPU, double it, and bring it back
    tensor = torch.tensor(data, device="cuda")
    result = (tensor * 2).cpu().tolist()
    return {"result": result}
Example 5 - Auto-filled resources:
# Only specify gpu_mem - gpu auto-set to 1, cpu auto-set to 1
@task(
    inputs=["data"],
    outputs=["result"],
    resources={"gpu_mem": 4096}
)
def auto_gpu_task(params):
    return {"result": "processed"}
# Effective resources: {"cpu": 1, "cpu_mem": 0, "gpu": 1, "gpu_mem": 4096}

# No resources specified - uses all defaults
@task(inputs=["x"], outputs=["y"])
def minimal_task(params):
    return {"y": params["x"] * 2}
# Effective resources: {"cpu": 1, "cpu_mem": 0, "gpu": 0, "gpu_mem": 0}
Example 6 - Custom data types:
@task(
    inputs=["count", "rate"],
    outputs=["total"],
    resources={"cpu": 1, "cpu_mem": 128},
    data_types={"count": "int", "rate": "float", "total": "float"}
)
def calculate_total(params):
    count = params.get("count")
    rate = params.get("rate")
    return {"total": count * rate}
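To make the type names concrete, the sketch below maps each supported name to the corresponding Python type and checks a parameter dictionary against a data_types declaration, treating undeclared parameters as "str" as documented. check_types is hypothetical. Whether, and at which point, Maze itself validates or converts values is not described here.

```python
from typing import Any, Dict, List

# Supported type names from the @task documentation, mapped to Python types.
TYPE_MAP = {"str": str, "int": int, "float": float, "bool": bool, "list": list, "dict": dict}


def check_types(params: Dict[str, Any], data_types: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means every value matches its declared type."""
    problems = []
    for name, value in params.items():
        type_name = data_types.get(name, "str")  # undeclared parameters default to "str"
        expected = TYPE_MAP[type_name]
        if expected is float:
            # Accept ints for float parameters (3 for 3.0) but not bools.
            ok = isinstance(value, (int, float)) and not isinstance(value, bool)
        elif expected is int:
            ok = isinstance(value, int) and not isinstance(value, bool)  # bool subclasses int
        else:
            ok = isinstance(value, expected)
        if not ok:
            problems.append(f"{name}: expected {type_name}, got {type(value).__name__}")
    return problems


print(check_types({"count": 3, "rate": 0.5}, {"count": "int", "rate": "float"}))  # []
print(check_types({"count": "3"}, {"count": "int"}))  # ['count: expected int, got str']
```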
Note
The @task decorator uses cloudpickle to serialize the function, including its closures and the import statements in its body. This allows the function to be executed on remote workers.
Warning
Ensure that any external libraries used in the task function are installed on the worker nodes.
Data Types
Input Schema Types
When configuring task inputs, two schema types are supported:
from_user: Direct user input
{"key": "direct_value"}
from_task: Reference to another task’s output
{"key": other_task.outputs["output_key"]}
Resource Configuration
Resource requirements are specified as dictionaries:
resources = {
"cpu": 4, # CPU cores (int or float)
"cpu_mem": 2048, # CPU memory in MB (int)
"gpu": 1, # GPU count (int or float)
"gpu_mem": 4096 # GPU memory in MB (int)
}
Resource Units:
CPU cores: Number of processor cores (can be fractional, e.g., 0.5)
CPU memory: Megabytes (MB)
GPU: Number of GPUs (can be fractional for GPU sharing)
GPU memory: Megabytes (MB)
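The default-filling rules listed for the resources argument of @task can be expressed as a small function. fill_resources is a hypothetical helper that reproduces the documented defaults (cpu 1, cpu_mem 0, gpu 0, gpu_mem 0, with gpu raised to 1 when gpu_mem > 0). It assumes the GPU rule applies whenever the resulting GPU count is 0, and it does not enforce the documented CPU minimum.

```python
from typing import Any, Dict, Optional

DEFAULT_RESOURCES = {"cpu": 1, "cpu_mem": 0, "gpu": 0, "gpu_mem": 0}


def fill_resources(resources: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Merge user-specified resources over the documented defaults."""
    filled = {**DEFAULT_RESOURCES, **(resources or {})}
    # Asking for GPU memory implies at least one GPU when the GPU count is 0.
    if filled["gpu_mem"] > 0 and filled["gpu"] == 0:
        filled["gpu"] = 1
    return filled


print(fill_resources({"gpu_mem": 4096}))
# {'cpu': 1, 'cpu_mem': 0, 'gpu': 1, 'gpu_mem': 4096}
print(fill_resources())
# {'cpu': 1, 'cpu_mem': 0, 'gpu': 0, 'gpu_mem': 0}
```

Both printed results match the "Effective resources" comments in Example 5 of the @task section.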
See Also
quick_start - Get started quickly with a simple example
maclient - Learn about workflow patterns and best practices
GitHub Repository - Browse example code and contribute