Manager
Central service for task management and proplet coordination in Propeller
The Manager is Propeller's central orchestration service. It coordinates the complete lifecycle of WebAssembly workloads across distributed proplets—from task creation and scheduling through execution tracking and result collection.
For a quick start with basic task operations, see the Getting Started guide. For advanced workflow patterns, see Task Scheduling.
Why the Manager
Central Coordination
Distributed edge computing requires a coordinator that understands the state of all nodes and workloads. The Manager provides:
- Single point of control: One API to manage tasks across all proplets
- State persistence: All task and proplet state survives restarts
- Scheduling intelligence: Automatic proplet selection based on availability and load
- Execution tracking: Real-time visibility into task progress and results
Decoupled Communication
The Manager acts as an intermediary between HTTP clients and MQTT-connected proplets.
This separation allows clients to use familiar REST APIs while proplets communicate over lightweight MQTT—ideal for constrained edge devices.
For the full system architecture and component overview, see the Architecture documentation.
Running the Manager
Using Docker Compose
The recommended way to run the Manager is with Docker Compose:
```bash
cd propeller
docker compose -f docker/compose.yaml --env-file docker/.env up -d
```

Verify the Manager is running:

```bash
curl http://localhost:7070/health
```

Your output should look like this:
```json
{
  "status": "pass",
  "version": "0.0.0",
  "commit": "ffffffff",
  "description": "manager service",
  "build_time": "1970-01-01_00:00:00",
  "instance_id": "47c7fcf5-2d9a-41e6-8e0c-97303736019e"
}
```

Configuration
The Manager is configured through environment variables. These are the most commonly used:
Core Settings
| Variable | Description | Default |
|---|---|---|
| MANAGER_LOG_LEVEL | Log level (debug, info, warn, error) | info |
MQTT Connection
The Manager communicates with proplets over MQTT using the SuperMQ message broker. SuperMQ provides secure, multi-tenant messaging with built-in authentication and authorization—ensuring that only authorized components can publish and subscribe to Propeller's control channels.
| Variable | Description | Default |
|---|---|---|
| MANAGER_MQTT_ADDRESS | MQTT broker URL | tcp://localhost:1883 |
| MANAGER_MQTT_QOS | Quality of Service level (0, 1, or 2) | 2 |
| MANAGER_MQTT_TIMEOUT | Connection and operation timeout | 30s |
| MANAGER_DOMAIN_ID | SuperMQ domain identifier | From config.toml |
| MANAGER_CHANNEL_ID | SuperMQ channel identifier | From config.toml |
| MANAGER_CLIENT_ID | MQTT client ID for authentication | From config.toml |
| MANAGER_CLIENT_KEY | MQTT client key for authentication | From config.toml |
QoS Levels:
- `0` (At most once): Fire and forget—fastest but messages may be lost
- `1` (At least once): Guaranteed delivery but may have duplicates
- `2` (Exactly once): Guaranteed single delivery—recommended for Propeller
Why QoS 2? Propeller uses QoS 2 because task scheduling requires exactly-once semantics. With QoS 1, a task start command could be delivered twice, causing the proplet to execute the same task multiple times—wasting resources and corrupting results. Similarly, duplicate result messages could trigger downstream workflow tasks prematurely or update task state incorrectly. QoS 2's four-step handshake (PUBLISH → PUBREC → PUBREL → PUBCOMP) ensures each message is processed exactly once, maintaining consistent state across the Manager and all proplets.
Provisioning with config.toml
The MANAGER_DOMAIN_ID, MANAGER_CHANNEL_ID, MANAGER_CLIENT_ID, and MANAGER_CLIENT_KEY values come from SuperMQ provisioning. The propeller-cli provision command automates this setup by:
- Authenticating with your SuperMQ instance
- Creating a domain (logical isolation boundary)
- Creating clients (Manager, Proplets, Proxy)
- Creating a channel (shared communication topic)
- Connecting all clients to the channel with publish/subscribe permissions
- Writing credentials to `config.toml`
Run the provisioning wizard:
```bash
propeller-cli provision
```

The wizard prompts for SuperMQ credentials and generates a config.toml file:
```toml
# SuperMQ Configuration
[manager]
domain_id = "73cc3c8f-2c6c-4c08-903b-f1f426d5d9bf"
client_id = "47c7fcf5-2d9a-41e6-8e0c-97303736019e"
client_key = "26975130-4cd7-4791-8050-c1698ac31e10"
channel_id = "9856d353-88d3-4eb6-82bd-117eadf69c74"

[proplet]
domain_id = "73cc3c8f-2c6c-4c08-903b-f1f426d5d9bf"
client_id = "ec182939-7940-4b25-869e-47b245ddec09"
client_key = "21076163-f7b9-439e-94b9-f17f69ce3d28"
channel_id = "9856d353-88d3-4eb6-82bd-117eadf69c74"

[proxy]
domain_id = "73cc3c8f-2c6c-4c08-903b-f1f426d5d9bf"
client_id = "ec182939-7940-4b25-869e-47b245ddec09"
client_key = "21076163-f7b9-439e-94b9-f17f69ce3d28"
channel_id = "9856d353-88d3-4eb6-82bd-117eadf69c74"
```

Each section contains:
| Field | Purpose |
|---|---|
| domain_id | SuperMQ domain for multi-tenant isolation |
| client_id | Unique identifier for MQTT authentication |
| client_key | Secret key for MQTT authentication |
| channel_id | Shared channel for Manager-Proplet communication |
The Manager, Proplets, and Proxy all share the same domain_id and channel_id but have unique client_id/client_key pairs. This allows them to communicate securely on a shared channel while maintaining distinct identities.
When starting Docker Compose, the docker/.env file references these values, which are loaded from config.toml. You can also set them directly as environment variables to override the file.
HTTP Server
The Manager exposes a REST API over HTTP for client interactions. This provides a familiar, language-agnostic interface—any HTTP client (curl, SDKs, web applications) can create tasks, query proplet status, and manage workflows without understanding MQTT internals.
| Variable | Description | Default |
|---|---|---|
| MANAGER_HTTP_HOST | Listen address (empty = all interfaces) | localhost |
| MANAGER_HTTP_PORT | Listen port | 7070 |
| MANAGER_HTTP_SERVER_CERT | TLS certificate file path | "" |
| MANAGER_HTTP_SERVER_KEY | TLS private key file path | "" |
| MANAGER_HTTP_SERVER_CA_CERTS | CA certificate for verifying client certs (mTLS) | "" |
| MANAGER_HTTP_CLIENT_CA_CERTS | Client CA certificate for mTLS | "" |
| MANAGER_HTTP_SERVER_READ_TIMEOUT | Maximum time to read entire request | 15s |
| MANAGER_HTTP_SERVER_WRITE_TIMEOUT | Maximum time to write response | 15s |
| MANAGER_HTTP_SERVER_READ_HEADER_TIMEOUT | Maximum time to read request headers | 5s |
| MANAGER_HTTP_SERVER_IDLE_TIMEOUT | Keep-alive connection timeout | 60s |
| MANAGER_HTTP_SERVER_MAX_HEADER_BYTES | Maximum request header size | 1048576 (1 MB) |
Enabling HTTPS: To enable TLS, provide both certificate and key files:
```bash
export MANAGER_HTTP_SERVER_CERT=/etc/propeller/cert.pem
export MANAGER_HTTP_SERVER_KEY=/etc/propeller/key.pem
```

When both are set, the server accepts HTTPS connections only. For production deployments, always enable TLS to protect task payloads and API credentials in transit.
Mutual TLS (mTLS): For zero-trust environments, enable client certificate verification:
```bash
export MANAGER_HTTP_SERVER_CA_CERTS=/etc/propeller/ca.pem
export MANAGER_HTTP_CLIENT_CA_CERTS=/etc/propeller/client-ca.pem
```

This requires clients to present valid certificates signed by the configured CA.
Timeout tuning: The default timeouts work well for most deployments. Increase MANAGER_HTTP_SERVER_WRITE_TIMEOUT if clients download large Wasm binaries, or increase MANAGER_HTTP_SERVER_READ_TIMEOUT if clients upload large files via /tasks/{id}/upload.
Storage Backend
The Manager persists task definitions, proplet registrations, job configurations, and execution metrics to a storage backend. Propeller supports four backends to match different deployment scenarios—from quick local testing to production-grade durability.
Why four backends? Different environments have different requirements:
- Development needs zero setup and fast iteration
- Edge deployments need persistence without external dependencies
- Production needs durability, backups, and operational tooling
Select your storage backend with MANAGER_STORAGE_TYPE:
| Backend | Default | Best For | Trade-offs |
|---|---|---|---|
| memory | ✓ | Development, testing, CI pipelines | Zero setup; all state lost on restart |
| sqlite | | Single-node production, edge gateways | File-based persistence; no external dependencies; limited concurrent writes |
| postgres | | Multi-node production, enterprise deployments | Full SQL capabilities, ACID transactions, replication; requires PostgreSQL server |
| badger | | High-throughput edge, embedded systems | Fast key-value store; no SQL queries; good for high write volumes |
Default: memory — The Manager defaults to in-memory storage so you can start immediately without database setup. This is ideal for:
- Local development and experimentation
- Automated testing in CI/CD pipelines
- Quick demos and proof-of-concept work
However, all state is lost when the Manager restarts. Tasks, proplets, and jobs disappear. Never use memory in production.
Switching Storage Backends
Set MANAGER_STORAGE_TYPE to change backends. Each backend has additional configuration:
SQLite — Best for single-node deployments where you need persistence without running a database server. The entire state lives in one file, making backups simple (cp propeller.db propeller.db.backup).
```bash
export MANAGER_STORAGE_TYPE=sqlite
export MANAGER_SQLITE_PATH=./propeller.db
```

Use cases:
- Edge gateways managing local proplets
- Small-scale production (under ~100 concurrent tasks)
- Deployments where PostgreSQL is overkill
PostgreSQL — Best for production deployments that need durability, concurrent access, and operational tooling. Supports multiple Manager replicas, point-in-time recovery, and standard database monitoring.
```bash
export MANAGER_STORAGE_TYPE=postgres
export MANAGER_POSTGRES_HOST=localhost
export MANAGER_POSTGRES_PORT=5432
export MANAGER_POSTGRES_USER=propeller
export MANAGER_POSTGRES_PASS=propeller
export MANAGER_POSTGRES_DB=propeller
export MANAGER_POSTGRES_SSLMODE=disable
```

Use cases:
- Production deployments with SLA requirements
- Multi-Manager high-availability setups
- Environments with existing PostgreSQL infrastructure
- When you need SQL queries for reporting or debugging
Badger — Best for high-write-throughput scenarios on edge devices. Badger is an embedded key-value store optimized for SSDs, offering faster writes than SQLite for metrics-heavy workloads.
```bash
export MANAGER_STORAGE_TYPE=badger
export MANAGER_BADGER_PATH=./data/badger
```

Use cases:
- Edge devices collecting high-frequency metrics
- Embedded systems with limited memory
- When SQLite write performance becomes a bottleneck
Observability
The Manager provides built-in observability through Prometheus metrics and OpenTelemetry distributed tracing. These tools help you monitor system health, debug performance issues, and trace requests across the Manager and proplets.
Prometheus Metrics are always enabled at /metrics. OpenTelemetry tracing requires configuration.
| Variable | Description | Default |
|---|---|---|
| MANAGER_OTEL_URL | OpenTelemetry collector URL (OTLP/HTTP) | "" (tracing disabled) |
| MANAGER_TRACE_RATIO | Fraction of requests to trace (0.0–1.0) | 0 |
Prometheus Metrics
The Manager exposes Prometheus-compatible metrics at http://localhost:7070/metrics. No configuration needed—metrics are always available.
Available metrics include:
| Metric | Type | Description |
|---|---|---|
| manager_api_request_count | Counter | Total API requests by method and status |
| manager_api_request_latency_seconds | Histogram | Request latency distribution |
| Task state counters | Gauge | Number of tasks in each state |
Scrape configuration for Prometheus:
```yaml
scrape_configs:
  - job_name: 'propeller-manager'
    static_configs:
      - targets: ['localhost:7070']
    scrape_interval: 15s
```

OpenTelemetry Tracing
Enable distributed tracing to follow requests from the HTTP API through MQTT to proplet execution. This is invaluable for debugging slow requests, failed tasks, and message delivery issues.
Setup with Jaeger:
```bash
# Start Jaeger (all-in-one for development)
docker run -d --name jaeger \
  -p 4318:4318 \
  -p 16686:16686 \
  jaegertracing/all-in-one:latest

# Configure Manager to send traces
export MANAGER_OTEL_URL=http://localhost:4318
export MANAGER_TRACE_RATIO=1.0  # Trace all requests (development)
```

Access the Jaeger UI at http://localhost:16686 to view traces.
Trace ratio tuning:
- `0` — No tracing (default, zero overhead)
- `0.1` — Sample 10% of requests (production recommendation)
- `1.0` — Trace every request (development/debugging only)
In production, start with 0.1 and adjust based on observed overhead and debugging needs. Tracing adds latency and storage costs proportional to the sample rate.
Middleware Stack
Every API request passes through three middleware layers in order:
- Logging → Records method name, duration, and errors to structured logs
- Tracing → Adds OpenTelemetry spans (when enabled)
- Metrics → Increments counters and records latency histograms
This means even with tracing disabled, you still get logging and metrics for every request.
Federated Learning
Propeller supports federated machine learning workflows where proplets train local models and contribute updates to a central coordinator. This keeps training data on edge devices while still producing a globally optimized model.
| Variable | Description | Default |
|---|---|---|
| COORDINATOR_URL | External FL Coordinator service URL | "" (FL disabled) |
The Manager acts as a proxy between proplets and an external FL Coordinator service. The coordinator handles aggregation algorithms (FedAvg, etc.) while the Manager handles task distribution and result collection. For a complete overview of the architecture and training round lifecycle, see the Federated Learning documentation.
Enabling Federated Learning
Set COORDINATOR_URL to your FL Coordinator's base URL:
```bash
export COORDINATOR_URL=http://fl-coordinator:8080
```

The Manager then proxies FL endpoints to the coordinator. For the complete endpoint reference, see the Federated Learning API.
When FL is Disabled
When COORDINATOR_URL is not set:
- The Manager logs a warning at startup: `"COORDINATOR_URL not configured - FL features will not be available"`
- All FL endpoints return an error: `"COORDINATOR_URL must be configured"`
- Standard task, job, and workflow operations work normally
This allows you to run Propeller without FL dependencies when you only need standard compute workloads.
Task Operations
Tasks are the fundamental unit of work in Propeller. Each task represents a WebAssembly module to execute on a proplet.
For a hands-on tutorial with commands and expected outputs, see the Getting Started guide. For the complete task data model including all fields and states, see the Reference documentation.
WASM Module Deployment
Tasks execute independently on proplets—possibly on different nodes—so each task needs access to its WASM module. There are two ways to provide it:
- Upload directly: Use `PUT /tasks/{id}/upload` with a multipart form to upload a `.wasm` file. The module is stored with the task and delivered to the proplet when execution starts.
- Reference via URL: Set `image_url` when creating the task to reference a WASM module hosted in an OCI registry (Docker Hub, GHCR, private registries). The Proxy service fetches the module from the registry, chunks it for efficient transfer, and delivers it to the proplet over MQTT.
The URL approach is recommended for production deployments. It keeps your WASM binaries versioned in a registry, avoids embedding large files in API requests, and allows multiple proplets to fetch the same module without re-uploading. See the Proxy documentation for setup instructions and authentication configuration.
Manager's Role in Task Execution
When you call POST /tasks/{id}/start, the Manager performs a multi-step orchestration:
1. Dependency checking — If the task has depends_on set, the Manager verifies all dependencies exist and are completed. This ensures workflow ordering is respected before any execution begins.
2. Proplet selection — If no proplet_id is specified in the task, the Manager calls its scheduler to pick one:
```go
// From manager/service.go
p, err = svc.SelectProplet(ctx, t)
```

The default round-robin scheduler cycles through alive proplets, skipping any that haven't sent a heartbeat within the last 10 seconds.
If proplet_id IS specified, the Manager validates that proplet is alive before proceeding:
```go
// From manager/service.go
if !p.Alive {
    return fmt.Errorf("specified proplet %s is not alive", t.PropletID)
}
```

3. Task-proplet binding — The Manager records which proplet owns the task in the task_proplet repository. This mapping is used later for stop commands and result routing.
4. MQTT publish — The Manager publishes a start command to the control topic:

```
m/{domain_id}/c/{channel_id}/control/manager/start
```

The payload includes the task definition, Wasm binary location, and execution parameters.
5. State transition — The task moves from Pending → Running in the database.
6. Proplet task count — The Manager increments the selected proplet's TaskCount to track load.
Stopping Tasks
When you call POST /tasks/{id}/stop, the Manager:
- Looks up the task-proplet mapping to find which proplet is running the task
- Publishes a stop command to `m/{domain_id}/c/{channel_id}/control/manager/stop`
- Deletes the task-proplet mapping
- Decrements the proplet's `TaskCount`
The proplet receives the stop command and terminates the Wasm execution.
Result Handling
When a proplet completes a task, it publishes results to:

```
m/{domain_id}/c/{channel_id}/control/proplet/results
```

The Manager receives this message and:
- Updates the task's `Results` field with the output
- Sets `State` to `Completed` (or `Failed` if an error message is present)
- Records `FinishTime`
- Triggers the workflow coordinator (see Workflows below)
Proplet Management
Proplets are the edge runtimes that execute tasks. The Manager tracks their availability through MQTT heartbeats and manages their lifecycle.
For detailed proplet configuration and capabilities, see the Proplet documentation.
Manager's Role in Proplet Tracking
The Manager subscribes to three proplet-related MQTT topics:
| Topic | Purpose |
|---|---|
| control/proplet/create | New proplet registration |
| control/proplet/alive | Heartbeat signals |
| control/proplet/metrics | Resource utilization reports |
Proplet Registration — When a proplet starts, it publishes a discovery message:
```json
{"proplet_id": "a95517f9-5655-4cf5-a7c8-aa00290b3895"}
```

The Manager's createPropletHandler receives this and:
- Generates a human-readable name (e.g., "crimson-falcon")
- Creates a proplet record in the database
- Logs the successful registration
Heartbeat Processing — Proplets send heartbeats every few seconds. The Manager's updateLivenessHandler:
- Looks up the proplet by ID
- If not found, auto-registers it (handles restart scenarios)
- Sets `Alive = true`
- Appends the current timestamp to `AliveHistory` (kept to last 10 entries)
- Persists the updated record
Proplet Liveness
A proplet is considered alive if it sent a heartbeat within the last 10 seconds:
```go
// From pkg/proplet/proplet.go
func (p *Proplet) SetAlive() {
    if len(p.AliveHistory) > 0 {
        lastAlive := p.AliveHistory[len(p.AliveHistory)-1]
        if time.Since(lastAlive) <= aliveTimeout {
            p.Alive = true
            return
        }
    }
    p.Alive = false
}
```

The Manager recalculates liveness on every GetProplet and ListProplets call. This ensures scheduling decisions use fresh state.
Dead proplets are excluded from scheduling — The round-robin scheduler skips any proplet where Alive == false. If ALL proplets are dead, task start commands fail with ErrDeadProplers.
Listing Proplets
```bash
curl "http://localhost:7070/proplets?limit=10"
```

Your output should look like this:
```json
{
  "offset": 0,
  "limit": 10,
  "total": 1,
  "proplets": [
    {
      "id": "a95517f9-5655-4cf5-a7c8-aa00290b3895",
      "name": "crimson-falcon",
      "task_count": 2,
      "alive": true,
      "alive_at": ["2026-03-01T12:00:00Z", "2026-03-01T12:00:10Z"],
      "metadata": {
        "os": "linux",
        "hostname": "edge-node-1",
        "cpu_arch": "amd64",
        "wasm_runtime": "wasmtime"
      }
    }
  ]
}
```

Deleting Proplets
Proplets with active tasks cannot be deleted:
```go
// From manager/service.go
if p.TaskCount > 0 {
    return fmt.Errorf("%w: proplet %s has %d active tasks", pkgerrors.ErrConflict, propletID, p.TaskCount)
}
```

Stop all tasks on a proplet before removing it:
```bash
# Check proplet's task count
curl "http://localhost:7070/proplets/a95517f9-5655-4cf5-a7c8-aa00290b3895"

# Delete (only succeeds if task_count == 0)
curl -X DELETE "http://localhost:7070/proplets/a95517f9-5655-4cf5-a7c8-aa00290b3895"
```

Jobs
Jobs group multiple tasks together and execute them with a common strategy. Unlike workflows, job tasks don't have inter-task dependencies—they run based on the execution mode.
For DAG-based task ordering with dependencies, see Workflows below.
Manager's Role in Job Execution
Job Creation (POST /jobs) — The Manager's CreateJob method:
- Generates a job ID (or uses one provided in the request)
- Assigns all tasks to the same `job_id`
- Validates the DAG structure (even for jobs, cycles are rejected)
- Stores the job with its `execution_mode`
- Persists each task individually
```bash
curl -X POST "http://localhost:7070/jobs" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "batch-process",
    "execution_mode": "sequential",
    "tasks": [
      {"name": "step-1", "image_url": "docker.io/myorg/step1:v1"},
      {"name": "step-2", "image_url": "docker.io/myorg/step2:v1"},
      {"name": "step-3", "image_url": "docker.io/myorg/step3:v1"}
    ]
  }'
```

Your output should look like this:
```json
{
  "job_id": "b2c3d4e5-f6a7-8901-bcde-f23456789012",
  "tasks": [
    {
      "id": "c3d4e5f6-a7b8-9012-cdef-345678901234",
      "name": "step-1",
      "state": 0,
      "image_url": "docker.io/myorg/step1:v1",
      "cli_args": null,
      "daemon": false,
      "encrypted": false,
      "job_id": "b2c3d4e5-f6a7-8901-bcde-f23456789012",
      "start_time": "0001-01-01T00:00:00Z",
      "finish_time": "0001-01-01T00:00:00Z",
      "created_at": "2026-03-17T10:00:00Z",
      "updated_at": "2026-03-17T10:00:00Z",
      "next_run": "0001-01-01T00:00:00Z"
    }
  ]
}
```

Execution Modes
For execution mode options (parallel, sequential, configurable) and their behavior, see Task Scheduling: Jobs.
When a task completes, the Manager checks if it belongs to a job. For sequential jobs, it automatically starts the next task in order.
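The "start the next task in order" step for sequential jobs can be sketched as a scan for the first pending task. This is an illustration of the documented behavior, not the Manager's actual code (which lives in manager/service.go), and the `jobTask` type here is hypothetical:

```go
package main

import "fmt"

type state int

const (
	pending state = iota
	running
	completed
	failed
)

// jobTask is a hypothetical, minimal stand-in for a task in a job.
type jobTask struct {
	name  string
	state state
}

// nextSequential returns the first pending task after the completed
// prefix — a sketch of how a sequential job advances when a task
// finishes. Anything running or failed blocks further starts.
func nextSequential(tasks []jobTask) (string, bool) {
	for _, t := range tasks {
		switch t.state {
		case completed:
			continue // done, look further
		case pending:
			return t.name, true // this one starts next
		default:
			return "", false // running or failed: nothing to start yet
		}
	}
	return "", false // all tasks finished
}

func main() {
	tasks := []jobTask{{"step-1", completed}, {"step-2", pending}, {"step-3", pending}}
	next, ok := nextSequential(tasks)
	fmt.Println(next, ok)
}
```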
Failure handling — If ANY task in a job fails, the Manager stops all remaining running tasks:
```go
// From manager/service.go
if t.State == task.Failed {
    svc.logger.InfoContext(ctx, "task failed, stopping remaining job tasks", "job_id", t.JobID, "task_id", taskID)
    svc.stopJobTasks(ctx, jobTasks)
}
```

Starting and Stopping Jobs
```bash
# Start all tasks according to execution mode
curl -X POST "http://localhost:7070/jobs/{job_id}/start"

# Stop all running tasks in the job
curl -X POST "http://localhost:7070/jobs/{job_id}/stop"
```

Stop behavior — StopJob iterates through all job tasks and calls StopTask on each, publishing stop commands to the respective proplets.
Workflows
Workflows use DAG-based ordering to express dependencies between tasks. A task only starts when all its dependencies have completed.
For detailed workflow patterns, conditional execution, and examples, see Task Scheduling.
Manager's Role in Workflow Orchestration
The Manager uses a WorkflowCoordinator component to handle DAG-based execution:
```go
// From manager/workflow.go
type WorkflowCoordinator struct {
    taskRepo storage.TaskRepository
    service  Service
    logger   *slog.Logger
}
```

Workflow Creation (POST /workflows) — The Manager's CreateWorkflow method:
- Generates a shared `workflow_id` for all tasks
- Validates all `depends_on` references exist within the workflow
- Runs DAG validation to detect cycles
- Validates `run_if` values (`success`, `failure`, or empty)
- Persists all tasks atomically (rollback on failure)
```bash
curl -X POST "http://localhost:7070/workflows" \
  -H "Content-Type: application/json" \
  -d '{
    "tasks": [
      {"name": "fetch-data", "image_url": "docker.io/myorg/fetch:v1"},
      {
        "name": "process",
        "image_url": "docker.io/myorg/process:v1",
        "depends_on": ["<fetch-data-task-id>"],
        "run_if": "success"
      }
    ]
  }'
```

DAG Validation — Before persisting, the Manager validates the task graph:
```go
// From manager/service.go
if err := dag.ValidateDependenciesExist(tasks); err != nil {
    return nil, fmt.Errorf("dependency validation failed: %w", err)
}
if err := dag.ValidateDAG(tasks); err != nil {
    return nil, fmt.Errorf("DAG validation failed: %w", err)
}
```

Invalid workflows are rejected immediately with a descriptive error.
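Cycle detection of this kind is typically done with Kahn's algorithm (topological sort): if not every task can be ordered, the graph has a cycle. The sketch below illustrates the kind of check `dag.ValidateDAG` performs — Propeller's actual implementation may differ:

```go
package main

import "fmt"

// hasCycle runs Kahn's algorithm over task dependencies.
// deps maps task ID → the IDs it depends on.
// Illustrative sketch; not Propeller's dag package code.
func hasCycle(deps map[string][]string) bool {
	indegree := map[string]int{}     // number of unmet dependencies per task
	dependents := map[string][]string{} // reverse edges: dep → tasks waiting on it
	for id, ds := range deps {
		if _, ok := indegree[id]; !ok {
			indegree[id] = 0
		}
		for _, d := range ds {
			indegree[id]++
			dependents[d] = append(dependents[d], id)
			if _, ok := indegree[d]; !ok {
				indegree[d] = 0
			}
		}
	}
	// Start from tasks with no dependencies.
	queue := []string{}
	for id, n := range indegree {
		if n == 0 {
			queue = append(queue, id)
		}
	}
	visited := 0
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		visited++
		for _, dep := range dependents[id] {
			indegree[dep]--
			if indegree[dep] == 0 {
				queue = append(queue, dep)
			}
		}
	}
	// If some tasks could never be ordered, a cycle exists.
	return visited != len(indegree)
}

func main() {
	valid := map[string][]string{"process": {"fetch-data"}, "fetch-data": nil}
	cyclic := map[string][]string{"a": {"b"}, "b": {"a"}}
	fmt.Println(hasCycle(valid), hasCycle(cyclic)) // false true
}
```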
Workflow Coordinator
When any task completes, the Manager calls OnTaskCompletion:
```go
// From manager/workflow.go
func (wc *WorkflowCoordinator) OnTaskCompletion(ctx context.Context, taskID string) error {
    t, err := wc.service.GetTask(ctx, taskID)
    if err != nil {
        return fmt.Errorf("failed to get completed task: %w", err)
    }
    if t.WorkflowID == "" {
        return nil
    }
    return wc.CheckAndStartReadyTasks(ctx, t.WorkflowID)
}
```

CheckAndStartReadyTasks scans the workflow for tasks that:
- Have all dependencies in terminal state (Completed, Failed, or Skipped)
- Are not already Running or Scheduled
- Pass conditional execution evaluation
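The readiness scan can be sketched as follows. This is an illustration of the first two criteria only (the `run_if` evaluation is shown in the next section); the `wfTask` type and state names are simplified stand-ins for Propeller's task model:

```go
package main

import "fmt"

type taskState string

const (
	statePending   taskState = "Pending"
	stateRunning   taskState = "Running"
	stateCompleted taskState = "Completed"
	stateFailed    taskState = "Failed"
	stateSkipped   taskState = "Skipped"
)

// wfTask is a simplified stand-in for a workflow task.
type wfTask struct {
	id        string
	state     taskState
	dependsOn []string
}

func terminal(s taskState) bool {
	return s == stateCompleted || s == stateFailed || s == stateSkipped
}

// readyTasks returns pending tasks whose dependencies have all reached
// a terminal state — a sketch of the scan CheckAndStartReadyTasks
// performs (conditional run_if evaluation omitted here).
func readyTasks(tasks []wfTask) []string {
	states := map[string]taskState{}
	for _, t := range tasks {
		states[t.id] = t.state
	}
	var ready []string
	for _, t := range tasks {
		if t.state != statePending {
			continue // already running, scheduled, or finished
		}
		ok := true
		for _, dep := range t.dependsOn {
			if !terminal(states[dep]) {
				ok = false
				break
			}
		}
		if ok {
			ready = append(ready, t.id)
		}
	}
	return ready
}

func main() {
	tasks := []wfTask{
		{id: "fetch", state: stateCompleted},
		{id: "process", state: statePending, dependsOn: []string{"fetch"}},
		{id: "report", state: statePending, dependsOn: []string{"process"}},
	}
	fmt.Println(readyTasks(tasks)) // only "process" is ready
}
```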
Conditional Execution
The run_if field controls when a task should execute based on dependency outcomes:
| Value | Manager Behavior |
|---|---|
| success (default) | Start only if ALL dependencies completed successfully |
| failure | Start only if ANY dependency failed |
```go
// From manager/workflow.go
func (wc *WorkflowCoordinator) EvaluateConditionalExecution(ctx context.Context, t task.Task, parentStates map[string]task.State) bool {
    if len(t.DependsOn) == 0 {
        return true
    }
    runIf := t.RunIf
    if runIf == "" {
        runIf = task.RunIfSuccess
    }
    switch runIf {
    case task.RunIfSuccess:
        for _, depID := range t.DependsOn {
            state, exists := parentStates[depID]
            if !exists || state != task.Completed {
                return false
            }
        }
        return true
    case task.RunIfFailure:
        for _, depID := range t.DependsOn {
            state, exists := parentStates[depID]
            if exists && state == task.Failed {
                return true
            }
        }
        return false
    default:
        wc.logger.WarnContext(ctx, "invalid run_if value", "task_id", t.ID, "run_if", runIf)
        return false
    }
}
```

Skipped tasks — If conditional execution returns false, the task is marked Skipped rather than started. This allows downstream tasks to evaluate their own conditions.
Workflow Execution Flow
Workflow execution begins when you submit a workflow via POST /workflows. The Manager validates the DAG structure, checks that all depends_on references exist, and creates all tasks in a Pending state. At this point, no tasks are running—the workflow is ready but waiting.
To start execution, you call POST /tasks/{id}/start on one or more root tasks (tasks with no dependencies). The Manager assigns each root task to a proplet and publishes start commands over MQTT. When a root task completes, the proplet publishes results back to the Manager, which triggers OnTaskCompletion.
The workflow coordinator then takes over. It calls CheckAndStartReadyTasks, which scans the workflow for tasks whose dependencies have all reached terminal states. For each eligible task, the coordinator evaluates conditional execution rules (run_if) and either starts the task or marks it as Skipped. This process repeats automatically—each task completion triggers another round of evaluation—until all workflow tasks have reached a terminal state.
You only need to start the root task(s); the coordinator handles everything else.
Scheduling
The Manager handles two types of scheduling: one-time proplet selection when starting tasks, and recurring execution via the built-in cron scheduler.
Proplet Selection (Round-Robin)
When a task starts without a specified proplet_id, the Manager's scheduler selects one using a round-robin algorithm implemented in pkg/scheduler/roundrobin.go.
Key behaviors:
- Cycles through proplets in order, wrapping around
- Skips dead proplets (no heartbeat in last 10 seconds)
- Fails immediately if no proplets exist or all are dead
- Does NOT consider current load (`TaskCount`)—simple round-robin only
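The behaviors above can be sketched with a small stateful selector. This illustrates the documented round-robin semantics; it is not the code in pkg/scheduler/roundrobin.go, and the `proplet` type and error name here are simplified stand-ins:

```go
package main

import (
	"errors"
	"fmt"
)

// proplet is a minimal stand-in for a registered proplet.
type proplet struct {
	id    string
	alive bool
}

var errNoAliveProplets = errors.New("no alive proplets available")

// roundRobin remembers where the last pick left off and cycles from
// there, skipping dead proplets — a sketch of the documented behavior.
type roundRobin struct{ next int }

func (rr *roundRobin) selectProplet(proplets []proplet) (proplet, error) {
	for i := 0; i < len(proplets); i++ {
		p := proplets[(rr.next+i)%len(proplets)]
		if p.alive {
			rr.next = (rr.next + i + 1) % len(proplets) // resume after the pick
			return p, nil
		}
	}
	// No proplets exist, or none are alive: fail immediately.
	return proplet{}, errNoAliveProplets
}

func main() {
	rr := &roundRobin{}
	ps := []proplet{{"p1", true}, {"p2", false}, {"p3", true}}
	for i := 0; i < 3; i++ {
		p, _ := rr.selectProplet(ps)
		fmt.Println(p.id) // p2 is skipped while it has no recent heartbeat
	}
}
```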
Cron Scheduler and Priority
For recurring task execution with cron schedules and priority-based dispatch ordering, see Task Scheduling.
The Manager's cron loop calls StartTask for due tasks, which then goes through the same proplet selection process described above.
Data Flow
Task Execution Flow
A task's complete lifecycle runs from creation to completion. The flow begins when a client sends a POST /tasks request, which creates the task in a Pending state within the Manager's database. When the client subsequently calls POST /tasks/{id}/start, the Manager selects an available proplet using its round-robin scheduler and publishes a start command over MQTT.
The proplet receives this command on the control/manager/start topic, downloads the Wasm binary, and begins execution. As the Wasm module runs, it produces results that the proplet collects. Upon completion, the proplet publishes these results back to the Manager via the control/proplet/results topic. The Manager then updates the task's state to either Completed or Failed based on whether an error was present in the results.
Proplet Registration Flow
Proplets join the system and maintain their availability status as follows. When a proplet starts, it immediately publishes a discovery message to the control/proplet/create MQTT topic containing its unique identifier. The Manager receives this message, generates a human-readable name for the proplet, and creates a database record.
After registration, the proplet enters a continuous heartbeat loop, sending alive signals every few seconds to control/proplet/alive. The Manager tracks these heartbeats in the proplet's AliveHistory array, keeping the most recent 10 timestamps. A proplet is considered alive—and therefore eligible for task scheduling—only if its last heartbeat arrived within 10 seconds. Dead proplets are automatically excluded from round-robin selection until they resume sending heartbeats.
API Reference
For detailed request/response schemas and examples, see the API Reference.