
NoETL Distributed Runtime + Event-Sourced Shared Memory Spec

Status: staged implementation in progress.

Owner: NoETL core (repos/noetl).

Implementation tracker:

  • Umbrella: noetl/noetl#434.

  • Phase 0 PR: noetl/noetl#435.

  • Phase 1 frame execution: noetl/noetl#436.

  • Projectors: noetl/noetl#437.

  • Shared payload cache / Arrow IPC hints: noetl/noetl#438.

  • Event/projection store ports: noetl/noetl#439.

  • Replay regression / serialization gates: noetl/noetl#440.

  • Docs alignment: noetl/noetl#441.

  • Batch event mirror: noetl/noetl#460.

  • Remaining event append mirror audit: noetl/noetl#461.

  • Transactional outbox: noetl/noetl#464.

Companion specs that this revision builds on (do not re-spec):

  • noetl_async_sharded_architecture.md — sharded projection workers, epoch barriers.

  • noetl_cursor_loop_design.md — cursor-driven worker loops (already in production for PFT v2).

  • noetl_data_plane_architecture.md — control/data plane split, reference-only event envelopes, TempStore tiers.

  • noetl_distributed_processing_plan.md — phased rollout: P0 shipped, P1–P3 partial.

  • noetl_pft_performance_optimization.md — PFT-specific tuning.

  • distributed_fanout_mode_spec.md — designed but not shipped; this spec absorbs it.

  • nats_kv_distributed_cache.md — NATS K/V scope.

Not in scope here: DSL surface changes or the playbook authoring guide. Event-sourcing invariants are in scope because they are the contract that lets the distributed runtime reproduce system state at a requested point in time.

Latest implementation checkpoint:

  • Local kind full PFT v2 execution 629193644754337792 completed on 2026-05-18 in 487.855s (~8m08s) using frame.process: frame, per-stage frame-stream lock ordering, and the full frame lifecycle (frame.dispatched -> frame.started -> frame.committed).
  • The previous local kind default-row-frame baseline was 1880.739s (~31m21s); the row-concurrency experiment was 2002.841s (~33m23s).
  • The full lock-order validation produced 5,496 committed frames, 49.94 average rows/frame, 50 max rows/frame, and 5,496/5,496 frames with claim, start, and terminal event links.
  • Validation passed for all 10 facilities: all patient domains were 1000/1000 and MDS was 224,443/224,443.
  • The canonical PFT v2 fixture shape remains preserved for regression coverage. Page-continuation fidelity is covered by a companion fixture, fixtures/playbooks/pft_flow_test/test_pft_page_continuation.yaml, which follows paging.hasMore with one HTTP request per patient/data-type/page record instead of changing the canonical benchmark.
  • 2026-05-18 hardening update: noetl/noetl#453 removes the remaining lazy frame-mint advisory lock from the cursor claim hot path by using an indexed stage/worker-slot/frame-index claim key with INSERT ... ON CONFLICT DO NOTHING; noetl/noetl#454 fixes execution-list status flicker when a late non-terminal projection event follows an already-terminal noetl.execution.status.
  • 2026-05-18 serialization gate update: noetl/noetl#456 rejects frame commit output_ref envelopes that advertise an IPC/shared-memory hint without any durable ref, uri, or locator, preserving replay authority after cache GC.
  • 2026-05-18 event mirror update: noetl/noetl#460 wires the async batch-ingestion path into the same opt-in event mirror used by direct /api/events and frame lifecycle routes. Batch item events, batch.accepted, batch status events, and batch-generated command.issued envelopes now carry command, stage, frame, and parent lineage into the distribution stream after the canonical database commit.
  • 2026-05-18 execution-route mirror update: noetl/noetl#462 mirrors /api/execute generated command.issued envelopes and /api/executions/{execution_id}/cancel execution.cancelled events after commit. Remaining direct event append paths are tracked by noetl/noetl#461.
  • 2026-05-18 outbox pivot: noetl/noetl#464 adds noetl.outbox as the preferred distribution bridge. Event writers should enqueue mirrored envelopes in the same transaction as noetl.event; a publisher drains committed outbox rows to NATS. The table stores JSONB for operator visibility plus pre-encoded payload bytes and a codec field so the drain path can publish bytes without reserializing JSON when the subject is known.
  • 2026-05-18 core event outbox update: noetl/noetl#466 moves direct /api/events lifecycle envelopes and generated command.issued envelopes onto the same-transaction outbox path. The route keeps HTTP semantics unchanged, but distribution intent is now durable before any publish attempt.
  • 2026-05-18 batch event outbox update: noetl/noetl#467 applies the same boundary to /api/events/batch: item events, batch.accepted, batch status events, and batch-generated command.issued chunks enqueue to noetl.outbox before commit, then drain committed rows.
  • 2026-05-18 execution event outbox update: noetl/noetl#468 moves /api/execute initial command.issued envelopes and /api/executions/{execution_id}/cancel execution.cancelled envelopes to same-transaction outbox enqueue.
  • 2026-05-18 auto-resume outbox update: noetl/noetl#469 moves recovery-generated execution.cancelled envelopes to the same-transaction outbox path.
  • 2026-05-18 broker event outbox update: noetl/noetl#470 moves generic broker EventService.emit_event appends to same-transaction outbox enqueue.
  • 2026-05-19 command-claim outbox update: noetl/noetl#471 moves command.claimed worker lifecycle events to same-transaction outbox enqueue.
  • 2026-05-19 event-store outbox update: noetl/noetl#472 moves backend-neutral PostgresEventStore.append writes to same-transaction outbox enqueue.
  • 2026-05-19 DSL executor outbox update: noetl/noetl#473 moves engine-owned lifecycle, stage.opened, and stage.closed events to same-transaction outbox enqueue with explicit caller-owned transaction handling.
  • 2026-05-19 projector codec update: noetl/noetl#479 lets projector NATS consumers decode both JSON envelopes and Arrow Feather payloads, so noetl.outbox can keep the low-serialization bytes path without making projector workers JSON-only.
  • 2026-05-19 outbox publisher update: noetl/noetl#480 adds python -m noetl.outbox as a standalone publisher loop. Route-local drains remain a latency optimization, while the worker gives failed or deferred outbox rows a traffic-independent retry path.
  • 2026-05-20 autoscaling surface update: noetl/ops#107 adds an optional KEDA Prometheus scaler for noetl-worker that reads the NoETL runtime backlog metric noetl_frame_backlog_total. The scaler consumes the NoETL metric contract, not the current storage implementation, so the default chart still renders the fixed replica count / existing HPA behavior unless explicitly enabled.
  • 2026-05-20 tenant backlog metric update: noetl/noetl#491 adds noetl_frame_backlog_detail_total with tenant, organization, stage-kind, and status labels. KEDA keeps using the global noetl_frame_backlog_total series, while policy engines and dashboards can inspect tenant/org pressure without coupling to a storage table.
  • 2026-05-20 topology label update: noetl/noetl#492 adds cluster_id, region, and zone labels to worker-local metrics when the environment provides them; noetl/ops#108 exposes matching Helm values. This gives operators a locality signal for IPC/cache hit analysis before scheduler claim hints become enforceable.
  • 2026-05-20 topology helper update: noetl/noetl#494 centralizes worker locality extraction, canonical worker-locator construction, and locality-distance comparison (node, zone, region, cluster, any) in noetl.core.runtime.topology, so worker metrics, frame claims, frame-dispatch metadata, and scheduler enforcement use the same identity rules.
  • 2026-05-20 placement evaluation update: noetl/noetl#495 records source_locality and placement evaluation (distance, max_distance, within_max_distance) on frame.dispatched metadata when a frame cursor carries source_locality / producer_locality. This makes locality decisions replayable before frame selection is changed.
  • 2026-05-20 runtime-event topology update: noetl/noetl#496 enriches worker-reported runtime events with meta.locality and a canonical meta.worker_locator from the shared topology helper. Explicit caller-provided locality or locator metadata remains authoritative, so event replay can distinguish observed worker identity from producer-owned placement context.
  • 2026-05-20 command-claim topology update: noetl/noetl#497 lets command claim requests carry optional worker locality and records locality, worker_locator, source_locality, and placement evaluation on command.claimed metadata. Existing clients remain compatible, while topology-aware workers make command ownership replayable by placement distance.
  • 2026-05-20 replay command projection update: noetl/noetl#498 folds command.issued, command.claimed, command.started, and terminal command events into the deterministic replay state. The reconstructed command view now carries stage/frame linkage, parent command id, worker id, locality, worker_locator, source locality, and placement evaluation, closing the command-ownership gap in point-in-time replay.
  • 2026-05-20 business-object replay update: noetl/noetl#499 adds a generic business_objects replay projection for events that explicitly identify a domain object through aggregate_type=business_object or business-object metadata. The fold records object identity, status, version, event lineage, attributes, and payload references without introducing storage-specific logic.
  • 2026-05-20 command replay parity update: noetl/noetl#500 adds normalized live/replayed command projection rows and command_projection_checksum, so release gates can compare command ownership, lifecycle event links, worker topology, and placement metadata the same way frame replay parity compares frame rows.
  • 2026-05-20 business-object replay parity update: noetl/noetl#501 adds normalized live/replayed business-object projection rows and business_object_projection_checksum, covering object identity, lifecycle/event lineage, payload summaries, and folded attributes.
  • 2026-05-20 loop replay parity update: noetl/noetl#502 adds normalized live/replayed loop projection rows and loop_projection_checksum, covering loop totals, done/failed counters, completion state, and last event linkage.
  • 2026-05-20 execution replay parity update: noetl/noetl#503 adds normalized live/replayed execution projection rows and execution_projection_checksum, covering tenant/org scope, execution status, last node/event, event count, payload-reference summary, projection name, and upcaster registry digest.
  • 2026-05-20 stage replay parity update: noetl/noetl#504 adds normalized live/replayed stage projection rows and stage_projection_checksum, covering stage status, kind, parent/loop linkage, open/close event ids, frame/row/emitted/failed counters, and last event linkage.
  • 2026-05-20 replay checksum bundle update: noetl/noetl#505 adds replay_projection_checksum_bundle(state) and returns it as projection_checksums from replay state responses. Replay-state projector records also copy the bundle into metadata for cheap checkpoint inspection. The single deterministic checksum map covers execution, stages, frames, commands, business objects, and loops, keeping release gates and operators on one replay parity contract while each surface remains independently comparable. The same PR adds storage-neutral live-bundle and parity-report helpers plus an offline JSON comparator for future adapters and GKE validation, without depending on database routines.
  • 2026-05-20 replay event-reader port update: noetl/noetl#506 introduces ReplayEventReader as the storage-neutral read port for canonical replay inputs. The current SQL reads move behind PostgresReplayEventReader, so ReplayService folds events and snapshots through configurable or per-call adapters instead of owning storage-specific routines.
  • 2026-05-20 replay payload resolver update: noetl/noetl#507 introduces ReplayPayloadResolver as the storage-neutral read port for immutable replay payload references. Replay state remains cheap by default; callers opt in with resolve_payloads=true to receive bounded checksum/shape summaries rather than payload bodies. Duplicate payload references are resolved once per replay call and reported for every lineage occurrence, and payload_resolution_summary gives total/resolved/unresolved/unique-ref counts plus a deterministic checksum.
  • 2026-05-20 replay payload-resolution gate update: noetl/noetl#508 adds scripts/check_replay_payload_resolution_report.py, an offline validator for replay-state JSON that recomputes payload_resolution_summary, rejects summary checksum drift, and fails when any payload reference is unresolved.
  • 2026-05-20 replay state report fetcher update: noetl/noetl#509 adds scripts/fetch_replay_state_report.py, a small API fetcher for /api/replay/state JSON with tenant/org scope, cutoffs, and optional resolve_payloads=true, so local kind and GKE validation can produce the report consumed by offline parity/payload gates.
  • 2026-05-20 replay validation runner update: noetl/noetl#510 adds scripts/run_replay_validation.py, a single local-kind/GKE smoke command that fetches replay state and runs the offline projection parity and payload-resolution gates, emitting a JSON run report with per-step command output.
  • 2026-05-20 replay upcaster registry injection update: noetl/noetl#511 makes ReplayService use a configurable or per-call EventUpcasterRegistry instead of hard-coding the process default. Validation tools and future storage adapters can now replay with an explicit schema registry and compare its digest without changing endpoint behavior.
  • 2026-05-20 replay time-cutoff snapshot update: noetl/noetl#512 lets the reference replay reader use replay-state snapshots for as_of_time requests when the snapshot version is at or before the resolved cutoff event id, avoiding unnecessary full-history folds while keeping the ReplayEventReader port storage-neutral.
  • 2026-05-20 replay snapshot registry guard update: noetl/noetl#513 makes ReplayService ignore snapshot seeds whose recorded upcaster registry digest does not match the registry used for the current fold. Snapshots remain accelerators only; mismatched schema reducers fall back to canonical events.
  • 2026-05-20 replay state-integrity gate update: noetl/noetl#514 adds scripts/check_replay_state_report.py and wires it into scripts/run_replay_validation.py, so smoke runs validate replay JSON structure, projection checksum drift, and snapshot metadata before comparing against live projection checksums.
  • 2026-05-20 replay state-checksum gate update: noetl/noetl#515 extends the state-integrity gate to recompute the replay state's top-level checksum, catching mutated state reports before projection parity or payload gates run.
  • 2026-05-20 replay parity checksum-shape update: noetl/noetl#516 makes scripts/check_replay_parity_report.py reject replay/live checksum bundles with non-lowercase-SHA-256 values by default, while keeping an explicit legacy escape hatch for ad-hoc reports.
  • 2026-05-20 replay state-envelope gate update: noetl/noetl#517 makes scripts/check_replay_state_report.py require canonical replay-state envelope fields and checksum_algorithm=sha256, so incomplete state reports fail before parity comparison.
  • 2026-05-20 replay payload checksum-shape update: noetl/noetl#518 makes scripts/check_replay_payload_resolution_report.py reject resolved payload checksums that are not lowercase SHA-256 hex digests, with ref/scope diagnostics for the failing payload.
  • 2026-05-20 replay registry digest-shape update: noetl/noetl#519 makes scripts/check_replay_state_report.py require upcaster_registry_digest and snapshot registry digests to be lowercase SHA-256 hex values before compatibility checks run.
  • 2026-05-20 replay state type/range gate update: noetl/noetl#520 makes scripts/check_replay_state_report.py validate canonical replay envelope string fields, non-negative integer fields, and snapshot version typing before checksum recomputation. Malformed reports now fail as structured validation output instead of crashing the gate.
  • 2026-05-20 replay payload row-shape gate update: noetl/noetl#521 makes scripts/check_replay_payload_resolution_report.py reject non-object payload_resolution rows and non-object row resolution values before checksum and summary validation, so malformed payload evidence cannot be silently filtered out.
  • 2026-05-20 replay strict integer-shape update: noetl/noetl#522 makes scripts/check_replay_state_report.py require replay state counters, event ids, and snapshot versions to be JSON integers instead of coercible strings, keeping the serialized replay contract independent of Python-specific type coercion.
  • 2026-05-20 replay parity required-surface update: noetl/noetl#523 makes scripts/check_replay_parity_report.py require the complete canonical projection checksum bundle (execution, stages, frames, commands, business_objects, loops) on both replayed and live inputs, and reject unknown extra surfaces.
  • 2026-05-20 replay state projection-surface update: noetl/noetl#524 makes scripts/check_replay_state_report.py require the same complete canonical projection_checksums surface set and reject unknown extra surfaces before parity or payload validation runs.
  • 2026-05-20 replay payload summary-shape update: noetl/noetl#525 makes scripts/check_replay_payload_resolution_report.py validate supplied payload_resolution_summary objects before checksum comparison, requiring the exact summary field set, strict non-negative integer counters, boolean all_resolved, and lowercase SHA-256 checksum shape.
  • 2026-05-20 replay validation manifest phase update: noetl/noetl#526 upgrades scripts/run_replay_validation.py from a gate launcher into a reproducible validation manifest writer. Runs now include config, UTC timestamps, per-step duration, parsed JSON stdout when available, optional --report-output, runner-level cutoff validation, and fetch-artifact existence checks; scripts/fetch_replay_state_report.py also rejects non-object replay JSON before writing gate input.
  • 2026-05-20 replay live checksum artifact phase update: noetl/noetl#527 adds scripts/build_live_projection_checksums.py so any storage adapter can export live projection rows as JSON and produce the canonical live checksum bundle. scripts/run_replay_validation.py now accepts --live-rows, builds live-checksums-<execution_id>.json, and runs parity from that artifact while rejecting simultaneous --live-checksums / --live-rows inputs.
  • 2026-05-20 replay validation manifest gate phase update: noetl/noetl#528 adds scripts/check_replay_validation_manifest.py so local-kind/GKE validation manifests are independently gateable. The manifest checker validates config shape, timestamps, required step order, step return codes/durations, optional artifact existence, and mutually exclusive live parity inputs; the replay release gate now covers this checker.
  • 2026-05-20 replay live-row export adapter phase update: noetl/noetl#529 adds scripts/export_live_projection_rows_postgres.py as the current reference adapter for producing the storage-neutral live-row JSON artifact from deployed clusters. The exporter uses plain reads against the current projection tables and replay-state projection record; the validation contract remains the JSON row artifact consumed by scripts/build_live_projection_checksums.py, so future storage engines can replace the adapter without changing replay parity gates.
  • 2026-05-20 replay live-row artifact gate phase update: noetl/noetl#530 makes the exported live-row evidence self-describing and independently checkable. Live-row artifacts now carry schema_version, exported_at, per-surface row_counts, and a canonical rows_checksum; scripts/check_live_projection_rows.py validates those fields before checksum generation, and scripts/run_replay_validation.py inserts a live_rows_integrity step before live_checksums.
  • 2026-05-20 replay validation artifact-index phase update: noetl/noetl#531 adds scripts/package_replay_validation_artifacts.py and run_replay_validation.py --artifact-index-output ... so local-kind/GKE evidence can be shipped as one storage-neutral index. The index records each validation artifact role, path, existence, size, and SHA-256 digest, and can be checked later for missing files or digest drift.
  • 2026-05-20 replay artifact-index manifest finalization update: noetl/noetl#532 makes run_replay_validation.py --artifact-index-output ... record the artifact_index step in the validation manifest before packaging. The index therefore hashes the final manifest content, including the artifact-index step and path, instead of an intermediate manifest.
  • 2026-05-20 replay artifact-index role gate update: noetl/noetl#533 makes scripts/package_replay_validation_artifacts.py --check validate the evidence role contract in addition to file existence, size, and SHA-256 drift. A portable validation bundle must contain exactly one manifest, replay, and report artifact, and live-row evidence must be paired with the derived live-checksum artifact when either role is present. scripts/check_replay_validation_manifest.py --check-artifacts now validates the referenced artifact index, rejects indexes that point at a different manifest, and requires the artifact_index step to be present and final when artifacts.artifact_index is set, so the manifest gate verifies the complete shipped evidence bundle.
  • 2026-05-20 replay artifact-index portability update: noetl/noetl#534 stores artifact-index paths relative to the artifact-index directory when possible and resolves relative paths from that directory during --check. The checker also validates the top-level path_base marker and manifest pointer. Validation bundles can now be copied and rechecked from a different working directory without rewriting paths; external artifacts outside the bundle remain absolute.
  • 2026-05-20 replay validation bundle checker update: noetl/noetl#535 adds scripts/check_replay_validation_bundle.py as the operator entrypoint for shipped local-kind/GKE evidence. It runs the manifest gate with artifact checks, validates the referenced artifact index, and rejects mismatches between an explicit --artifact-index argument and the manifest's own index reference.
  • 2026-05-20 projector shard-skip metric update: noetl/noetl#536 splits projector skip telemetry into noetl_projector_events_unowned_total for healthy other-shard traffic and noetl_projector_events_unshardable_total for events missing or carrying invalid shard keys. The existing empty/unowned notification counter remains for compatibility.
  • 2026-05-20 projector stale-write metric update: noetl/noetl#537 adds noetl_projector_projection_stale_records_total for projection writes skipped by version monotonicity. The counter is based on attempted tenant/org/execution projection groups, not raw events, so same-execution multi-event batches are not reported as stale.
  • 2026-05-20 projector shard settings validation update: noetl/noetl#538 makes projector workers fail fast when shard_id resolves to an index outside shard_count or runtime limits are non-positive. Bad StatefulSet/shard-count wiring now exits at startup instead of running a shard that owns no events.
  • 2026-05-20 projector strict settings loader update: noetl/noetl#539 removes env/CLI clamping for projector shard and runtime limits. Invalid values now flow into the same startup validation path instead of being silently repaired into defaults.
  • 2026-05-20 projector metrics identity label update: noetl/noetl#540 adds stable shard_index, shard_count, stream, and subject labels to projector metrics, alongside shard_id and consumer, so dashboards can verify the live shard topology they are scraping.
  • 2026-05-20 projector decode-error metric update: noetl/noetl#541 adds noetl_projector_decode_errors_total for projector notification payload decode failures. Malformed JSON/Arrow payloads are now visible in projector metrics before the subscriber NAKs the message.
  • 2026-05-20 projector projection-error metric update: noetl/noetl#542 adds noetl_projector_projection_errors_total for failures inside the projection callback while keeping noetl_projector_errors_total as the aggregate compatibility counter.
  • 2026-05-20 projector aggregate error metric update: noetl/noetl#543 makes noetl_projector_errors_total count both decode failures and projection callback failures, while the split counters continue to identify where the failure occurred.
  • 2026-05-20 projector redelivery metric update: noetl/noetl#544 adds subscriber action observation and projector counters for noetl_projector_redelivery_requests_total plus noetl_projector_delayed_redelivery_requests_total, making NAK/retry intent visible before JetStream redelivery.
  • 2026-05-20 projector terminal action metric update: noetl/noetl#545 adds projector ACK and TERM counters from the same subscriber action observer: noetl_projector_acknowledged_notifications_total and noetl_projector_terminated_notifications_total.
  • 2026-05-20 projector action/batch metric bundle update: noetl/noetl#546 adds terminal-action recency gauges, last redelivery delay, last-batch shape gauges for extracted/unowned/unshardable/stale counts, and an action-summary helper with ACK/redelivery/termination ratios.
  • 2026-05-20 projector metrics summary helper bundle: noetl/noetl#547 adds in-process summaries for last-batch shape, decode/projection error split, and the combined projector runtime state. Operators and validation tools can consume the same stable ratio payloads without scraping Prometheus text.
  • 2026-05-20 projector metrics summary endpoint bundle: noetl/noetl#548 exposes the summary payload at /summary on the projector metrics server, adds scripts/check_projector_metrics_summary.py, and wires that checker into the replay release gate. Phase 2 validation can now capture scrape-independent projector state evidence from live kind/GKE runs.
  • 2026-05-20 Phase 2 projector evidence bundle: noetl/noetl#549 adds scripts/fetch_projector_metrics_summary.py, extends scripts/run_replay_validation.py with projector summary artifacts, validates those artifacts in replay manifests, and adds scripts/check_projector_phase2_evidence.py as the phase gate for live projector evidence plus optional projection parity.
  • 2026-05-20 Phase 2 projector validation runner bundle: noetl/noetl#550 adds scripts/run_projector_phase2_validation.py as the operator entrypoint for replay + projector-summary evidence, lets bundle checks require Phase 2 projector evidence, and adds scripts/render_projector_phase2_command.py for reproducible StatefulSet projector validation commands.
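
The outbox entries above (noetl/noetl#464 onward) all apply one pattern: the event row and its distribution intent commit in the same transaction, and a separate drain step publishes committed rows. The following is a minimal sketch of that pattern, not the NoETL implementation. It uses SQLite in place of Postgres, and the table layout, column names, subject string, and the `append_event` / `drain_outbox` helpers are all assumptions made for illustration.

```python
import json
import sqlite3

# Hypothetical, simplified schema. The real noetl.event and noetl.outbox
# tables are not shown in this document.
SCHEMA = """
CREATE TABLE event (
    event_id   INTEGER PRIMARY KEY,
    event_type TEXT NOT NULL,
    body       TEXT NOT NULL
);
CREATE TABLE outbox (
    outbox_id INTEGER PRIMARY KEY,
    event_id  INTEGER NOT NULL REFERENCES event(event_id),
    subject   TEXT NOT NULL,
    codec     TEXT NOT NULL,
    payload   BLOB NOT NULL,
    published INTEGER NOT NULL DEFAULT 0
);
"""


def append_event(conn, event_type, body, subject):
    """Insert the event and its outbox row in one transaction."""
    encoded = json.dumps(body, sort_keys=True).encode("utf-8")
    with conn:  # commits on success, rolls back if either insert fails
        cur = conn.execute(
            "INSERT INTO event (event_type, body) VALUES (?, ?)",
            (event_type, encoded.decode("utf-8")),
        )
        event_id = cur.lastrowid
        # Store pre-encoded bytes plus a codec tag so the drain step can
        # publish without re-serializing.
        conn.execute(
            "INSERT INTO outbox (event_id, subject, codec, payload) "
            "VALUES (?, ?, ?, ?)",
            (event_id, subject, "json", encoded),
        )
    return event_id


def drain_outbox(conn, publish):
    """Publish committed rows in order; mark each row only after publish returns."""
    rows = conn.execute(
        "SELECT outbox_id, subject, payload FROM outbox "
        "WHERE published = 0 ORDER BY outbox_id"
    ).fetchall()
    sent = 0
    for outbox_id, subject, payload in rows:
        try:
            publish(subject, bytes(payload))
        except Exception:
            break  # leave this row and the rest for a later retry
        with conn:
            conn.execute(
                "UPDATE outbox SET published = 1 WHERE outbox_id = ?",
                (outbox_id,),
            )
        sent += 1
    return sent
```

Because a failed publish leaves the row unmarked, a later drain (route-local or from a standalone publisher loop) can retry it. The price is at-least-once delivery, so consumers in such a design need to tolerate duplicates.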

0. Status — all seven phases done (2026-05-23 close-out)

The v2 distributed-runtime spec is complete across all seven phases (0–6). NoETL reached v2.100.2 during the closing session. The per-phase landing PRs are:

| Phase | Topic | Closing PR(s) |
|---|---|---|
| 0 | Instrumentation + stage/frame tables + replay API | #435 to #550 |
| 1 | Frame-shaped cursor loops | #585 |
| 2 | Projector StatefulSet behind NATS durable consumers | predates the close-out session |
| 3 | Apache Arrow IPC Tier 1.5 | #587 |
| 4 | URN + KEDA + NATS supercluster | #593 (URN) · #594 (KEDA) · #595 (NATS topology) · #596 (supercluster runtime fixes) · #597 (KEDA NATS-account fix) |
| 5 | Port/adapter event/projection/payload | #582 to #592 (rounds 1–5) |
| 6 | Stage planner for fanout/reduce | #588 |

The architectural shape the spec described is now operational: events flow through durable JetStream consumers; projectors are sharded StatefulSets; Arrow IPC Tier 1.5 is wired; KEDA scales worker pools off NATS consumer lag; the URN scheme encodes locality and supports per-segment subject derivation; PayloadStore has a Protocol + adapters for filesystem + S3 + GCS + Azure + SeaweedFS-via-S3; EventRecord.payload_ref accepts a typed PayloadReference and serializes through a canonical dict discriminator; a multi-cluster NATS supercluster generator emits gateway-meshed JetStream topologies.
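
To make the "canonical dict discriminator" concrete, here is a small sketch of how a typed reference and a legacy dict could share one serialization path. It is illustrative only: `PayloadRefSketch`, its fields, and both helper functions are hypothetical stand-ins, not the real `PayloadReference` or `payload_ref_to_dict()`. The only detail taken from this document is the `kind: "payload_store"` discriminator. Passing legacy dicts through unchanged is an assumption.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Union


@dataclass(frozen=True)
class PayloadRefSketch:
    # Hypothetical fields; the real PayloadReference shape is not shown here.
    store: str
    sha256: str
    size_bytes: int


def to_canonical_dict(ref: Union[PayloadRefSketch, Dict[str, Any]]) -> Dict[str, Any]:
    """Serialize a typed reference with a discriminator; copy legacy dicts as-is."""
    if isinstance(ref, PayloadRefSketch):
        return {"kind": "payload_store", **asdict(ref)}
    if isinstance(ref, dict):
        return dict(ref)  # assumption: legacy dicts are not rewritten
    raise TypeError(f"unsupported payload_ref type: {type(ref).__name__}")


def is_payload_store_ref(envelope_ref: Dict[str, Any]) -> bool:
    """A replay-side helper can branch on the discriminator alone."""
    return envelope_ref.get("kind") == "payload_store"
```

The discriminator lets readers tell typed references from legacy dicts without inspecting which other keys happen to be present.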

Phase 4 — three rounds, all merged

  • Round 1, #593 — URN extension. KNOWN_RESOURCE_KINDS taxonomy, NoetlResourceLocator.to_nats_subject() / from_nats_subject(), locality() extraction, dataset_locator / stream_locator / partition_locator builders. WorkerLocatorParts gains region / zone; worker_locator() emits coarse-to-fine (tenant → org → region → zone → cluster → node → worker).
  • Round 2, #594 — KEDA scaler. ScaledObjectSpec + build_worker_scaledobject() + dump_scaledobject_yaml(). Sample manifest at ci/manifests/keda/scaledobject-worker-cpu-01.yaml (now in noetl/ops — see "Scope A/B" below). Live kind validation drove noetl-worker 1 → 12 → 1 under a 200-message lag burst.
  • Round 3, #595 + #596 — NATS supercluster. Multi-cluster topology generator + 2-cluster sample at ci/manifests/nats-supercluster/. Three live-validation fixes baked in (#596): unique server_name per pod via downward API, split /healthz endpoints (?js-server-only=true for liveness, ?js-enabled-only=true for readiness, plus a long-failureThreshold startupProbe), and publishNotReadyAddresses: true on the headless Service so gateway DNS doesn't deadlock on cluster startup.
  • Round 3 follow-up, #597 — KEDA NATS account fix. A live scale-up smoke test caught that ScaledObjectSpec.nats_account defaulted to the global $G account, but NoETL's noetl user lives in the NOETL account, so the scaler was silently reading 0 lag. The default is now "NOETL", with a pinning assertion. With the fix, the end-to-end scale-up loop validated: burst → TARGETS=200/10 (avg) → replicas=4 → 8 → 16 → 20 (cap) → drain → 1 via HPA stabilization.
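
The TARGETS=200/10 (avg) reading in the scale-up loop can be checked with the standard HPA rule for an average-value metric: desired replicas is the total metric divided by the per-replica target, rounded up, then clamped to the configured bounds. The sketch below shows only that arithmetic. The function name and the `min_replicas` / `max_replicas` defaults are assumptions, and it ignores HPA tolerance, scale-up rate policies, and stabilization windows.

```python
import math


def desired_replicas(total_lag, target_per_replica, min_replicas=1, max_replicas=20):
    """ceil(total / target), clamped to [min_replicas, max_replicas]."""
    if target_per_replica <= 0:
        raise ValueError("target_per_replica must be positive")
    raw = math.ceil(total_lag / target_per_replica)
    return max(min_replicas, min(max_replicas, raw))


# A 200-message lag against a target of 10 per replica asks for 20 replicas.
print(desired_replicas(200, 10))  # 20
# With no lag, the pool falls back to the floor.
print(desired_replicas(0, 10))    # 1
```

This also shows why a scaler that reads 0 lag from the wrong account never scales up: the computed value stays at the floor however much work is queued.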

Phase 5 — five rounds, port + adapters + envelope binding

  • Round 1, #582 — PayloadStore Protocol + FilesystemPayloadStore. Sharded layout (<root>/<sha[:2]>/<sha[2:4]>/<sha>), atomic writes via tempfile + fsync + os.replace, optional metadata sidecar.
  • Round 2, #589 — S3PayloadStore + parametrized compliance suite. Sync boto3 + asyncio.to_thread to keep moto compatibility; documented as the canonical cloud-adapter pattern (every subsequent adapter follows the same shape).
  • Round 3, #590 — GCSPayloadStore. Sync google-cloud-storage + unittest.mock tests (no in-process emulator equivalent of moto; fake-gcs-server is a process-based binary deferred to a future test-infra round).
  • Round 4, #591 — AzureBlobPayloadStore + SeaweedFS-via-S3 docs. Adds azure-storage-blob>=12.20.0 runtime dep.
  • Round 5, #592: EventRecord.payload_ref typed binding. The payload_ref field accepts either a PayloadReference or the legacy dict[str, Any]; the envelope serializes through payload_ref_to_dict() to a canonical shape carrying a kind: "payload_store" discriminator. The replay_payload_ref_locator helper recognizes the discriminator. Closes Phase 5.
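
The Round 1 layout and write discipline (sharded `<root>/<sha[:2]>/<sha[2:4]>/<sha>` paths, tempfile + fsync + os.replace) can be sketched with the standard library alone. This is a minimal illustration, not the FilesystemPayloadStore code: the function names are hypothetical, using SHA-256 as the content hash is an assumption, and the optional metadata sidecar is omitted.

```python
import hashlib
import os
import tempfile
from pathlib import Path


def shard_path(root: Path, sha: str) -> Path:
    """Two levels of fan-out keep any single directory from growing too large."""
    return root / sha[:2] / sha[2:4] / sha


def put_payload(root: Path, data: bytes) -> str:
    """Write bytes under their content hash without exposing partial files."""
    sha = hashlib.sha256(data).hexdigest()  # assumption: SHA-256 content address
    target = shard_path(root, sha)
    if target.exists():
        return sha  # content-addressed: identical bytes are already stored
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file in the target directory so os.replace stays on
    # one filesystem and is therefore atomic.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
            handle.flush()
            os.fsync(handle.fileno())  # force bytes to disk before the rename
        os.replace(tmp_name, target)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return sha


def get_payload(root: Path, sha: str) -> bytes:
    return shard_path(root, sha).read_bytes()
```

A reader following this scheme sees either no file or the complete payload, which is what makes the stored reference safe to use as replay input.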

0.1 Post-closeout — manifest consolidation (Scopes A + B)

After the seven phases closed, two follow-up rounds consolidated the operational artifacts that had been drifting in parallel between noetl/noetl/ci/manifests/ and noetl/ops/ci/manifests/:

  • Scope A (noetl/ops#112 + noetl/noetl#598) — moved the Phase 4 KEDA + NATS supercluster sample manifests from noetl/noetl/ci/manifests/ to noetl/ops/ci/manifests/.
  • Scope B (noetl/ops#113 + noetl/noetl#599) — completed the move for the remaining 11 directories (analytics, clickhouse, gateway, jupyterlab, nats, noetl, postgres, qdrant, test-server, tshoot, etc.) and updated noetl/ops/automation/development/noetl.yaml to read from local ci/manifests/... paths. noetl/noetl now carries only a ci/MOVED.md breadcrumb where its ci/manifests/ used to be.
  • Scope B operator follow-ups (noetl/ops#114) — adds action=reset to the deploy playbook (clears NoETL namespaces + patches static PV claimRefs so the next deploy rebinds cleanly), adds retry/backoff around the test-server contract verifier's first /health call, and targets a Ready paginated-api pod by name to avoid exec-mid-rollout SIGKILL.

After Scopes A + B, noetl/ops/ci/manifests/ is the sole home for NoETL operational manifests. The Python generators (noetl/core/runtime/keda.py, noetl/core/runtime/nats_topology.py) stay in noetl/noetl; only the committed sample YAML moved.

0.2 Documentation conventions (for AI agents + humans)

"Where to look" for everything the v2-spec implementation touches:

| What you need | Where it lives |
| --- | --- |
| Application code | noetl/noetl |
| Operational manifests, deployment playbooks, infra automation | noetl/ops |
| Design docs, feature specs (this doc) | noetl/docs |
| Python API + DSL semantics reference | noetl/noetl wiki |
| Operational guides (install / verify / tuning) | noetl/ops wiki |
| AI session state, handoffs, rules | noetl/ai-meta |

Wiki entry points for the v2 work:

Two-wiki convention codified in ai-meta/agents/rules/wiki-maintenance.md Rule 0: Python API / DSL / core architecture docs live in the noetl/noetl wiki; operational / deployment / infra docs live in the noetl/ops wiki; hybrid topics split across both with prominent cross-link callouts.

0.3 Local-kind validation evidence (2026-05-23)

After the Scope B consolidation merged, the full lifecycle was exercised end-to-end in a local kind cluster (kind-noetl, single control-plane node, podman backend):

  • Teardown + provision: namespaces deleted, then noetl run automation/development/noetl.yaml --runtime local --set action=deploy --set image_tag=2026-05-22-09-37 from repos/ops/ provisioned cleanly. (The "PV claimRef" + "test-server contract" friction was caught here and is fixed in noetl/ops#114.)
  • NoETL API: /api/health HTTP 200 in 2.8 ms. 415 playbooks listable from catalog (Postgres PV is Retain, so state survived the teardown).
  • DB connection footprint: 8 idle (server pool) + 5 idle (projector) + 2 idle (outbox-publisher) + 0 (workers, NATS-only by design) = 15 idle, matching the pre-Scope-B baseline of 11–12 within normal pool variance. No connection-count regression.
  • Test playbook latency: test/simple_python runs: 1.20s → 0.97s → 0.73s → 0.67s → 0.66s. Steady-state ~0.66s warm.
  • KEDA scale-up loop: 200-message lag burst against the paused noetl_worker_pool consumer drove HPA TARGETS=0/10 → 200/10 and replicas=1 → 4 → 8 → 16 → 20 (cap) → drain → 1 via HPA stabilizationWindowSeconds.
  • Supercluster gateway mesh: with cluster_size=1 per cluster (CPU budget on single kind node), nats-cluster-a-0 reports cluster.name=a, gateway.outbound_gateways=['b'], inbound_gateways=['b'], JetStream domain=tenant_default_org_default_region_us_east_1_cluster_a — URN-derived, bidirectional.
  • Node resource pressure: post-validation 78% CPU reservation on a 4-vCPU podman VM. Adding more pods (e.g. full 3-replica supercluster + KEDA-driven worker scale-up) exceeds capacity; documented in ops/wiki/manifests-nats-supercluster under "Resource footprint".

1. Why this revision exists

The PFT v2 workload (fixtures/playbooks/pft_flow_test/test_pft_flow_v2.yaml) is the canonical stress test for the distributed runtime. It exercises:

  • 10 facilities × 1000 patients × (assessments × 4 pages + conditions × 3 + medications × 3 + vital signs + demographics + MDS detail fan-out) ≈ 120k HTTP requests + ~26k claim cycles per execution.
  • A successful 2026-05-15 local-kind run completed in 3h 54m 21s, wrote 26,199 commands to noetl.event with event_type='command.*', and required two automatic recoveries by the in-process command reaper.

Even after the cursor-loop refactor (noetl_cursor_loop_design.md) eliminated per-iteration collection materialization, three structural costs remain:

| Cost | Where it shows up | Order of magnitude in PFT v2 |
| --- | --- | --- |
| Server-side per-claim coordination | claim_next_loop_indices and _issue_cursor_loop_commands in repos/noetl/noetl/core/dsl/engine/executor/ | one HTTP claim round-trip per cursor row |
| Per-fragment event amplification | each worker claim emits command.issued / claimed / started / call.done / step.exit / command.completed | 6× events × ~26k fragments ≈ 150k event rows |
| Server-centric projection | only the server folds events into projection state | single writer to projection store; saturates Postgres pool under MDS bursts |

The 2026-05-15 GKE amber run (memory/archive/2026/05/20260515-193100-gke-runtime-reaper-pft-v2-amber.md) saw both the NoETL API DB pool and NATS come under pressure during facility-1 MDS, confirming that the central path is the bottleneck.

This spec proposes the next reduction step, but with one correction to the mental model: the event log is not an implementation detail or a backend adapter concern. It is the temporal kernel of the NoETL business operating system. Frames, Arrow IPC, distributed caches, local indexes, streaming materialized views, analytical projections, and object storage are accelerators around that kernel. They must never become the only source of state.

That means every durable state transition must be representable as an append-only event with a deterministic serialization contract. Projections, caches, materialized views, local shared memory regions, analytical tables, streaming state, and distributed indexes are all rebuildable derived state. Given an execution id, tenant id, and timestamp or event position, NoETL must be able to replay the canonical event stream and reconstruct the system's command state, frame state, loop progress, projection outputs, and payload references as they existed then.

The implementation priorities are:

  1. Event-sourced state reproduction. noetl.event remains the canonical append-only ledger. Every frame, lease, cursor, payload reference, projection checkpoint, and tenant-visible business state transition is replayable from event data plus immutable payload objects.
  2. Shared memory as a core advantage. The runtime uses a tiered shared-state fabric: in-process memory, node-local Arrow IPC, local disk/NVMe, NATS K/V for small distributed coordination state, and durable object storage. This fabric makes colocated execution fast without weakening replay.
  3. Worker-side batched loop execution. Workers claim and execute frames (multi-row windows) instead of single cursor rows. Per-fragment events drop by a factor equal to the frame size.
  4. Decentralized projection. Projection workers become pluggable, sharded, and horizontally scalable, with NATS consumer groups for fan-out and a stable identity scheme so they can re-attach to their shard after restart.
  5. Cloud-agnostic event / payload / projection store abstractions. Port/adapter architecture for NATS JetStream / Kafka / Pub/Sub / Event Hubs / Kinesis; for S3 / GCS / Azure Blob / SeaweedFS; and for operational, analytical, search, vector, and streaming projection backends.

The end state is a distributed business operating system in the NoETL sense: every tenant, organization, execution, compute actor, queue, stream, cache object, projection, and payload is addressable via a unified resource locator; workers scale on real backlog signals; loops collapse to map-reduce style stages whose data plane uses shared memory whenever colocated and durable storage otherwise; and auditors/operators can reproduce the state of the organization at a given event position.


2. Non-goals

  • This is not a rewrite of the NoETL DSL. Existing next.arcs[], set:, mode: semantics stay.
  • This does not replace Postgres as the default projection store. Postgres remains the reference; new backends are opt-in.
  • This is not a new Arrow distribution mechanism. We use pyarrow and arrow-rs as-is.
  • This is not a green-field event sourcing platform. We keep noetl.event as the canonical source of truth; new layers wrap, distribute, cache, mirror, and project it, but they do not replace it.

3. Architecture overview

      ┌─────────────────────────────────────────────┐
      │              Server (planner)               │
      │ - schedules stages, not iterations          │
      │ - mints frame leases on each loop           │
      │ - exposes claim/heartbeat/commit endpoints  │
      └────────────┬──────────────────┬─────────────┘
                   │ control events   │ frame leases
                   ▼                  ▼
┌─────────────────────────┐    ┌──────────────────────────┐
│ Event Store (Tier A)    │    │ Projection Store (B)     │
│ NATS JS / Kafka /       │    │ Postgres / DynamoDB /    │
│ Pub/Sub / Event Hubs /  │    │ Firestore / Cassandra /  │
│ Kinesis                 │    │ analytical / search /    │
│                         │    │ operational backends     │
│ (append-only ledger +   │    │ (derived, replayable     │
│  stream adapters)       │    │  state)                  │
└──────────┬──────────────┘    └────────────┬─────────────┘
           │ subscribe                      │ project
           ▼                                ▼
      ┌─────────────────────────────────────────────┐
      │              Workers (runners)              │
      │ - claim a frame (N rows / N seconds /       │
      │   bounded memory)                           │
      │ - run inner DSL block in-process            │
      │ - accumulate results in Arrow RecordBatch   │
      │ - flush to IPC + Tier 3 + emit one event    │
      │ - heartbeat lease; on crash, frame is       │
      │   handed off via cursor checkpoint          │
      └────────────┬──────────────────┬─────────────┘
                   │                  │
                   ▼                  ▼
┌─────────────────────────┐    ┌──────────────────────────┐
│ Tier 1.5 — Arrow IPC    │    │ Tier 3 — Payload Store   │
│ shared memory           │    │ S3 / GCS / Azure Blob /  │
│ (same host only)        │    │ SeaweedFS (durable,      │
│                         │    │ content-addressed)       │
└─────────────────────────┘    └──────────────────────────┘

Four core moves vs today:

  • Event stream as the kernel. Append-only events plus immutable payload objects are the durable operating-system journal. Everything else is an index, cache, materialized view, or delivery stream derived from that journal.
  • Stage-shaped scheduling. The server no longer thinks in cursor rows; it thinks in frames. A frame is a unit of work a worker can execute end-to-end, of bounded duration and bounded result size, and is independently recoverable.
  • Worker as the loop interpreter. The inner DSL block of a loop step is interpreted inside the worker that owns the frame, not via N round-trips to the server.
  • Data plane separate from control plane. Frame outputs flow as Arrow record batches over shared memory when possible and as content-addressed objects in Tier 3 always. The event store only ever carries lightweight manifests.
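
The split between a content-addressed data plane and a lightweight event manifest can be illustrated with a minimal sketch. It assumes an in-memory dictionary in place of Tier 3 and JSON in place of Arrow record batches; `InMemoryPayloadStore` and `frame_manifest` are hypothetical names, not NoETL classes.

```python
import hashlib
import json


class InMemoryPayloadStore:
    """Hypothetical stand-in for a Tier 3 payload store (not a NoETL class)."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    def put(self, data: bytes) -> str:
        # The key is derived from the content, so identical bytes map to one object.
        digest = hashlib.sha256(data).hexdigest()
        self._objects.setdefault(digest, data)
        return digest

    def get(self, digest: str) -> bytes:
        data = self._objects[digest]
        # A content-addressed object can always be re-verified on read.
        if hashlib.sha256(data).hexdigest() != digest:
            raise ValueError("payload digest mismatch")
        return data


def frame_manifest(store: InMemoryPayloadStore, rows: list[dict]) -> dict:
    """Write frame output to the store and return the small manifest an event would carry."""
    # JSON is an assumption for brevity; the spec describes Arrow record batches.
    body = json.dumps(rows, sort_keys=True, separators=(",", ":")).encode("utf-8")
    digest = store.put(body)
    return {"sha256": digest, "row_count": len(rows), "byte_size": len(body)}


store = InMemoryPayloadStore()
manifest = frame_manifest(store, [{"patient_id": 1}, {"patient_id": 2}])
rows_back = json.loads(store.get(manifest["sha256"]))
```

The event only needs the manifest; the rows themselves are recoverable from the store by digest, which is what keeps the event log small.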

3.1 Event-sourced state contract

NoETL has one canonical state rule:

If it must be reproduced, audited, routed, billed, retried, or explained later, it must be represented by an event and any referenced immutable payload objects.

The event stream is the durable timeline for a multitenant organization. Projections are allowed to be fast, denormalized, and backend-specific, but they must be disposable. A projection rebuild from (tenant_id, organization_id, execution_id, from_position, to_position) must produce the same state as the original run, modulo explicitly declared non-deterministic external side effects.

Required event envelope fields:

{
  "event_id": "snowflake-or-ulid",
  "tenant_id": "tenant-123",
  "organization_id": "org-456",
  "execution_id": "987654321",
  "stream_id": "execution/987654321/stage/fetch_patients",
  "aggregate_id": "frame/123",
  "aggregate_type": "frame",
  "event_type": "frame.committed",
  "schema_name": "noetl.frame.committed",
  "schema_version": 1,
  "event_time": "2026-05-16T12:34:56.789Z",
  "ingest_time": "2026-05-16T12:34:56.812Z",
  "producer": "noetl://tenant/tenant-123/org/org-456/cluster/prod-1/.../worker/cpu-01",
  "causation_id": "event-that-caused-this-event",
  "correlation_id": "execution-or-business-flow-id",
  "idempotency_key": "tenant/org/execution/frame/event-kind/attempt",
  "payload_ref": {
    "uri": "noetl://tenant/tenant-123/org/org-456/payloads/sha256/...",
    "sha256": "...",
    "media_type": "application/vnd.apache.arrow.stream"
  },
  "meta": {}
}

Envelope invariants:

  • tenant_id and organization_id are mandatory on every event and payload reference. They are the isolation boundary for routing, replay, retention, encryption, and billing.
  • event_id is globally unique and monotonic enough for local ordering, but replay correctness depends on (stream_id, stream_version) or backend position, not wall-clock time.
  • expected_version is enforced per aggregate/stream where the backend supports it. Where the transport cannot enforce it directly, a side index records stream versions.
  • idempotency_key is mandatory for externally retried transitions. Duplicate delivery may occur; duplicate durable effects must not.
  • payload_ref points to immutable content. Events never point to mutable cache paths as their only copy.
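
The interaction between idempotency_key and expected_version can be sketched as follows. This is a minimal in-memory illustration, not the NoETL event-store port: `InMemoryEventLog` and `VersionConflict` are hypothetical, and the sketch requires idempotency_key on every append, which is stricter than the invariant above.

```python
from dataclasses import dataclass, field


class VersionConflict(Exception):
    """Raised when expected_version does not match the stream's current version."""


@dataclass
class InMemoryEventLog:
    # stream_id -> ordered list of stored events
    streams: dict[str, list[dict]] = field(default_factory=dict)
    # idempotency_key -> event that was stored for that key
    seen: dict[str, dict] = field(default_factory=dict)

    def append(self, event: dict, expected_version: int) -> dict:
        for required in ("tenant_id", "organization_id", "stream_id", "idempotency_key"):
            if not event.get(required):
                raise ValueError(f"missing envelope field: {required}")
        # Duplicate delivery returns the original event and causes no second effect.
        prior = self.seen.get(event["idempotency_key"])
        if prior is not None:
            return prior
        stream = self.streams.setdefault(event["stream_id"], [])
        # Optimistic concurrency: the caller states the version it last observed.
        if len(stream) != expected_version:
            raise VersionConflict(f"expected {expected_version}, found {len(stream)}")
        stored = {**event, "stream_version": len(stream) + 1}
        stream.append(stored)
        self.seen[event["idempotency_key"]] = stored
        return stored


log = InMemoryEventLog()
event = {
    "tenant_id": "tenant-123",
    "organization_id": "org-456",
    "stream_id": "execution/987654321/stage/fetch_patients",
    "event_type": "frame.committed",
    "idempotency_key": "tenant-123/org-456/987654321/frame-123/committed/1",
}
first = log.append(event, expected_version=0)
retry = log.append(event, expected_version=0)  # redelivery of the same transition
```

In this sketch the retry returns the already-stored event, so the stream still holds exactly one entry.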

3.2 Time-travel and replay

The replay API is a first-class implementation target, not a diagnostic afterthought:

GET /api/replay/state
  query:
    tenant_id
    organization_id
    execution_id
    as_of_event_id | as_of_position | as_of_time
    projection = execution | frame | loop | business_object | all

Implementation status:

  • GET /api/replay/state shipped in Phase 0 (noetl/noetl#435).
  • Phase 0 folds execution, frame, stage, loop, and payload-reference lineage from noetl.event.
  • The endpoint returns a deterministic sha256 checksum over the folded state.
  • The live reader tolerates pre-migration event tables during rolling rollout. If the additive tenant/org envelope columns are not present yet, replay treats only tenant_id=default and organization_id=default as visible legacy events.
  • A core replay upcaster registry now exists in Phase 0 (noetl.core.replay.EventUpcasterRegistry) and is wired into ReplayService.replay_state. The default registry preserves legacy events as schema_name=noetl.event, schema_version=1.
  • ReplayService can use either its configured registry or a per-call upcaster_registry, keeping schema migration validation storage-neutral and deterministic without mutating process-global state.
  • Replay state includes upcaster_registry_digest, a stable SHA-256 digest of the registered (schema_name, from_version) transitions used for the fold.
  • A golden replay corpus now lives under tests/fixtures/replay/ in repos/noetl and asserts a stable fold checksum for execution, command, business-object, stage, frame, payload-reference, and registry-digest state.
  • Event-position snapshot selection now exists for projection_snapshot rows with aggregate_type=replay_state and aggregate_id=execution/<execution_id>/<projection>. Replay can seed from the snapshot, skip earlier events, and fold the remaining stream.
  • Snapshot seeds are accepted only when their recorded upcaster registry digest matches the registry used for the current replay fold.
  • Time-based snapshot selection now resolves the time cutoff to an event position before selecting a safe snapshot. Payload resolution, registered production upcasters for future schema revisions, and domain-specific business-object reducers remain tracked by noetl/noetl#440.

Replay reconstructs state by:

  1. Loading the latest validated snapshot at or before the requested position.
  2. Reading canonical events from that snapshot position through the requested cutoff.
  3. Resolving immutable payload references through the payload store.
  4. Applying schema upcasters in deterministic order. The Phase 0 registry enforces monotonic version advancement so an upcaster cannot silently rewrite an event without moving the schema version forward.
  5. Folding events with the same projection code used by live projectors.
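
The steps above can be sketched in a few lines. This is an illustrative toy, not the ReplayService implementation: `UpcasterRegistrySketch`, `fold_frame_status`, and `replay` are hypothetical names, events carry an integer `position`, and payload resolution (step 3) is omitted.

```python
from typing import Callable, Optional

Upcaster = Callable[[dict], dict]


class UpcasterRegistrySketch:
    """Hypothetical registry keyed by (schema_name, from_version)."""

    def __init__(self) -> None:
        self._steps: dict[tuple[str, int], Upcaster] = {}

    def register(self, schema_name: str, from_version: int, fn: Upcaster) -> None:
        self._steps[(schema_name, from_version)] = fn

    def upcast(self, event: dict) -> dict:
        current = event
        while True:
            step = self._steps.get((current["schema_name"], current["schema_version"]))
            if step is None:
                return current
            upgraded = step(dict(current))
            # Monotonic advancement: an upcaster must move the version forward.
            if upgraded["schema_version"] <= current["schema_version"]:
                raise ValueError("upcaster did not advance schema_version")
            current = upgraded


def fold_frame_status(state: dict, event: dict) -> dict:
    """Toy projection: latest lifecycle status per frame aggregate."""
    frames = dict(state.get("frames", {}))
    frames[event["aggregate_id"]] = event["event_type"].split(".", 1)[1]
    return {"frames": frames}


def replay(events: list[dict], registry: UpcasterRegistrySketch, cutoff: int,
           snapshot: Optional[dict] = None) -> dict:
    # Seed from the snapshot if present, then fold only the events after it.
    state = dict(snapshot["state"]) if snapshot else {}
    start = snapshot["position"] if snapshot else 0
    for event in sorted(events, key=lambda e: e["position"]):
        if start < event["position"] <= cutoff:
            state = fold_frame_status(state, registry.upcast(event))
    return state


def rename_done(event: dict) -> dict:
    # Hypothetical v1 -> v2 migration: "frame.done" became "frame.committed".
    event["event_type"] = "frame.committed"
    event["schema_version"] = 2
    return event


registry = UpcasterRegistrySketch()
registry.register("noetl.frame", 1, rename_done)
events = [
    {"position": 1, "aggregate_id": "frame/1", "event_type": "frame.dispatched",
     "schema_name": "noetl.frame", "schema_version": 2},
    {"position": 2, "aggregate_id": "frame/1", "event_type": "frame.started",
     "schema_name": "noetl.frame", "schema_version": 2},
    {"position": 3, "aggregate_id": "frame/1", "event_type": "frame.done",
     "schema_name": "noetl.frame", "schema_version": 1},
]
full = replay(events, registry, cutoff=3)
seeded = replay(events, registry, cutoff=3,
                snapshot={"position": 2, "state": replay(events, registry, cutoff=2)})
```

Replaying from a snapshot and replaying from the start of the stream should give the same state, which is the property the snapshot rules below protect.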

Snapshots are performance accelerators. They are never the authority. A snapshot must record:

  • event position covered;
  • projection code version;
  • schema upcaster versions and registry digest, matching the replay state's upcaster_registry_digest;
  • payload digest set or Merkle root;
  • tenant encryption context;
  • deterministic fold checksum.
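
A minimal sketch of how a snapshot might be checked before it seeds a replay, assuming canonical JSON (sorted keys, compact separators) as the deterministic serialization. The function names and the snapshot dictionary shape are hypothetical; the real checksum and digest contracts are defined by the implementation.

```python
import hashlib
import json


def canonical_checksum(state: dict) -> str:
    """SHA-256 over a canonical JSON encoding, so key order cannot change the digest."""
    encoded = json.dumps(state, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def registry_digest(transitions: list[tuple[str, int]]) -> str:
    """Stable digest of the registered (schema_name, from_version) transitions."""
    encoded = json.dumps(sorted(transitions), separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def snapshot_is_usable(snapshot: dict, requested_position: int, current_digest: str) -> bool:
    """Accept a snapshot only if it is old enough, matches the registry, and is intact."""
    return (
        snapshot["position"] <= requested_position
        and snapshot["upcaster_registry_digest"] == current_digest
        and canonical_checksum(snapshot["state"]) == snapshot["fold_checksum"]
    )


digest = registry_digest([("noetl.event", 1)])
state = {"frames": {"frame/1": "committed"}}
snapshot = {
    "position": 40,
    "state": state,
    "upcaster_registry_digest": digest,
    "fold_checksum": canonical_checksum(state),
}
```

If any check fails, the safe fallback is to ignore the snapshot and fold from the start of the stream, since snapshots are never the authority.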

Replay verification becomes a release gate for every phase after Phase 0: run live execution, rebuild projections from events, and compare checksums for execution state, frame state, loop progress, and configured business projections.

The first golden checksum gate is intentionally small. It does not replace live PFT replay parity; it gives every future serializer/upcaster/projection change a cheap deterministic test that fails before a full cluster run.

Implementation status:

  • noetl/noetl#481 adds the local replay/serialization release gate at scripts/check_replay_release_gate.sh. The gate runs replay golden corpus checksums, upcaster behavior, replay route folding, projector folding/metrics, outbox publishing, command/broker/executor outbox coverage, frame event serialization, Arrow Feather serialization, and IPC durability/cache tests. Use PYTHON_BIN=/path/to/python scripts/check_replay_release_gate.sh when the default .venv/bin/python is not available.

3.3 Visual component diagram

The diagram below maps all named components to their roles, storage tiers, and primary data flows. Solid arrows are runtime data paths; dashed arrows are control or fallback paths.

Key observations from the diagram:

  • Event Kernel is the single append authority. The canonical log is the only target for durable writes. The distribution stream mirrors it for low-latency fan-out but carries no replay authority.
  • Payload Store and Canonical Log are the correctness pair. Losing either breaks replay. Every other tier and store can be dropped and rebuilt from these two.
  • Shared-state fabric follows a strict miss chain. Tier 1 → 1.5 → 2 → Tier 3 (Payload Store). Writers always write Tier 3 first; higher tiers are best-effort and admission-controlled.
  • Projectors are the sole writers to projection stores. After Phase 2, the server never writes projection state directly.
  • Port/Adapter boundaries (labeled on subgraphs) mark where backend substitutions happen at deploy time; the runtime image does not change.
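
The miss chain and the "durable first" write rule can be illustrated with plain dictionaries. This is a sketch under stated assumptions: `TieredStore` is hypothetical, the tier names are labels only, and admission control is reduced to an explicit `admit_to` argument.

```python
class TieredStore:
    """Hypothetical read-through cache chain in front of a durable store."""

    def __init__(self, tier_names, durable_name="tier3"):
        self.tier_names = list(tier_names)          # ordered hottest to coldest
        self.caches = {name: {} for name in self.tier_names}
        self.durable = {}                           # stand-in for the payload store
        self.durable_name = durable_name

    def put(self, key, value, admit_to=()):
        # Writers always write the durable tier first; cache admission is best-effort.
        self.durable[key] = value
        for name in admit_to:
            self.caches[name][key] = value

    def get(self, key):
        for index, name in enumerate(self.tier_names):
            if key in self.caches[name]:
                value, hit = self.caches[name][key], name
                break
        else:
            # Every cache missed: fall through to the durable tier.
            value, hit = self.durable[key], self.durable_name
            index = len(self.tier_names)
        # Backfill the tiers that are hotter than the one that answered.
        for name in self.tier_names[:index]:
            self.caches[name][key] = value
        return value, hit


tiers = TieredStore(["tier1", "tier1.5", "tier2"])
tiers.put("sha256:abc", b"rows")
cold = tiers.get("sha256:abc")   # served by the durable tier
warm = tiers.get("sha256:abc")   # served by the hottest cache after backfill
```

Because every cache entry is a copy of a durable object, dropping all caches only costs latency, never correctness.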

4. Workload baseline (instrument before refactor)

Before any code change ships, baseline the PFT v2 run with telemetry the team can reason from. This is the metric set every later phase is judged against.

The PFT fixture API must not become the benchmark bottleneck. The paginated-api test server enforces the PFT endpoint request limit in each process, so local kind and GKE deployments should run at least three replicas for PFT v2 validation. That gives enough pod-level capacity for the target 30-50 requests/second workload while preserving the fixture's per-process 50 requests/second contract and keeping 429s meaningful when a single worker bursts too hard.

The playbook's pft_http_concurrency variable documents the client-side pressure target. loop.spec.max_in_flight now accepts either a positive integer or a renderable expression, with the rendered value re-validated as a positive integer before dispatch. PFT v2 cursor loops should use that workload variable directly so catalog registration and runtime scheduling share one source of truth. Each PFT v2 cursor loop must also opt into loop.spec.frame with max_rows: 50 so the stage/frame runtime is exercised. A run that leaves noetl.stage and noetl.frame empty is still on the legacy cursor-command path and is not a valid Phase 1 benchmark.
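
A minimal sketch of the "render, then re-validate" rule for loop.spec.max_in_flight. The function names are hypothetical, and the template engine is abstracted as a `render` callable; the toy renderer in the usage lines is an assumption, not the NoETL renderer.

```python
import re
from typing import Callable, Union


def validate_positive_int(value: object) -> int:
    """Accept an int or a string of ASCII digits, and require a value above zero."""
    if isinstance(value, bool):
        # bool is a subclass of int in Python; reject it explicitly.
        raise ValueError("max_in_flight must be an integer, not a boolean")
    if isinstance(value, int):
        number = value
    elif isinstance(value, str) and re.fullmatch(r"[0-9]+", value.strip()):
        number = int(value.strip())
    else:
        raise ValueError(f"max_in_flight is not an integer: {value!r}")
    if number <= 0:
        raise ValueError("max_in_flight must be positive")
    return number


def resolve_max_in_flight(raw: Union[int, str], render: Callable[[str], str]) -> int:
    """Render string expressions first, then validate the rendered result."""
    rendered = render(raw) if isinstance(raw, str) else raw
    return validate_positive_int(rendered)


def toy_render(expression: str) -> str:
    # Hypothetical renderer that only knows one workload variable.
    return expression.replace("{{ pft_http_concurrency }}", "40")


literal = resolve_max_in_flight(100, toy_render)
from_variable = resolve_max_in_flight("{{ pft_http_concurrency }}", toy_render)
```

Validating after rendering means a variable that resolves to zero, a negative number, or text fails before dispatch rather than during scheduling.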

Per execution, capture:

| Metric | How | Today's value (PFT v2 GKE 2026-05-15) |
| --- | --- | --- |
| total command.* event count | SELECT count(*) FROM noetl.event WHERE execution_id = $1 AND event_type LIKE 'command.%' | ≈ 26k × 6 ≈ 150k |
| frame count | sum of cursor claims that returned > 0 rows | 5,498 in local kind frame-mode run 629145120213828019; older row-shaped baselines were ~5.5k committed frames but still ran one task pipeline per claimed row |
| mean rows per frame | total rows / frame count | 49.92 in frame-mode run 629145120213828019 |
| server CPU on /claim hot path | request-log percentiles from gateway | dominated by GKE pool pressure |
| Postgres pool depth high-watermark | pg_stat_activity poll | hit 50 waiters |
| NATS reschedule events | kubectl get events -n nats | 1 during facility-1 MDS |
| payload bytes written to Tier 3 | TempStore counter | not currently instrumented |
| projection store write rate | counter on mark_step_completed | single writer |
| execution wall time | noetl.execution.end_time - start_time | 279.785s local kind frame-mode run; earlier local kind default-row-frame baseline was 1880.739s |

Target after Phases 1–3:

| Metric | Target | Mechanism |
| --- | --- | --- |
| total command.* event count | ÷10 | frame-shaped claims, mean rows/frame ≥ 50 |
| server /claim requests per execution | ÷50 | one claim per frame |
| Postgres pool depth high-watermark | < 20 sustained | claim path narrower + projection sharded |
| Tier 3 bytes written | unchanged | data goes to Tier 3 either way |
| Tier 1.5 cache hit ratio | > 60% on colocated consumers | new metric, see Phase 3 |
| execution wall time | ÷2 | parallelism + reduced coordination |

A separate dashboard tile per metric is mandatory before merging any phase that claims an improvement.


5. Control plane: stage and frame model

5.1 New tables (additive, no breaking change)

-- Stage describes a unit of orchestration the planner cares about.
-- One stage per loop step or per fan-out step. Tool steps keep using
-- the existing noetl.command path.
CREATE TABLE IF NOT EXISTS noetl.stage (
    stage_id        BIGINT PRIMARY KEY,          -- snowflake id
    execution_id    BIGINT NOT NULL REFERENCES noetl.execution(execution_id),
    parent_event_id BIGINT REFERENCES noetl.event(event_id),
    parent_stage_id BIGINT REFERENCES noetl.stage(stage_id),
    loop_event_id   TEXT,
    opened_event_id BIGINT,
    closed_event_id BIGINT,
    kind            TEXT NOT NULL CHECK (kind IN ('loop','fanout','reduce')),
    step_name       TEXT NOT NULL,
    dsl_ref         TEXT NOT NULL,               -- pointer to playbook step
    status          TEXT NOT NULL DEFAULT 'OPEN',
    frame_policy    JSONB NOT NULL,              -- size, time, memory bounds
    started_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    ended_at        TIMESTAMPTZ
);

-- Frame is a worker-claimable window of work inside a stage.
CREATE TABLE IF NOT EXISTS noetl.frame (
    frame_id          BIGINT PRIMARY KEY,        -- snowflake id
    stage_id          BIGINT NOT NULL REFERENCES noetl.stage(stage_id),
    execution_id      BIGINT NOT NULL REFERENCES noetl.execution(execution_id),
    parent_frame_id   BIGINT REFERENCES noetl.frame(frame_id),
    command_id        BIGINT,
    claimed_event_id  BIGINT,
    terminal_event_id BIGINT,
    cursor            JSONB NOT NULL,            -- driver-specific resume hint
    row_count         INTEGER NOT NULL DEFAULT 0,
    status            TEXT NOT NULL DEFAULT 'PENDING',
    owner_worker      TEXT,
    lease_until       TIMESTAMPTZ,
    output_ref        JSONB,                     -- {tier3_sha, ipc_handle?}
    events_emitted    INTEGER NOT NULL DEFAULT 0,
    attempts          INTEGER NOT NULL DEFAULT 0,
    created_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at      TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS frame_open_idx
    ON noetl.frame (stage_id, status, lease_until)
    WHERE status IN ('PENDING','CLAIMED','RUNNING');

CREATE INDEX IF NOT EXISTS idx_stage_execution_step_loop
    ON noetl.stage (execution_id, step_name, loop_event_id)
    WHERE loop_event_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_frame_stage_frame
    ON noetl.frame (stage_id, frame_id);
CREATE INDEX IF NOT EXISTS idx_frame_command
    ON noetl.frame (command_id)
    WHERE command_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_command_stage_worker_slot
    ON noetl.command (execution_id, stage_id, ((meta->>'worker_slot_id')))
    WHERE stage_id IS NOT NULL AND meta ? 'worker_slot_id';
CREATE INDEX IF NOT EXISTS idx_event_exec_stage_event_id_desc
    ON noetl.event (execution_id, stage_id, event_id DESC)
    WHERE stage_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_event_exec_frame_event_id_desc
    ON noetl.event (execution_id, frame_id, event_id DESC)
    WHERE frame_id IS NOT NULL;

These coexist with noetl.event and noetl.command. Legacy tool steps are not migrated.

5.1.1 Replay lineage graph

The stage/frame tables are operational projections, but their keys must make replay and audit cheap without scraping JSON. The current implementation uses this graph:

execution
└─ stage
   ├─ parent_stage_id   -> parent stage for nested/fan-out chains
   ├─ loop_event_id     -> loop epoch that opened the stage
   ├─ opened_event_id   -> noetl.event(stage.opened)
   ├─ closed_event_id   -> noetl.event(stage.closed)
   ├─ command.stage_id  -> worker commands issued for the stage
   └─ frame
      ├─ parent_frame_id   -> upstream frame for derived/reduce work
      ├─ command_id        -> noetl.command that owns the worker slot
      ├─ claimed_event_id  -> noetl.event(frame.dispatched)
      └─ terminal_event_id -> noetl.event(frame.committed|frame.failed)

noetl.event also carries direct stage_id and frame_id columns for stage/frame lifecycle events and command events where the producer knows the projection key. Those columns are indexed for replay scans by (execution_id, stage_id, event_id DESC) and (execution_id, frame_id, event_id DESC). noetl.command carries stage_id and frame_id; the cursor-worker path resolves frame.command_id from the command context and has a server-side fallback indexed by (execution_id, stage_id, meta->>'worker_slot_id').

parent_stage_id and parent_frame_id are the explicit traversal edges for runtime projections. Cursor loops currently use loop_event_id as the direct stage correlation key and mint one root frame per stage; subsequent frames in that stage carry parent_frame_id to form a deterministic stage-local chain. Fan-out/reduce will extend the same parent ids to preserve the execution tree without replaying every event payload.
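
As an illustration of why explicit parent ids make traversal cheap, the stage-local chain can be walked from any frame back to the root frame without reading event payloads. This sketch assumes frame rows have already been loaded into a dictionary keyed by frame_id; `frame_chain` is a hypothetical helper.

```python
def frame_chain(frames_by_id: dict, frame_id: int) -> list:
    """Return frame ids from the root frame down to frame_id, following parent_frame_id."""
    chain = []
    seen = set()
    current = frame_id
    while current is not None:
        if current in seen:
            # A cycle would mean the projection is corrupt; fail rather than loop forever.
            raise ValueError(f"cycle detected at frame {current}")
        seen.add(current)
        chain.append(current)
        current = frames_by_id[current]["parent_frame_id"]
    return list(reversed(chain))


frames_by_id = {
    101: {"stage_id": 7, "parent_frame_id": None},   # root frame of the stage
    102: {"stage_id": 7, "parent_frame_id": 101},
    103: {"stage_id": 7, "parent_frame_id": 102},
}
lineage = frame_chain(frames_by_id, 103)
```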

5.2 Frame policy

frame_policy is the configurable bound that decides how much work a single frame contains. Phase 0 adds a typed runtime model as FramePolicy and accepts it on cursor loops as loop.spec.frame (or loop.spec.frame_policy internally). Defaults preserve today's one-row cursor behavior until a playbook opts into batching.

loop:
  cursor: ...
  spec:
    mode: cursor
    max_in_flight: 100            # worker slots
    frame:
      max_rows: 50                # max rows per frame
      max_seconds: 30             # max wall-time per frame
      max_bytes: 67108864         # max in-flight serialized output bytes
      lease_seconds: 120          # initial lease window
      heartbeat_seconds: 30       # expected heartbeat cadence
      process: frame              # row = run body once per row; frame = run body once per frame
      retry_mode: whole_frame
      max_attempts: 3

For PFT v2, the validated opt-in frame is { max_rows: 50, max_seconds: 30, max_bytes: 64MB, lease_seconds: 120, heartbeat_seconds: 30, process: frame }. process: row remains the default for backwards-compatible semantics: the worker claims a frame but runs the declared task pipeline once per row. process: frame exposes frame.rows, frame.row_count, frame.index, and iter.<iterator>_rows to the task pipeline and runs it once per claimed frame. That is the mode that produced the 2026-05-18 full PFT v2 local kind validations in ~4m30s to ~5m40s.
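
The difference between process: row and process: frame comes down to how many times the task pipeline runs per claimed frame. The sketch below is illustrative only: `FramePolicySketch` is not the real FramePolicy model, the row-mode context key `row` is an assumption, and only frame.rows and frame.row_count are modelled.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class FramePolicySketch:
    """Hypothetical subset of the frame policy; defaults mirror one-row behavior."""
    max_rows: int = 1
    process: str = "row"


def split_into_frames(rows: list, max_rows: int) -> list[list]:
    """Cut a claimed row sequence into windows of at most max_rows rows."""
    return [rows[i:i + max_rows] for i in range(0, len(rows), max_rows)]


def run_frame(rows: list, policy: FramePolicySketch, body: Callable[[dict], object]) -> list:
    """Run the task pipeline body once per frame or once per row, per policy.process."""
    if policy.process == "frame":
        return [body({"frame": {"rows": rows, "row_count": len(rows)}})]
    return [body({"row": row}) for row in rows]


calls = []


def body(context: dict) -> int:
    # Record each invocation so the two modes can be compared.
    calls.append(context)
    return len(calls)


rows = list(range(120))
frame_policy = FramePolicySketch(max_rows=50, process="frame")
frames = split_into_frames(rows, frame_policy.max_rows)
for frame_rows in frames:
    run_frame(frame_rows, frame_policy, body)
frame_mode_calls = len(calls)

calls.clear()
row_policy = FramePolicySketch(max_rows=50, process="row")
for frame_rows in frames:
    run_frame(frame_rows, row_policy, body)
row_mode_calls = len(calls)
```

With 120 rows and max_rows: 50, both modes claim three frames, but frame mode invokes the body three times while row mode invokes it 120 times.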

row_concurrency remains available as an opt-in tuning knob for process: row, but the 2026-05-17 local kind validation showed row_concurrency: 4 regressed the full PFT v2 run from ~31m21s to ~33m23s. Leave it at the default 1 unless a workload explicitly needs concurrent row execution inside one frame.

retry_mode: whole_frame is the only supported Phase 1 recovery mode. If a worker crashes or its lease expires before terminal commit, reclaim emits frame.abandoned and then a new frame.dispatched attempt for the same frame boundary. Row-split retry is intentionally not supported yet: splitting a failed frame would create a second event-sourcing boundary and requires a future subframe model. A task failure that reaches terminal commit emits frame.failed with the frame cursor, output reference summary, row count, events_emitted, and recovery policy so an operator can audit the lost-work unit and decide whether to requeue at the business cursor layer.

Implementation status:

  • FramePolicy now exists in noetl.core.dsl.engine.models.workflow.
  • Cursor loop dispatch includes the resolved frame policy in both cursor_worker tool config and command metadata.
  • The cursor_worker runtime passes __frame_max_rows and __frame_policy into cursor claim rendering, then asks the driver for a frame-sized row window. Drivers without a batched path fall back to repeated single-row claims.
  • The Postgres cursor driver now implements claim_many(...). If the playbook claim SQL uses LIMIT {{ __frame_max_rows | default(1) | int }}, one database round-trip claims a whole frame instead of one row. PFT v2 uses this form for all patient-domain and MDS-detail cursor queues.
  • The cursor_worker runtime supports both row-mode and frame-mode execution. Row mode runs the existing task pipeline once per claimed row in-process and returns frame metadata in the terminal command result. Frame mode runs the task pipeline once per claimed frame with frame.rows in context, so playbooks can batch HTTP, database writes, and reducer-style work while preserving one replayable frame boundary.
  • frame.row_concurrency is an opt-in bound for processing rows within one frame concurrently in row mode; the default remains 1 to preserve legacy ordering and side-effect behavior.
  • Cursor frame commits now carry profiling metadata under frame.cursor.metrics: row outcome counts, row concurrency, total task/render/tool milliseconds, and rollups by task kind/name. This is the release-gate instrumentation for deciding whether the next PFT improvement should target HTTP calls, Postgres saves, serialization, or control-plane commits.
  • Claimed frame rows are serialized as Apache Arrow IPC stream bytes through TempStore.put_ipc_bytes. The command result carries a durable rows_ref with schema digest, row count, media type, and optional Tier 1.5 IPC hint. Default max_rows=1 preserves existing one-row behavior unless a playbook opts into batching.
  • Frame commit validation now treats IPC metadata as a cache-only hint: if output_ref contains ipc, the same envelope must also contain a durable ref, uri, or locator, including nested rows_ref shapes. IPC-only frame events are rejected because they cannot be replayed once shared memory is swept.
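
The IPC cache-only rule in the last bullet can be sketched as a small recursive check. This reflects one reading of the rule (every envelope that carries an ipc hint, including a nested rows_ref, must also carry a durable pointer); `validate_output_ref` is a hypothetical name and not the server's validator.

```python
from typing import Optional

DURABLE_KEYS = ("ref", "uri", "locator")


def validate_output_ref(output_ref: Optional[dict]) -> None:
    """Raise ValueError if any envelope has an ipc hint without a durable pointer."""
    if not output_ref:
        return
    has_durable = any(output_ref.get(key) for key in DURABLE_KEYS)
    if "ipc" in output_ref and not has_durable:
        # Shared memory can be swept at any time, so IPC alone is not replayable.
        raise ValueError("IPC-only output_ref cannot be replayed")
    nested = output_ref.get("rows_ref")
    if isinstance(nested, dict):
        validate_output_ref(nested)


accepted = {
    "rows_ref": {
        "uri": "noetl://tenant/tenant-123/org/org-456/payloads/sha256/...",
        "ipc": {"segment": "frame-123"},  # hypothetical Tier 1.5 hint shape
    }
}
rejected = {"rows_ref": {"ipc": {"segment": "frame-123"}}}
validate_output_ref(accepted)
```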

5.3 Claim / heartbeat / commit API

New endpoints on the NoETL server, additive to the existing /api/commands/*:

POST /api/stages/{stage_id}/frames/claim
  body:    { worker_id, command_id?, requested_count, lease_seconds, cursor, frame_policy, locality? }
  returns: [ { frame_id, cursor, lease_until, dsl_ref, frame_policy } ... ]

POST /api/frames/{frame_id}/heartbeat
  body:    { worker_id, lease_seconds, status }
  returns: { lease_until }

POST /api/frames/{frame_id}/commit
  body:    { worker_id, cursor, output_ref, row_count, status, events_emitted }
  returns: { ok, next_action }   # may immediately hand the worker the next frame

Implementation status:

  • The frame control-plane endpoints are now present in repos/noetl behind the main API router:
    • POST /api/stages/{stage_id}/frames/claim
    • POST /api/frames/{frame_id}/heartbeat
    • POST /api/frames/{frame_id}/commit
  • Phase 1 starts with explicit replayable frame leases. Claim either takes a pending/expired frame or lazily creates a frame row for the requested stage.
  • Each lifecycle transition emits a canonical frame event:
    • frame.dispatched
    • frame.started
    • frame.heartbeat
    • frame.abandoned
    • frame.committed
    • frame.failed
  • Frame lifecycle events now populate stream_version and envelope_checksum, so replay can order stage-local frame transitions and detect envelope drift with the same checksum contract as the event-store port. Frame telemetry streams use the Snowflake event id as a sparse monotonic stream_version; this removes the former per-frame stream advisory lock and max(stream_version)+1 scan from heartbeat and commit writes.
  • Frame claims now persist frame.command_id from the worker command context. If an older worker omits it, the server resolves it from (execution_id, stage_id, worker_slot_id) using the indexed command metadata path.
  • Lazily minted runtime frames use the cursor claim key (stage_id, cursor.worker_slot_id, cursor.frame_index) as their idempotency boundary. The hardening patch in noetl/noetl#453 replaces the remaining stage-scoped frame-mint advisory lock with a partial unique index and conflict reload, keeping duplicate retries deterministic without serializing frame claimers.
  • Cursor workers now send a RUNNING heartbeat immediately after a successful frame claim and before frame execution. The heartbeat endpoint records the first CLAIMED -> RUNNING transition as frame.started; later RUNNING lease extensions are recorded as frame.heartbeat. This keeps the normal replay sequence clear: frame.dispatched -> frame.started -> frame.heartbeat* -> frame.committed|frame.failed.
  • When a worker reclaims an expired CLAIMED / RUNNING frame, the server emits frame.abandoned before the new frame.dispatched event. Replay therefore sees the recovery chain as append-only history rather than inferring it from a mutable row overwrite.
  • Recovery metadata is persisted on frame.abandoned and frame.dispatched: retry_mode=whole_frame, row_split_retry=false, max_attempts, previous owner, reclaimer, prior lease, and previous attempt count. This is enough to audit whether a frame was retried as a whole unit and whether operator intervention is needed after repeated attempts.
  • Duplicate or late terminal commits are rejected with 409 frame_already_terminal and the existing terminal_event_id, so a retrying worker cannot emit a second frame.committed / frame.failed event for the same frame. Heartbeats against terminal frames receive the same conflict response.
  • Worker-side cursor integration uses the existing cursor_worker path with frame policy and batched driver claims. The remaining Phase 1 work is validation, telemetry, and removing any residual per-row server coordination from non-PFT cursor shapes.
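
The lifecycle rules above can be sketched as a small in-memory state machine. This is an illustration only, not the server implementation: the `Frame` class, its method names, the `FrameAlreadyTerminal` exception, and the `COMMITTED` / `FAILED` status names are hypothetical. Only the event names, the CLAIMED / RUNNING statuses, and the 409 frame_already_terminal behavior come from the text.

```python
from dataclasses import dataclass, field
from typing import List, Optional

TERMINAL = {"COMMITTED", "FAILED"}  # assumed status names for terminal frames


class FrameAlreadyTerminal(Exception):
    """Stands in for the 409 frame_already_terminal response."""

    def __init__(self, terminal_event_id: Optional[int]):
        super().__init__("frame_already_terminal")
        self.terminal_event_id = terminal_event_id


@dataclass
class Frame:
    status: str = "PENDING"
    terminal_event_id: Optional[int] = None
    events: List[str] = field(default_factory=list)  # append-only history

    def _emit(self, name: str) -> int:
        self.events.append(name)
        return len(self.events)  # toy event id: position in the history

    def _reject_if_terminal(self) -> None:
        if self.status in TERMINAL:
            raise FrameAlreadyTerminal(self.terminal_event_id)

    def claim(self, lease_expired: bool = False) -> None:
        self._reject_if_terminal()
        if self.status in {"CLAIMED", "RUNNING"}:
            if not lease_expired:
                raise ValueError("frame is leased by another worker")
            self._emit("frame.abandoned")  # recorded before the new dispatch
        self.status = "CLAIMED"
        self._emit("frame.dispatched")

    def heartbeat(self) -> None:
        self._reject_if_terminal()
        if self.status == "CLAIMED":
            self.status = "RUNNING"
            self._emit("frame.started")  # first CLAIMED -> RUNNING transition
        elif self.status == "RUNNING":
            self._emit("frame.heartbeat")  # later lease extensions
        else:
            raise ValueError("frame is not claimed")

    def finish(self, ok: bool) -> int:
        self._reject_if_terminal()
        if self.status not in {"CLAIMED", "RUNNING"}:
            raise ValueError("frame is not claimed")
        self.status = "COMMITTED" if ok else "FAILED"
        self.terminal_event_id = self._emit("frame.committed" if ok else "frame.failed")
        return self.terminal_event_id
```

A retrying worker that calls `finish` a second time receives the existing terminal event id instead of appending a second terminal event, which mirrors the duplicate-commit rule.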

The HTTP surface is the operational fallback. The primary path uses NATS JetStream pull consumers (see §6) for lower-latency claim and built-in lease semantics.

5.4 Server's narrower role

After the refactor:

  • For a loop step the server emits one stage.opened event, mints frames lazily as workers ask, and emits one stage.closed when all frames commit.
  • The server does not touch per-row state. Cursors are opaque; the server only records the latest committed cursor per frame.
  • The server does not issue per-row command.issued events. Frame claims are observable via stage and frame rows plus a single frame.dispatched event per claim.

The current command_reaper is repurposed as the frame reaper: it scans noetl.frame for lease_until < now() AND status IN ('CLAIMED','RUNNING'), marks the lease abandoned, and republishes the frame. The correctness guarantees are unchanged and the scan surface is smaller, because the reaper scans frames rather than per-row commands.
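
The reaper predicate can be illustrated over in-memory rows. This is a minimal sketch: `FrameRow` and `find_expired_frames` are hypothetical names, and the `COMMITTED` status is an assumed terminal status used only to show that terminal frames are skipped.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


@dataclass
class FrameRow:  # hypothetical in-memory stand-in for a noetl.frame row
    frame_id: int
    status: str
    lease_until: datetime


def find_expired_frames(rows: Iterable[FrameRow], now: datetime) -> List[int]:
    """Return ids matching lease_until < now AND status IN ('CLAIMED', 'RUNNING')."""
    return [
        row.frame_id
        for row in rows
        if row.lease_until < now and row.status in ("CLAIMED", "RUNNING")
    ]


now = datetime(2026, 1, 1, tzinfo=timezone.utc)
rows = [
    FrameRow(1, "RUNNING", now - timedelta(seconds=5)),    # expired lease: reaped
    FrameRow(2, "CLAIMED", now + timedelta(seconds=30)),   # lease still valid
    FrameRow(3, "COMMITTED", now - timedelta(seconds=5)),  # terminal: ignored
]
expired = find_expired_frames(rows, now)
```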


6. Data plane: Arrow IPC zero-copy (Tier 1.5)

Tier 1.5 is part of a broader shared-state fabric, not a one-off optimization. The design target is a proven distributed-data pattern: keep durable state in an append-only log and immutable payload objects; keep hot execution state in rebuildable shared caches, indexes, and materialized views.

| Layer | Backends | Purpose | Rebuildable from events? |
|---|---|---|---|
| Canonical log | noetl.event Postgres partitions; optional mirrored JetStream/Kafka/Pub/Sub/Event Hubs/Kinesis streams | Durable timeline and replay authority | Source |
| Immutable payloads | S3 / GCS / Azure Blob / SeaweedFS / local durable store | Large event data, Arrow batches, files | Referenced by log |
| Hot shared memory | in-process LRU + Arrow IPC shm/memfd | Same-process and same-node zero-copy reads | Yes |
| Warm node cache | local NVMe/PVC disk cache | Reuse payload blocks after restart or reschedule | Yes |
| Small distributed cache | NATS K/V | Lease hints, loop counters, small coordination state | Yes |
| Streaming materialization | source/table/materialized-view/sink engine with barriers | Incremental state over event streams and work queues | Yes |
| Analytical materialization | columnar analytical projection store | High-volume queryable facts, metrics, audit/event lake views | Yes |

Only the first two layers are required for correctness. The rest are latency, throughput, and serving-shape advantages. This is the core product advantage: NoETL can run like a low-latency distributed shared-memory system while remaining reproducible from an append-only event log.

6.1 Where it fits in TempStore

The existing repos/noetl/noetl/core/storage/result_store.py already tiers payloads as MEMORY → KV → DISK → S3/GCS/DB. We insert a new tier between MEMORY and DISK:

| Tier | Mechanism | Scope | Lifetime |
|---|---|---|---|
| 1 | in-process LRU (existing) | per process | configurable bytes |
| 1.5 | Apache Arrow IPC over POSIX shm / memfd | per host (all processes on the node) | frame lease + 30s grace |
| 2 | local NVMe disk cache (existing) | per node | configurable GB |
| 3 | S3 / GCS / Azure Blob / SeaweedFS (existing) | global, content-addressed | retention policy |

Tier 1.5 is the zero-copy hop for colocated workers. Tier 3 is always written; Tier 1.5 is a best-effort fast path.

6.2 Format

Workers materialize loop results as pyarrow.RecordBatch (Python) / arrow_array::RecordBatch (Rust). The on-disk and on-wire format is Arrow IPC stream. The shared memory carrier is one of:

  • POSIX shm_open + mmap (Linux/macOS) — simplest, ubiquitous.
  • memfd_create (Linux only) — preferred where available; no path collisions, anonymous file descriptor passed via SCM_RIGHTS over a Unix domain socket from a node-local broker.

For cross-runtime parity (Python ↔ Rust), the SHM region carries:

  1. A 20-byte packed header: 8-byte magic NOETLIPC, format version u32, payload length u64.
  2. The Arrow IPC stream bytes.

This is intentionally simpler than the Plasma object store: NoETL workers are co-tenants of a single Kubernetes pod (one process per container today; if we ever go multi-process per pod we add a small node-local broker — see §6.5). No Plasma client/server fan-out, no shared catalog.
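
The header layout described above can be sketched with the standard `struct` module. Two points are assumptions rather than part of the spec: the byte order (little-endian here) and the absence of padding. Note that the three fields as listed occupy 20 bytes when packed without padding (8 + 4 + 8). The helper names `pack_region` and `unpack_region` are hypothetical.

```python
import struct

MAGIC = b"NOETLIPC"
# Assumption: little-endian, no padding ("<"); the spec does not state byte order.
HEADER = struct.Struct("<8sIQ")  # magic, format version u32, payload length u64


def pack_region(ipc_stream: bytes, version: int = 1) -> bytes:
    """Prefix Arrow IPC stream bytes with the shared-memory header."""
    return HEADER.pack(MAGIC, version, len(ipc_stream)) + ipc_stream


def unpack_region(region: bytes) -> bytes:
    """Validate the header and return the Arrow IPC stream bytes."""
    magic, version, length = HEADER.unpack_from(region, 0)
    if magic != MAGIC:
        raise ValueError("not a NoETL IPC region")
    if version != 1:
        raise ValueError(f"unsupported format version {version}")
    payload = region[HEADER.size:HEADER.size + length]
    if len(payload) != length:
        raise ValueError("region is shorter than the declared payload length")
    return payload
```

Carrying an explicit payload length matters because a shared-memory region is usually rounded up to a page boundary, so the reader cannot infer the stream length from the mapping size.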

6.3 Reference shape

PayloadReference (existing in result_store.py) gains optional IPC metadata:

    from __future__ import annotations  # allows the forward reference to IpcHint

    from dataclasses import dataclass
    from datetime import datetime
    from typing import Optional

    @dataclass
    class PayloadReference:
        tier3_uri: str  # noetl://tenant/<tenant>/org/<org>/payloads/sha256/<sha256>
        sha256: str
        media_type: str  # "application/vnd.apache.arrow.stream"
        rows: int
        bytes: int

        # Tier 1.5 fast-path hint, may be None or stale
        ipc: Optional[IpcHint] = None

    @dataclass
    class IpcHint:
        node_id: str  # noetl://tenant/<tenant>/org/<org>/cluster/<id>/node/<id>
        shm_name: str  # /noetl-<execution>-<frame>-<seq>
        schema_digest: str  # quick sanity check before attach
        valid_until: datetime  # writer-promised minimum lifetime

Implementation status:

  • repos/noetl now exposes IpcHint on ResultRef / TempRef.
  • ResultRefMeta records Arrow/replay-relevant payload metadata: media_type, schema_digest, and row_count.
  • IpcHint is explicitly best-effort. It carries shm_name, schema_digest, byte_length, optional row_count, producer, node_id, lease_expires_at, and media type. NOETL_NODE_ID is populated from Kubernetes spec.nodeName in the worker/projector/outbox publisher pods when deployed through Helm.
  • ArrowIpcSharedMemoryCache now provides the first Tier 1.5 implementation surface in noetl.core.storage.ipc_cache: budget enforcement, POSIX shared-memory allocation, IpcHint creation, read/attach, delete, and expired-lease sweep.
  • TempStore.put_ipc_bytes / get_ipc_bytes now provide an explicit raw Arrow IPC path: durable bytes are always written, IPC admission is optional, and reads fall back to the durable tier when the shared-memory hint is expired or missing.
  • TempStore.ipc_stats() exposes local counters for admission attempts/success/failures, read attempts/hits/misses, and durable fallback reads.
  • /metrics now exports the server process's TempStore IPC counters as noetl_storage_ipc_* Prometheus metrics, plus noetl_storage_ipc_read_hit_ratio.
  • Worker processes can expose their own TempStore IPC counters with NOETL_WORKER_METRICS_PORT. The endpoint is intentionally lightweight (/health, /metrics), suppresses scrape access logs, and labels counters by worker_id, worker_pool, and runtime so pod-level cache admission/read behavior is visible independently from server metrics. The Helm chart publishes this as the noetl-worker-metrics headless service by default.
  • noetl.core.storage.arrow_ipc now provides the runtime serialization primitive: rows_to_arrow_ipc and arrow_ipc_to_rows. Schema digests are computed from Arrow's serialized schema, not from row values, so replay/projector code can compare logical frame shape independently from payload content.
  • cursor_worker now writes multi-row frame captures through this path when frame_policy.max_rows > 1 or NOETL_CURSOR_FRAME_CAPTURE_ENABLED=true. NOETL_CURSOR_FRAME_IPC_ENABLED=false disables only the same-node IPC admission; the durable payload write remains authoritative.
  • TempStore.resolve(result_ref) is Arrow-aware for application/vnd.apache.arrow.stream references: it attempts the same-node IPC hint first, falls back to durable bytes, then decodes to row dictionaries only at the consumer boundary. Generic JSON result refs continue through the existing JSON resolver.
  • The durable ResultRef remains authoritative; consumers must treat expired, missing, or foreign-node IPC hints as cache misses and fall back through durable storage.
  • Worker-side metrics aggregation into dashboards and colocated worker/projector validation remains tracked by noetl/noetl#438.

6.4 Producer / consumer protocol

Producer (any worker that emits a record batch):

  1. Serialize batch to Arrow IPC stream bytes.
  2. Write to Tier 3 keyed by sha256 (idempotent, exists-first check).
  3. Attempt Tier 1.5 write: create / open shm, copy buffer (one memcpy from the IPC stream), set valid_until = lease_until + 30s (the frame lease plus the grace period).
  4. Emit one event whose envelope carries PayloadReference with both Tier 3 URI and the optional IpcHint.

Consumer (frame commit handler, reducer, projection worker):

  1. Read PayloadReference from event envelope.
  2. Try IpcHint if present and node_id == self.node_id and valid_until > now(): open shm, mmap, wrap as pyarrow.ipc.RecordBatchStreamReader. Zero-copy.
  3. If the hint is missing, stale, or for another node: read from Tier 1 cache; on miss, Tier 2; on miss, Tier 3.

The consumer never trusts the IPC hint blindly. It validates schema_digest (cheap), bumps a tier metric, and on any error falls through to the durable read path. The durable read is the source of truth.
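
The consumer rule can be sketched as a single function that treats every problem with the hint as a cache miss. This is a simplified illustration: `Hint` is a reduced stand-in for IpcHint, `read_shm` and `read_durable` are hypothetical callables injected by the caller, and the Tier 1 → Tier 2 → Tier 3 chain is collapsed into one durable read.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional, Tuple


@dataclass
class Hint:  # reduced stand-in for IpcHint
    node_id: str
    shm_name: str
    schema_digest: str
    valid_until: datetime


def read_payload(
    hint: Optional[Hint],
    self_node_id: str,
    now: datetime,
    read_shm: Callable[[str], Tuple[str, bytes]],  # returns (schema_digest, bytes)
    read_durable: Callable[[], bytes],
) -> Tuple[str, bytes]:
    """Return (tier, bytes); any problem with the hint falls through to durable."""
    if hint and hint.node_id == self_node_id and hint.valid_until > now:
        try:
            digest, data = read_shm(hint.shm_name)
            if digest == hint.schema_digest:  # cheap sanity check before use
                return "tier1.5", data
        except OSError:
            pass  # region already unlinked or unreadable: fall through
    return "durable", read_durable()


now = datetime(2026, 1, 1, tzinfo=timezone.utc)
hint = Hint("node-a", "/noetl-demo", "digest-1", now + timedelta(seconds=30))
tier, data = read_payload(
    hint, "node-a", now,
    read_shm=lambda name: ("digest-1", b"from-shm"),
    read_durable=lambda: b"from-tier3",
)
```

The returned tier label is where a real consumer would bump its tier metric.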

6.5 Garbage collection and back-pressure

  • Each shm region is owned by the producer worker for the duration of its frame lease + 30s grace. After grace, the worker shm_unlinks the region.
  • Workers track a per-node tier15_bytes_in_use counter. New shm writes are admission-controlled by a configurable budget (default 1 GB per node). On budget exhaustion the writer skips Tier 1.5 and emits only the durable Tier 3 reference. No data plane stall.
  • For multi-process per pod (future): introduce a noetl-node-broker sidecar that owns the shm namespace and brokers lifetimes via Unix-socket RPC. Not required for the current one-process-per-pod deployment shape.
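
The admission rule can be sketched as a byte budget guarded by a lock. The class and method names below are hypothetical. The counter here is per process, which matches the current one-process-per-pod shape; a true per-node counter across processes would need shared state that this sketch does not attempt. The default uses 1 GiB as an approximation of the 1 GB figure.

```python
import threading


class Tier15Budget:
    """Tracks tier15_bytes_in_use against a configurable byte budget."""

    def __init__(self, budget_bytes: int = 1 << 30):
        self.budget_bytes = budget_bytes
        self.bytes_in_use = 0
        self._lock = threading.Lock()

    def try_admit(self, size: int) -> bool:
        """Reserve size bytes; False means skip Tier 1.5 and emit only Tier 3."""
        with self._lock:
            if self.bytes_in_use + size > self.budget_bytes:
                return False
            self.bytes_in_use += size
            return True

    def release(self, size: int) -> None:
        """Call after the region is unlinked at lease end plus grace."""
        with self._lock:
            self.bytes_in_use = max(0, self.bytes_in_use - size)
```

Because a rejected admission only skips the fast path, the writer never blocks on the budget.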

6.6 Serialization contract

Serialization is part of the replay contract. If two runtimes read the same event stream and payload objects, they must fold the same state.

Rules:

  • Event envelopes: canonical JSON for the persisted envelope metadata. Writers must emit stable field names, UTC timestamps, explicit nulls only where schema allows them, and deterministic map key ordering before signing/checksumming.
  • Large tabular payloads: Apache Arrow IPC stream, media type application/vnd.apache.arrow.stream. Every payload records schema_digest, row_count, byte_length, compression codec, and content SHA-256.
  • Nested business payloads: JSON Schema / OpenAPI-compatible payloads for small objects; Arrow struct, list, and map types for large repeated data. Avoid pickle, language-native binary serialization, or runtime-specific object graphs.
  • Decimals and time: decimals carry precision/scale; timestamps are UTC with explicit unit; local time zones are data fields, not implicit runtime settings.
  • Schema evolution: every event has schema_name and schema_version. Upcasters are pure functions registered by (schema_name, from_version) and must advance to a later schema_version before the next upcaster can run. The registry is deterministic and covered by replay tests; production releases must also record the registry digest in replay/projection snapshots.
  • Cross-language parity: Python and Rust compliance tests read the same golden event/payload corpus and compare fold checksums.

The durable payload digest is computed over the exact serialized bytes. The projection checksum is computed over a canonical projection serialization, not over backend-specific storage bytes.
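
The schema-evolution rule can be sketched as a registry keyed by (schema_name, from_version). This is a minimal illustration, not the NoETL registry: `REGISTRY`, `register`, and `upcast` are hypothetical names, and the `rows` to `row_count` rename is an invented example schema change.

```python
from typing import Any, Callable, Dict, Tuple

Event = Dict[str, Any]
Upcaster = Callable[[Event], Event]

REGISTRY: Dict[Tuple[str, int], Upcaster] = {}


def register(schema_name: str, from_version: int) -> Callable[[Upcaster], Upcaster]:
    def decorator(fn: Upcaster) -> Upcaster:
        REGISTRY[(schema_name, from_version)] = fn
        return fn
    return decorator


def upcast(event: Event) -> Event:
    """Apply registered upcasters until none matches the current version."""
    while (event["schema_name"], event["schema_version"]) in REGISTRY:
        fn = REGISTRY[(event["schema_name"], event["schema_version"])]
        upgraded = fn(dict(event))  # pass a copy so the input is never mutated
        if upgraded["schema_version"] <= event["schema_version"]:
            raise ValueError("upcaster must advance schema_version")
        event = upgraded
    return event


# Hypothetical schema change: v1 stored "rows", v2 renames it to "row_count".
@register("frame.committed", 1)
def _v1_to_v2(event: Event) -> Event:
    event["row_count"] = event.pop("rows")
    event["schema_version"] = 2
    return event
```

The version check inside the loop enforces the requirement that each upcaster advances to a later schema_version, which also guarantees that the loop terminates.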

6.7 Why Arrow, not raw bytes

  • Columnar layout is the right shape for the heavy paths (Postgres bulk reads, DuckDB joins, fanout reducers, projection writes to analytical stores).
  • Zero-copy between Python and Rust workers via the C Data Interface comes for free with Arrow. Important once we run a mix of pyarrow-using Python workers and arrow-rs Rust workers on the same pod.
  • The wire schema is self-describing enough for payload reads, while the event envelope still records schema identity/version for governance and replay.

7. Decentralized projection

7.1 Current state

noetl_async_sharded_architecture.md already specifies async projection workers and epoch barriers. The projection worker skeleton exists. What is not yet decentralized: the projection still runs inside the server process, and the projection write rate caps at the server's single-writer Postgres connection.

7.2 Refactor

  • Extract projection into its own deployable: noetl-projector. Same image, different entrypoint.
  • Keep the reducer transport-neutral: the same deterministic projector core must accept events from tests, replay, NATS JetStream, Kafka, or cloud stream adapters. Transport code owns delivery and checkpointing; reducer code owns ordering, folding, lineage, checksum, and idempotent writes.
  • Each projector instance owns one or more shards via NATS JetStream pull consumer group (noetl.projection.shard.<n>).
  • Shard assignment is sticky: a projector keeps a shard as long as it heartbeats. On stop, NATS reassigns. This is the standard JetStream durable consumer pattern.
  • Each shard has its own Postgres connection (or its own backend entirely, see §7.4 for the projection store abstraction). Total projection throughput scales linearly with shard count.
  • Projector reads events from NATS, resolves PayloadReference via the cache hierarchy (Tier 1 → 1.5 → 2 → 3), and writes projection state. Tier 1.5 is the hot path when the projector and the producing worker are colocated.

7.3 Stable identity

To get stable identity (required for Tier 2 disk cache continuity and for NATS durable consumer affinity), projectors and workers run as a StatefulSet, not a Deployment:

    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: noetl-projector
    spec:
      serviceName: noetl-projector
      replicas: 4
      template:
        spec:
          containers:
            - name: projector
              env:
                - name: NOETL_SHARD_ID
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: NOETL_NODE_ID
                  valueFrom:
                    fieldRef:
                      fieldPath: spec.nodeName  # node name, not pod IP, so same-node IPC hints match

NOETL_SHARD_ID = noetl-projector-0..3 is stable across restarts and maps directly to a NATS durable consumer name.

7.4 Projection store abstraction (port/adapter)

    from __future__ import annotations  # defers evaluation of names such as Pagination

    from typing import Any, AsyncIterator, Mapping, Optional, Protocol, Tuple

    class ProjectionStorePort(Protocol):
        async def save_projection(
            self, projection_id: str, state: bytes, version: int
        ) -> None: ...

        async def load_projection(
            self, projection_id: str
        ) -> Optional[Tuple[bytes, int]]: ...

        async def save_snapshot(
            self, aggregate_id: str, aggregate_type: str,
            snapshot: bytes, version: int
        ) -> None: ...

        async def load_snapshot(
            self, aggregate_id: str
        ) -> Optional[Tuple[bytes, int]]: ...

        async def query(
            self, projection_type: str, filters: Mapping[str, Any],
            pagination: Pagination
        ) -> AsyncIterator[Mapping[str, Any]]: ...

Adapters: Postgres (reference), cloud/serverless document and key-value stores, wide-column stores, columnar analytical stores, search stores, vector stores, and streaming materialized-view engines. Specific products are deployment choices, not architectural dependencies.

Implementation status:

  • The first projection-store port is present in repos/noetl/noetl/core/projection_store.
  • ProjectionRecord and ProjectionSnapshot define idempotent projection and snapshot writes.
  • projection_checksum() provides deterministic JSON checksums for replay parity.
  • PostgresProjectionStore is the reference adapter and writes to additive noetl.projection and noetl.projection_snapshot tables.
  • The first transport-neutral reducer is present in repos/noetl/noetl/core/projector. ReplayStateProjector groups events by tenant, organization, and execution, orders by event position, folds them with the same replay-state reducer used by GET /api/replay/state, and writes lineage-rich ProjectionRecord rows with checksum and upcaster-registry metadata.
  • The first projector process entrypoint is present at python -m noetl.projector. It wraps the reducer with a NATS durable pull consumer, stable shard identity, and modulo execution-id shard filtering so a StatefulSet replica only projects the events it owns.
  • noetl/noetl#457 adds optional projector /metrics and /health endpoints (NOETL_PROJECTOR_METRICS_PORT / --metrics-port) with counters for notifications, extracted events, shard-owned events, projection records, empty/unowned notifications, errors, and last batch gauges. noetl/noetl#536 adds separate counters for events assigned to another shard and events that cannot be sharded because the execution id is missing or invalid. noetl/noetl#537 adds a stale projection-write counter for duplicate/out-of-order delivery that loses the version race against an already-written projection. Access logging is suppressed for scrape paths.
  • noetl/noetl#458 exports durable checkpoint gauges from successful projector writes: last source event id, event-time watermark, projected-at timestamp, latest projection lag, and max projection lag since process start.
  • NATSEventPublisher is present in repos/noetl/noetl/core/messaging. It mirrors canonical event envelopes to subjects shaped as noetl.events.<tenant>.<org>.<execution>.<shard>, creates the event stream with a wildcard subject, and retries once after reconnect on publish failure.
  • noetl/noetl#464 adds noetl.outbox plus noetl.core.outbox. The outbox is the preferred Phase 2 commit boundary: append noetl.event and enqueue the distribution envelope in one database transaction, then let a publisher drain only committed rows to NATS. Rows include JSONB payloads for debugging and pre-encoded payload bytes with a codec so the publisher can avoid repeat JSON serialization when a subject is known.
  • noetl/noetl#479 completes the matching consumer side for that codec choice. Projector subscribers keep JSON compatibility for direct envelopes and decode Arrow Feather payload bytes from the outbox path into the same event dictionaries before sharding and replay-state folding.
  • noetl/noetl#480 adds the standalone outbox publisher entrypoint (python -m noetl.outbox). It initializes the database pool, ensures the outbox table, publishes ready rows in bounded batches, and supports a --once mode for smoke tests and Kubernetes Jobs.
  • noetl/noetl#466 applies that boundary to direct /api/events: command lifecycle events, workflow/playbook lifecycle events, and engine-generated command.issued rows enqueue their mirrored envelopes into noetl.outbox before the database commit. After commit, the route asks the outbox publisher to drain committed rows. This removes the direct post-commit publish hook from the core event path without changing the external worker API.
  • noetl/noetl#467 applies the same boundary to /api/events/batch: item events, batch.accepted, batch.processing, batch.completed, batch.failed, and batch-generated command.issued rows enqueue in the writer transaction or insert chunk. The route preserves the existing external batch-request contract; it only changes how distribution intent becomes durable.
  • noetl/noetl#468 applies the boundary to execution routes: /api/execute initial command.issued rows, /api/executions/{execution_id}/cancel execution.cancelled rows, and cleanup-stuck execution.cancelled rows enqueue before commit, then drain committed outbox rows. Command recovery publish remains separate because it is worker dispatch, not projector/event distribution.
  • noetl/noetl#469 applies the boundary to auto-resume recovery cancellations so interrupted parent executions marked execution.cancelled by startup recovery participate in the same projector distribution bridge as API-generated cancellations.
  • noetl/noetl#470 applies the boundary to generic broker event emission. EventService.emit_event stores the sanitized event row and outbox envelope in one transaction, preserving the broker API response while making generic emits projector-visible.
  • noetl/noetl#471 applies the boundary to command claims. claim_command now stores command.claimed and updates the mutable command row in the same transaction as the outbox envelope, then drains committed rows after commit.
  • noetl/noetl#472 applies the boundary to the backend-neutral event-store port. PostgresEventStore.append writes the canonical envelope and outbox envelope in the same expected-version transaction, so future event-store users inherit durable projector fan-out without adding route-local hooks.
  • noetl/noetl#473 applies the boundary to DSL executor-owned events. _persist_event, stage.opened, and stage.closed enqueue before commit; outbox drain is skipped for caller-owned transactions and only run after executor-owned commits.
  • Command-event idempotency keeps command identifiers lossless as strings at the event boundary while still using the numeric projection column when available. Duplicate guards compare both the typed command_id column and meta.command_id, preserving replay compatibility with older worker events and newer snowflake command rows.
  • noetl/noetl#478 applies the boundary to frame lifecycle events. Frame claim, heartbeat/start, abandon, commit, and failure routes enqueue replay envelopes into noetl.outbox in the same transaction as noetl.frame and noetl.event, then drain only committed rows after the route commit.
  • Frame lifecycle mirroring remains opt-in via NOETL_EVENT_MIRROR_ENABLED=true, but the route no longer publishes directly; distribution is mediated by noetl.outbox.
  • noetl/noetl#459 originally extended the same opt-in mirroring to persisted /api/events lifecycle events and generated command.issued events via post-commit direct publish. noetl/noetl#466 supersedes that route-local hook with same-transaction outbox enqueue.
  • noetl/noetl#460 closes the async batch-ingestion gap by mirroring /api/events/batch item events, batch.accepted, batch.processing, batch.completed, batch.failed, and batch-generated command.issued events after their canonical commits. The mirrored envelopes preserve command_id, stage_id, frame_id, parent_event_id, and parent_execution_id where available so projector replay sees the same lineage as direct event ingestion.
  • noetl/noetl#462 covers execution-route appends: initial /api/execute generated command.issued events and API execution.cancelled events are mirrored after their canonical commits.
  • Deployment wiring has started in noetl/ops#102 and noetl/ops#103: the Helm chart gains an opt-in noetl-projector StatefulSet, projector ConfigMap, headless service, stable shard identity from the StatefulSet pod name, automatic server event-mirror env wiring when projector.enabled=true, and a named metrics port on 9090.
  • Projector startup validation and runtime metrics:
    • Projector startup validates the resolved shard settings. A worker whose stable pod identity maps to shard_index >= shard_count, or whose fetch/ack/metrics runtime limits are non-positive, fails at process initialization instead of silently subscribing as a shard that can never own events.
    • Environment and CLI loaders preserve invalid values for that validation step; they do not clamp bad shard counts, inflight limits, timeouts, or metrics ports into defaults.
    • Projector metrics expose shard_id, shard_index, shard_count, consumer, stream, and subject as stable labels so validation dashboards can confirm the running topology.
    • Payload decode failures and projection callback failures have separate counters; noetl_projector_errors_total is the aggregate counter over both categories.
    • Projector ACKs, NAKs, delayed NAKs, and TERMs are counted separately from the subscriber action observer, with recency gauges and last redelivery delay available for validation dashboards.
    • In-process summary helpers now report action ratios, last-batch shape ratios, decode/projection error ratios, and the combined projector runtime state; the projector metrics server exposes the same payload at /summary for local-kind/GKE evidence capture, and run_projector_phase2_validation.py wraps replay validation plus projector evidence gating into the current Phase 2 operator workflow.
  • Projector checkpoint metadata landed in noetl/noetl#455: each replay-state projection write records event_count, source_event_id, event_time_watermark, projected_at, and projection_lag_ms in ProjectionRecord.meta. noetl/noetl#458 exposes those fields as per-shard Prometheus gauges.
  • The adapter currently supports:
    • save_projection(record) with version-monotonic upsert semantics;
    • load_projection(projection_id);
    • query_projections(query) by tenant, organization, projection type, execution id, and limit;
    • save_snapshot(snapshot) with version-monotonic upsert semantics;
    • load_snapshot(aggregate_id, aggregate_type=...).
  • Live mirrored-event projector validation and remaining direct event append paths remain tracked by noetl/noetl#437 and noetl/noetl#461. The next migration step is deploying the full outbox path in kind/GKE and validating projector replay against PFT v2.
  • Non-Postgres projection adapters remain tracked by noetl/noetl#439.
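
The shard identity, startup validation, and modulo execution-id filtering described in the list above can be sketched as three small functions. The function names are hypothetical, and treating a missing or non-numeric execution id as owned by no shard is an assumption about how such events are kept out of projection.

```python
import re


def shard_index_from_pod(pod_name: str) -> int:
    """Extract the StatefulSet ordinal, e.g. 'noetl-projector-3' -> 3."""
    match = re.search(r"-(\d+)$", pod_name)
    if match is None:
        raise ValueError(f"pod name {pod_name!r} has no StatefulSet ordinal")
    return int(match.group(1))


def validate_shard(shard_index: int, shard_count: int) -> None:
    """Fail fast rather than subscribe as a shard that can never own events."""
    if shard_count <= 0 or not 0 <= shard_index < shard_count:
        raise ValueError(f"invalid shard {shard_index} of {shard_count}")


def owns_event(execution_id, shard_index: int, shard_count: int) -> bool:
    """Modulo filter; missing or non-numeric execution ids are owned by no shard."""
    try:
        return int(execution_id) % shard_count == shard_index
    except (TypeError, ValueError):
        return False
```

With four replicas, each execution id maps to exactly one ordinal, so every event is projected by one replica as long as all four pass validation.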

Projection backend roles:

  • Operational projection stores serve execution status, current frame state, user-facing API reads, and transactional admin surfaces.
  • Columnar analytical projection stores are the high-volume analytical memory of the system: event lake tables, tenant/org audit timelines, PFT metrics, cost/billing facts, and queryable business histories. They are append-friendly and rebuildable from the event stream; they must not own unreplayable state.
  • Streaming materialized-view engines maintain source/table/materialized-view/sink pipelines, incremental aggregations, work-queue state, and barrier-aligned derived state. They consume canonical events or table sources and emit derived updates, never replacing the canonical log.
  • Search/vector stores serve retrieval shapes only. They are rebuilt from event/payload/projected source data and may lag without compromising correctness.

Every projection row should include enough lineage to support audit and replay comparison: tenant_id, organization_id, execution_id where applicable, source event_id or event-position range, projection version, and checksum.

Each projection type can target a different backend. The projector dispatches by the projection's configured backend (see §11 for the YAML).


8. Event store abstraction (port/adapter)

    from __future__ import annotations  # defers evaluation of names such as StreamId

    from typing import AsyncIterator, Optional, Protocol, Sequence

    class EventStorePort(Protocol):
        async def append(
            self, stream_id: StreamId, events: Sequence[EventEnvelope],
            expected_version: Optional[int]
        ) -> int: ...

        async def read(
            self, stream_id: StreamId, from_version: int = 0
        ) -> AsyncIterator[EventEnvelope]: ...

        async def subscribe(
            self, stream_pattern: str, consumer_group: str,
            from_position: ConsumerPosition
        ) -> Subscription: ...

Adapters (priority order):

  1. Postgres noetl.event — canonical ledger and replay source by default. Partitioned by tenant/time/execution as volumes grow.
  2. NATS JetStream — reference distribution stream. Subject = noetl.events.<tenant>.<org>.<execution>.<shard>. Durable consumers per projector / per worker. Events are persisted to the canonical ledger and mirrored to JetStream for low-latency fan-out.
  3. Apache Kafka / Confluent / MSK — partition key = aggregate id; offset checks for expected_version.
  4. Google Pub/Sub — topic per category, ordering key per aggregate, side store (Spanner / Firestore) for expected_version.
  5. Azure Event Hubs — Kafka-compat mode reuses Kafka adapter; native mode uses Event Hubs SDK + Blob checkpoint store.
  6. Amazon Kinesis Data Streams — partition key per aggregate, DynamoDB for version tracking, KCL for consumer coordination.

Implementation status:

  • The first backend-neutral event-store port is present in repos/noetl/noetl/core/event_store.
  • EventRecord defines the canonical append envelope used by adapters.
  • canonical_event_checksum() serializes JSON deterministically with sorted keys and compact separators.
  • PostgresEventStore is the reference adapter for noetl.event.
  • The adapter currently supports:
    • append(stream_id, events, expected_version=...)
    • read(stream_id, from_version=..., limit=...)
    • per-stream expected-version conflict detection;
    • mapping to additive event envelope columns (stream_id, stream_version, aggregate/schema/tenant fields, payload_ref, envelope_checksum).
  • NATS/Kafka/cloud-native stream adapters and projection-store ports remain tracked by noetl/noetl#439. The default NATS publisher now has an outbox-backed path tracked by noetl/noetl#464; direct /api/events adopts it in noetl/noetl#466. Future adapters should drain the same noetl.outbox contract rather than adding writer-local publish hooks.
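
The deterministic serialization behind canonical_event_checksum() can be illustrated with the standard library. This sketch is not the NoETL function: the name `canonical_checksum` is hypothetical, and the choice of SHA-256 and UTF-8 encoding is an assumption, since the text specifies only sorted keys and compact separators.

```python
import hashlib
import json
from typing import Any, Mapping


def canonical_checksum(envelope: Mapping[str, Any]) -> str:
    """Hash JSON serialized with sorted keys and compact separators."""
    encoded = json.dumps(
        envelope, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


a = {"stream_id": "exec-1", "stream_version": 7, "meta": {"x": 1, "y": 2}}
b = {"meta": {"y": 2, "x": 1}, "stream_version": 7, "stream_id": "exec-1"}
same = canonical_checksum(a) == canonical_checksum(b)
```

Sorting keys at every nesting level is what makes the checksum independent of the order in which a writer happened to build the envelope.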

Adapter design constraints:

  • Idempotent handlers as baseline; assume at-least-once delivery. Do not try to abstract exactly-once differences.
  • Per-aggregate ordering only. No global ordering guarantee.
  • Backend-specific retention / compaction / DLQ / monitoring stays out of the abstraction.
  • Event schema evolution: every event carries schema_version. Adapters do not transform; upcasting lives in the projection layer.
  • Configuration selects the backend at deploy time; same image runs anywhere.
  • The adapter boundary separates canonical append from distribution. A deployment may use Postgres as the canonical append log and JetStream/Kafka as the fan-out transport, or a cloud-native log plus a compacted replay archive, but it must expose the same replay semantics.
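
Because delivery is at-least-once, a handler has to tolerate duplicates and out-of-order redelivery. One way to do that is a version-monotonic write, sketched below with an in-memory store. `InMemoryProjectionStore` is a hypothetical stand-in, not a NoETL adapter, and returning a boolean for stale writes is an illustrative choice.

```python
from typing import Dict, Optional, Tuple


class InMemoryProjectionStore:
    """Hypothetical stand-in for an adapter with version-monotonic upserts."""

    def __init__(self) -> None:
        self._rows: Dict[str, Tuple[int, bytes]] = {}

    def save_projection(self, projection_id: str, state: bytes, version: int) -> bool:
        """Write only if version is newer; return False for stale or duplicate writes."""
        current = self._rows.get(projection_id)
        if current is not None and version <= current[0]:
            return False
        self._rows[projection_id] = (version, state)
        return True

    def load_projection(self, projection_id: str) -> Optional[Tuple[int, bytes]]:
        return self._rows.get(projection_id)


store = InMemoryProjectionStore()
results = [
    store.save_projection("exec-1", b"v1", 1),
    store.save_projection("exec-1", b"v2", 2),
    store.save_projection("exec-1", b"v1", 1),  # redelivered older event
]
```

The redelivered write is dropped without error, so replaying the same events any number of times leaves the same projection state.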

9. Payload store abstraction (port/adapter)

    from __future__ import annotations  # defers evaluation of names such as EventEnvelope

    from typing import AsyncIterator, Mapping, Protocol

    class PayloadStorePort(Protocol):
        async def store(
            self, data: bytes | AsyncIterator[bytes],
            content_type: str, metadata: Mapping[str, str]
        ) -> PayloadReference: ...

        async def fetch(
            self, reference: PayloadReference
        ) -> bytes | AsyncIterator[bytes]: ...

        async def exists(self, reference: PayloadReference) -> bool: ...

        async def delete(self, reference: PayloadReference) -> None: ...

        async def resolve_envelope(
            self, envelope: EventEnvelope
        ) -> EventEnvelope: ...

        async def externalize_envelope(
            self, envelope: EventEnvelope, threshold: int
        ) -> EventEnvelope: ...

Adapters: S3 (reference, covers SeaweedFS via S3-compat), GCS, Azure Blob, local filesystem.

Constraints:

  • Content-addressed by SHA-256. Two events with identical large payloads share a reference. No duplicate uploads.
  • Immutable on write. Versioning happens through new references, not in-place updates.
  • Transparent to handlers via resolve_envelope / externalize_envelope middleware.
  • Retention is reference-counted by event store retention; configurable TTL as safety net.
  • Encryption at rest is per adapter.
  • Streaming upload / download. No buffering full payloads in memory.
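
The content-addressing and immutability constraints can be illustrated with a toy in-memory store. `InMemoryPayloadStore` is hypothetical and deliberately ignores the streaming, retention, and encryption constraints: it buffers whole payloads and is synchronous, which a real adapter should not do.

```python
import hashlib
from typing import Dict


class InMemoryPayloadStore:
    """Hypothetical adapter: immutable blobs keyed by their SHA-256 digest."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}
        self.uploads = 0  # counts real writes, to show de-duplication

    def store(self, data: bytes) -> str:
        digest = hashlib.sha256(data).hexdigest()
        if digest not in self._blobs:  # exists-first check: no duplicate upload
            self._blobs[digest] = data
            self.uploads += 1
        return digest

    def fetch(self, digest: str) -> bytes:
        data = self._blobs[digest]
        if hashlib.sha256(data).hexdigest() != digest:
            raise ValueError("payload does not match its content address")
        return data


payloads = InMemoryPayloadStore()
ref_a = payloads.store(b"large arrow batch")
ref_b = payloads.store(b"large arrow batch")  # identical payload, second event
```

Both events end up holding the same reference, and changing the payload necessarily produces a new reference rather than an in-place update.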

The cache hierarchy described in §6 (Tier 1 → 1.5 → 2 → 3) sits on top of this interface, transparent to user code.


10. Cloud distributed OS surfaces

This layer is what lets NoETL behave like a distributed business operating system for multitenant organizations. It provides identity, addressability, locality, replay, and resource accounting across clusters without coupling application logic to a specific cloud.

10.1 Unified resource locator

Every addressable thing in the system gets a stable URI:

noetl://tenant/<tenant>/org/<org>/cluster/<cluster>/region/<region>/zone/<zone>/node/<node>/process/<pid>/<kind>/<id>

Examples:
noetl://tenant/acme/org/care-network/cluster/prod-1/region/us-central1/zone/us-central1-a/
node/gke-noetl-pool-a-7b/process/1/worker/cpu-01

noetl://tenant/acme/org/care-network/cluster/prod-1/region/global/none/none/none/
stream/events/<execution>/<shard>

noetl://tenant/acme/org/care-network/payloads/sha256/<sha256>

Tenants, organizations, workers, projectors, MCP servers, JetStream streams, Tier 3 payloads, cache entries, projection shards, and frame leases are all referenceable. The locator is the join key across the event store, projection store, and observability layer.

Implementation status:

  • repos/noetl/noetl/core/resource_locator.py provides the first side-effect-free parser/builder for canonical noetl://... locators. It validates the scheme, rejects query/fragment data, percent-encodes path segments, extracts alternating key/value pairs, and supports existing execution result refs such as noetl://execution/<id>/result/<step>/<ref>.
  • Initial adoption is intentionally non-invasive. Frame commit durable-reference validation, DSL compact result-ref normalization, and agent compact result-ref normalization now parse noetl:// locators with this utility. Remaining follow-up PRs should replace ad hoc startswith("noetl://") parsing where topology or tenant/org extraction is required.
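
A minimal sketch of the parsing behavior described above (scheme validation, query/fragment rejection, percent-encoded segments, alternating key/value scope pairs). It is not the code in resource_locator.py: `parse_locator`, `build_locator`, and the `SCOPE_KEYS` tuple are hypothetical, with the key list taken from the URI template in this section.

```python
from urllib.parse import quote, unquote, urlsplit

# Assumed scope keys, copied from the locator template above.
SCOPE_KEYS = ("tenant", "org", "cluster", "region", "zone", "node", "process")


def parse_locator(locator: str):
    """Split a noetl:// locator into (scope dict, remaining resource segments)."""
    parts = urlsplit(locator)
    if parts.scheme != "noetl":
        raise ValueError("expected noetl:// scheme")
    if parts.query or parts.fragment:
        raise ValueError("query and fragment data are not allowed")
    # urlsplit places the first segment in netloc, so rejoin it with the path.
    raw = parts.netloc + parts.path
    segments = [unquote(s) for s in raw.split("/") if s]
    scope = {}
    i = 0
    while i + 1 < len(segments) and segments[i] in SCOPE_KEYS:
        scope[segments[i]] = segments[i + 1]
        i += 2
    return scope, segments[i:]


def build_locator(scope: dict, tail: list) -> str:
    """Emit scope pairs in template order, then the resource tail."""
    segments = []
    for key in SCOPE_KEYS:
        if key in scope:
            segments += [key, scope[key]]
    segments += list(tail)
    return "noetl://" + "/".join(quote(str(s), safe="") for s in segments)
```

Returning the unparsed tail separately keeps shorter forms such as noetl://execution/<id>/result/<step>/<ref> parseable without forcing them into the scope dictionary.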

10.1.1 Multitenant isolation

Multitenancy is enforced at the event/payload boundary first, then projected outward:

  • Event partitions include tenant_id, organization_id, and time/execution keys.
  • Payload object keys include tenant/org prefixes and content digests; encryption context is tenant-scoped.
  • Cache keys include tenant/org/execution. Shared cache layers may share infrastructure, not key space.
  • Projection stores use row-level security or physical database/schema separation depending on the tenant isolation tier.
  • Replay APIs require tenant/org scope and never scan a global stream without an explicit operator permission.
  • Cross-tenant materialized analytics are allowed only through curated, governed projections with redaction rules encoded as projection code.

10.2 Topology-aware scheduling

Frame claim has an optional locality preference:

POST /api/stages/{stage_id}/frames/claim
body:
  worker_id: noetl://tenant/.../org/.../cluster/.../worker/cpu-01
  locality:
    prefer_node: <node_id> # for Tier 1.5 colocation
    prefer_zone: us-central1-a # for Tier 2 cache locality
    max_distance: zone | region | any

Initial implementation: cursor workers include best-effort node_id, cluster_id, region, zone, worker_pool, and runtime locality in the frame claim body, and the server persists that object on the frame.dispatched event metadata. When possible, the server also records worker_locator as a canonical noetl://tenant/.../org/.../cluster/.../node/.../worker/... identity. If the frame cursor includes source_locality or producer_locality, the server records a placement evaluation (distance, requested max_distance, and within_max_distance) on the dispatch event. The scheduler still claims the closest available frame using the existing indexed frame predicates; enforcing prefer_node / prefer_zone against selection is the next step after producer locality is persisted consistently.

Target scheduler behavior: try the closest match first. Frames produced by a worker prefer to be reduced by a worker on the same node (Tier 1.5 hit) or in the same zone (Tier 2 hit). Cross-region placement happens only when local capacity is exhausted.
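
The placement evaluation recorded on the dispatch event (distance, requested max_distance, within_max_distance) could be computed along these lines. This is a sketch under assumptions: `evaluate_placement` is a hypothetical helper, and the distance ordering node < zone < region < any is inferred from the tier description rather than taken from the server code.

```python
# Assumed ordering from closest to farthest.
_RANK = {"node": 0, "zone": 1, "region": 2, "any": 3}
# (distance level, locality field) pairs; field names follow the claim body.
_FIELDS = (("node", "node_id"), ("zone", "zone"), ("region", "region"))


def evaluate_placement(worker: dict, source: dict, max_distance: str = "any") -> dict:
    """Compare worker locality with the producer locality of a frame."""
    distance = "any"
    for level, field_name in _FIELDS:
        value = worker.get(field_name)
        if value is not None and value == source.get(field_name):
            distance = level  # closest shared scope wins
            break
    return {
        "distance": distance,
        "max_distance": max_distance,
        "within_max_distance": _RANK[distance] <= _RANK[max_distance],
    }
```

For example, a worker in us-central1-a claiming a frame produced on another node in the same zone evaluates to distance "zone", which satisfies max_distance: zone but would not satisfy a stricter node-only preference.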

10.3 Autoscaling

The current deployable KEDA scaler reads the NoETL runtime backlog metric:

  • Source: noetl_frame_backlog_total{stage_kind="all",status="all"} via a Prometheus-compatible metrics endpoint.
  • Target: one backlog unit per desired worker by default, clamped by worker.autoscaling.minReplicas / maxReplicas.
  • Enablement: worker.autoscaling.enabled=true plus worker.autoscaling.keda.enabled=true in the Helm chart. When this mode is on, the chart renders a ScaledObject and suppresses the CPU/memory HPA to avoid competing scale controllers.
  • Detail signal: noetl_frame_backlog_detail_total{tenant_id=...,organization_id=...,stage_kind=...,status=...} exposes the same runtime backlog grouped by tenant and organization for dashboards and future policy engines. It is intentionally separate from the global KEDA metric so tenant cardinality does not affect the default scaler query.

No orchestration component should query the runtime database directly for scaling. Postgres is the current storage/projection implementation; the autoscaling contract is the NoETL metric stream so the backing store can change without rewriting cluster control surfaces.

The next scheduler-aware version should use two richer signals without changing replay semantics:

  • noetl_frame_backlog_detail_total{tenant_id=...,organization_id=...,stage_kind=...,status=...} — pending frames waiting for a worker, partitioned by stage kind and tenant/org.
  • frame_p95_lease_duration — moving p95 of how long frames take.

Autoscale formula: desired_workers = max(1, ceil(frame_backlog_total / target_concurrent_frames_per_worker)), clamped by the per-cluster max. There is no fixed provisioned-capacity number; the worker count follows the backlog.
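
As a worked instance of the formula, the following sketch evaluates it for a few backlog values. The function name and the `cluster_max` parameter are illustrative; the real clamp comes from worker.autoscaling.minReplicas / maxReplicas in the Helm chart.

```python
import math


def desired_workers(frame_backlog_total: int,
                    target_concurrent_frames_per_worker: int,
                    cluster_max: int) -> int:
    """max(1, ceil(backlog / target)), clamped by the per-cluster maximum."""
    if target_concurrent_frames_per_worker <= 0:
        raise ValueError("target_concurrent_frames_per_worker must be positive")
    wanted = max(1, math.ceil(frame_backlog_total / target_concurrent_frames_per_worker))
    return min(wanted, cluster_max)


# An empty backlog keeps one worker, 9 pending frames at 4 frames per worker
# need 3 workers, and a large backlog is capped at the cluster maximum.
print(desired_workers(0, 4, 10), desired_workers(9, 4, 10), desired_workers(1000, 4, 10))
```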

For multi-cluster, the NATS JetStream supercluster routes the same noetl.events.* subjects across clusters. The scheduler can claim frames from peer clusters when local backlog has cleared. This is opt-in and gated by an egress cost budget.

Initial operations surface: noetl/ops#109 adds automation/infrastructure/nats_supercluster.yaml, an opt-in playbook that renders and deploys a gateway-enabled NATS JetStream member for the active Kubernetes context. The existing single-cluster NATS manifest remains unchanged. Operators pass a logical cluster_name, externally reachable gateway_advertise address, and explicit remote_gateways entries. This matches the NATS gateway model: gateways use a dedicated gateway port, connect clusters into a supercluster, and each gateway node needs bidirectional reachability to remote gateway nodes. The playbook also exposes a gatewayz action for inspecting gateway links.

10.4 Resilience

  • Frame lease + cursor checkpoint = recovery primitive. Worker crash within a frame loses at most one frame's worth of work; the next worker resumes from the last committed cursor.
  • Tier 3 is the durable read. Tier 1.5 / 2 are optimizations; their failure modes are graceful degradation, not data loss.
  • Projection store writes are idempotent by (stage_id, frame_id, partition_id). Replays are safe.
  • Frame reaper (rebranded command reaper) republishes stale leases.
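
Why replays are safe under the (stage_id, frame_id, partition_id) idempotency key can be shown with a small in-memory sketch. `FrameProjection` is a hypothetical stand-in for a projection store table; a real store would express the same rule as a unique key with an insert-or-ignore write.

```python
class FrameProjection:
    """In-memory projection keyed by (stage_id, frame_id, partition_id)."""

    def __init__(self):
        self.rows = {}

    def apply(self, stage_id: str, frame_id: str, partition_id: int, row_count: int) -> bool:
        """Record a committed frame once; return False for a replayed duplicate."""
        key = (stage_id, frame_id, partition_id)
        if key in self.rows:
            return False  # redelivery after a reaper republish or replay: no effect
        self.rows[key] = row_count
        return True

    def total_rows(self) -> int:
        return sum(self.rows.values())
```

Applying the same frame commit twice leaves the totals unchanged, so a worker crash followed by redelivery cannot double-count rows.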

10.5 Quantum-cloud posture

"Quantum cloud" here means cloud infrastructure that can place work across heterogeneous compute and storage fabrics while preserving one event-sourced timeline per tenant organization. The runtime should assume future worker classes beyond CPU/GPU, including confidential compute, specialized accelerators, and quantum-provider task queues.

The contract stays the same:

  • Specialized workers receive frames with typed input payload references.
  • Provider submissions, callbacks, measurements, and result ingests are events with immutable payload references.
  • External provider state is modeled as side effects with correlation/idempotency keys, not as hidden runtime state.
  • Replay can reconstruct what was submitted, what was observed, and what state NoETL derived from it, even when the physical computation cannot be rerun deterministically.

11. Configuration

YAML, shared between Python and Rust runtimes:

runtime:
  tenant_id: ${NOETL_TENANT_ID}
  organization_id: ${NOETL_ORGANIZATION_ID}
  node_id: ${POD_IP} # mandatory, used in locator + IPC hint
  cluster_id: ${NOETL_CLUSTER_ID}
  region: ${NOETL_REGION}
  zone: ${NOETL_ZONE}
  shard_id: ${NOETL_SHARD_ID} # for StatefulSet projectors / workers
  scheduler:
    locality_preference: zone # node | zone | region | any
    max_inflight_frames_per_worker: 4

event_store:
  canonical_backend: postgres # append-only replay ledger
  distribution_backend: nats-jetstream # kafka | google-pubsub | azure-event-hubs | aws-kinesis | aws-msk
  connection:
    url: nats://nats.nats.svc.cluster.local:4222
    stream_prefix: noetl.events
  replay:
    snapshot_interval_events: 10000
    checksum_algorithm: blake3
    require_projection_parity: true

serialization:
  envelope_format: canonical-json
  tabular_payload_format: arrow-ipc-stream
  schema_registry:
    backend: file # file | db | external
    path: /etc/noetl/schemas
  compression: zstd
  timezone: UTC

payload_store:
  backend: s3 # gcs | azure-blob | seaweedfs | local
  connection:
    endpoint: https://storage.googleapis.com
    bucket: noetl-payloads-prod
  threshold_bytes: 262144 # 256 KB inline cap
  content_addressing: sha256
  encryption_at_rest: true
  cache:
    tier_1_memory_mb: 512
    tier_15_node_budget_mb: 1024 # Arrow IPC budget per node
    tier_15_grace_seconds: 30
    tier_2_disk_gb: 10
    tier_2_disk_path: /var/cache/noetl/payloads
  gc:
    strategy: reference-count # | ttl
    ttl_days: 90

projection_stores:
  default:
    backend: postgres
    connection:
      dsn: postgresql://noetl:***@pg.noetl.svc:5432/noetl
  search:
    backend: search-store
    connection:
      hosts: ["http://search.search.svc:9200"]
  analytics:
    backend: columnar-analytics
    connection:
      dsn: ${NOETL_ANALYTICS_DSN}
  streaming:
    backend: streaming-materialized-view
    connection:
      dsn: ${NOETL_STREAMING_MV_DSN}

materializations:
  event_lake:
    backend: columnar-analytics
    source: event_store
    include_payload_refs: true
  operational_mvs:
    backend: streaming-materialized-view
    source: event_store
    barrier_interval_ms: 1000

snapshots:
  backend: postgres # defaults to projection_stores.default

frame_policy_defaults:
  loop:
    size: 50
    duration_ms: 30000
    memory_bytes: 67108864
    parallelism: 1
  fanout:
    size: 1
    duration_ms: 5000
    memory_bytes: 16777216
    parallelism: 8

The same image runs anywhere. Backend changes are config-only.
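
The ${...} placeholders in the configuration suggest environment-variable substitution at load time. The sketch below shows one way that expansion could work on an already-parsed config mapping. It assumes the placeholders are plain environment lookups and that a missing variable should fail loudly; `expand_placeholders` is a hypothetical helper, not the runtime's loader.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_placeholders(value, env: dict):
    """Recursively replace ${NAME} in strings using env; fail on missing names."""
    if isinstance(value, dict):
        return {k: expand_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(v, env) for v in value]
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"missing environment variable: {name}")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    return value  # numbers, booleans, None pass through unchanged


config = {"runtime": {"tenant_id": "${NOETL_TENANT_ID}", "zone": "${NOETL_ZONE}",
                      "scheduler": {"max_inflight_frames_per_worker": 4}}}
resolved = expand_placeholders(config, {"NOETL_TENANT_ID": "acme", "NOETL_ZONE": "us-central1-a"})
```

In a real process the `env` argument would typically be `os.environ`; passing it explicitly keeps the function testable.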


12. Refactor plan (phased, additive)

Phase 0 — Instrumentation (1 week)

Implementation status: partially shipped in noetl/noetl#435.

  • Add the metrics in §4 to Grafana / VictoriaMetrics dashboards (already deployed).
  • Baseline a fresh PFT v2 run on GKE. Capture the metric values and pin to memory.
  • Add noetl.stage and noetl.frame tables via canonical Postgres DDL. Done in Phase 0 as additive DDL.
  • Add canonical event-envelope schema validation, including tenant/org scope, schema_name, schema_version, idempotency_key, payload digest, and deterministic checksum fields. The additive columns are present; validation hardening remains incremental.
  • Add replay harness: rebuild execution/frame/loop projections from noetl.event + payload store and compare checksums against live projection rows. The first replay API is shipped; full parity harness remains tracked by noetl/noetl#440.

Validation notes:

  • Focused runtime/replay regression is green in repos/noetl.
  • Local kind deployment is validated via repos/ops/automation/infrastructure/kind.yaml and repos/ops/automation/development/noetl.yaml using Podman.
  • Latest local kind validation for noetl/noetl#435: image local/noetl:2026-05-16-22-49, health/replay/pod/log smokes passed, /metrics exposes the IPC counters, a temporary frame claim emitted stream_version=1, 64-character envelope_checksum, and non-null catalog_id, in-pod Arrow IPC TempStore round-trip passed, in-pod cursor-worker frame capture produced two Arrow-backed frames from three claimed rows, and the deployed server image imports ReplayStateProjector, NATSProjectorWorker, NATSEventPublisher, and the python -m noetl.projector entrypoint.
  • Frame-loop local kind validation on 2026-05-17 (noetl/noetl#435, noetl/e2e#22, noetl/ops#98) completed PFT v2 execution 628959278765703700 in 33m10s. The run populated 60 noetl.stage rows and 5,570 noetl.frame rows, emitted 60 stage.opened, 5,570 frame.dispatched, and 5,570 frame.committed events, and emitted zero frame.failed events. The fixture used three paginated-api replicas with PFT_RATE_LIMIT=500; strict fixture 429 checks and worker/server error checks were clean in the final validation windows.
  • Stage terminal events are now implemented and validated: the executor emits stage.closed, records closed_event_id, and marks the stage COMPLETED when the loop epoch finishes. Tracker noetl/noetl#443 is closed; future failed/abandoned/reaper-specific gaps should be tracked as narrower bugs.
  • Lineage validation on local kind image localhost/local/noetl:2026-05-17-13-32 completed PFT v2 execution 629003028435042682 in 33m36s with completed=true and failed=false. Final counters: 60 stage.opened, 60 stage.closed, 5,562 frame.dispatched, 5,562 frame.committed, zero frame.failed, all 60 stages COMPLETED, frame.command_id/claimed_event_id/terminal_event_id populated on all 5,562 frames, and completed frames averaged 49.34 rows with max 50.
  • Frame-mode local kind validation on image localhost/local/noetl:2026-05-17-18-07 completed PFT v2 execution 629145120213828019 in 279.785s with completed=true and failed=false. Final counters: 5,498 frames, average 49.92 rows/frame, max 50, metrics present on every frame, all 10 facilities at 1000/1000 for the five patient domains, and MDS 224,443/224,443.
  • Frame-lineage local kind validation on image localhost/local/noetl:2026-05-17-18-56 completed PFT v2 execution 629166136403165640 in 269.156s with completed=true and failed=false. Final counters: 60 stages opened/closed, 5,500 frames, 5,440 frames with parent_frame_id (one root per stage), 5,500/5,500 claimed_event_id and terminal_event_id links populated, 5,500 frame.dispatched, 5,500 frame.committed, zero frame.failed, all 10 facilities at 1000/1000 for the five patient domains, and MDS 224,443/224,443.
  • Frame-lifecycle local kind validation on image local/noetl:2026-05-17-19-10 completed PFT v2 execution 629173479832551841 in 340.435s with completed=true and failed=false. Final counters: 60 stages opened/closed, 5,499 frames, 5,439 frames with parent_frame_id (one root per stage), 5,499/5,499 claimed_event_id and terminal_event_id links populated, 5,499 frame.dispatched, 5,499 frame.started, 5,499 frame.committed, zero frame.failed, all 10 facilities at 1000/1000 for the five patient domains, and MDS 224,443/224,443. The validation command passed --set num_facilities=1 --set patients_per_facility=100, but catalog execution overrides were ignored and the run used defaults 10/1000; this is tracked in noetl/cli#12.
  • Frame-stream lock-order validation on image local/noetl:2026-05-17-19-51 completed PFT v2 execution 629193644754337792 in 487.855s with completed=true and failed=false. Final counters: 60 stages opened/closed, 5,496 frames, 5,436 frames with parent_frame_id (one root per stage), 5,496/5,496 claimed_event_id and terminal_event_id links populated, 5,496 frame.dispatched, 5,496 frame.started, 5,496 frame.committed, zero frame.failed, all 10 facilities at 1000/1000 for the five patient domains, MDS 224,443/224,443, and no frame endpoint deadlocks or 500s in the validation log window. Two prior validations on images local/noetl:2026-05-17-19-26 and local/noetl:2026-05-17-19-42 were cancelled after exposing frame event deadlocks; the fix is consistent lock acquisition before frame row mutation.
  • Replay hardening now folds direct event.stage_id, event.frame_id, and event.command_id columns; records stage open/close event ids and frame claim/terminal event ids; and treats frame.abandoned as a deterministic lease-recovery transition. The golden replay corpus checksum was updated to cover these extra lineage fields.
  • Frame replay parity now includes a deterministic frame projection checksum that can compare live noetl.frame rows with replayed frame state. The normalized parity surface includes frame/stage/parent/command ids, claim and terminal event links, status, row count, cursor metadata, emitted-event count, and Arrow IPC rows_ref summary fields (sha256, schema_digest, row_count, media_type, ref). Against local kind PFT execution 629193644754337792, live and replayed frame projection checksums matched at a65a8ce90d228332d3ce50958c6f41f1f8d584c36ee45ecd1645d61cb0e90cef across 5,496 frames.
  • Command replay parity now has the same normalized checksum surface for live command projection rows and replayed commands state, including stage/frame ids, parent command id, issued/claimed/started/terminal event links, worker locator, locality, source locality, and placement metadata.
  • Business-object replay parity now has normalized checksum helpers for live business-object projection rows and replayed business_objects state, including object ids, status/version, event lineage, deletion marker, latest payload summary, payload-ref count, and folded attributes.
  • Loop replay parity now has normalized checksum helpers for live loop progress rows and replayed loops state, including totals, done/failed counters, completion state, and last folded event id.
  • Execution replay parity now has normalized checksum helpers for live execution projection rows and replayed top-level execution state, including tenant/org scope, status, last node/event, event count, latest payload summary, and upcaster registry digest.
  • Stage replay parity now has normalized checksum helpers for live stage projection rows and replayed stages state, including stage status, kind, parent/loop linkage, open/close event ids, frame/row/emitted/failed counters, and last folded event id.
  • Replay parity now has an aggregate checksum bundle helper exposed on replay responses as projection_checksums, with named checksums for execution, stages, frames, commands, business_objects, and loops. Replay-state projector metadata carries the same bundle so consumers do not need to deserialize the full state snapshot for parity dashboards. Storage adapters can produce the live side through live_projection_checksum_bundle(...), compare bundles through projection_checksum_parity_report(...), or use scripts/check_replay_parity_report.py for offline JSON bundle checks, giving release gates one stable comparison artifact while preserving surface-level diagnostics when any checksum diverges.
  • Replay input reads now use the ReplayEventReader port. The reference PostgresReplayEventReader is only an adapter; replay folding, checksum bundles, and parity reporting remain storage-neutral and can be reused by future event-log backends. ReplayService.replay_state(..., event_reader=...) lets validation tools and future adapters inject a reader without mutating process-global state.
  • Replay payload reads now use the ReplayPayloadResolver port. The reference TempStoreReplayPayloadResolver resolves existing TempStore/ResultStore references and returns bounded verification data (checksum, size, row count, value type, error) so replay validation can prove payload availability without returning large row bodies or coupling to a specific durable store. payload_resolution_summary is the release-report surface for payload availability: total refs, resolved/unresolved counts, unique refs, all-resolved status, and a deterministic checksum of the bounded report.
  • Replay schema evolution now uses the same dependency-injection pattern as event reads and payload resolution: ReplayService.configure_upcaster_registry(...) sets the process default for a runtime, while ReplayService.replay_state(..., upcaster_registry=...) lets validation runs replay the same event stream with an explicit registry and assert the resulting upcaster_registry_digest.
  • Offline replay payload validation now has a release-gate script: scripts/check_replay_payload_resolution_report.py --report replay.json. It consumes replay-state JSON, recomputes the bounded payload-resolution summary, and exits nonzero on unresolved payload refs or summary checksum mismatch.
  • Replay state report capture now has a small API fetcher: scripts/fetch_replay_state_report.py --base-url ... --execution-id ... --output replay.json [--resolve-payloads]. The output is stable JSON intended for the offline projection parity and payload-resolution gates, and the fetcher rejects non-object replay JSON before writing the artifact.
  • Replay validation orchestration now has a runner: scripts/run_replay_validation.py --base-url ... --execution-id ... --output-dir ... [--live-checksums live.json | --live-rows live-rows.json] [--resolve-payloads] [--report-output validation.json]. It captures replay state, verifies the fetch artifact was produced, optionally builds live-checksums-<execution_id>.json from adapter-exported live projection rows, runs offline gates, and emits a reproducible validation manifest with config, UTC timestamps, per-step duration, command output, and parsed JSON stdout for local kind and GKE evidence.
  • Replay validation manifests are themselves gateable: scripts/check_replay_validation_manifest.py --manifest validation.json [--check-artifacts] verifies manifest config shape, ISO timestamps, required step order, successful step return codes, non-negative durations, optional artifact paths, and mutually exclusive live parity inputs. Failed manifests can be inspected with --allow-failed, but release evidence should pass without it.
  • Live parity artifacts are storage-neutral: scripts/build_live_projection_checksums.py --rows live-rows.json --output live-checksums.json consumes adapter-exported JSON row arrays for execution, stages, frames, commands, business_objects, and loops, validates row shape, and writes the canonical projection_checksums bundle plus row counts. Postgres is only one possible exporter; the checksum builder operates on rows, not database routines.
  • The current deployed-cluster live-row exporter is scripts/export_live_projection_rows_postgres.py --execution-id ... --tenant-id ... --organization-id ... --output live-rows.json. It is intentionally an adapter, not the contract: it emits the same rows.{execution,stages,frames,commands,business_objects,loops} JSON shape any future storage backend can produce. It uses ordinary reads from noetl.execution, noetl.stage, noetl.frame, noetl.command, noetl.event, and the replay-state projection record; no database functions or routines are required for validation evidence.
  • Live-row evidence is gateable before checksum comparison: scripts/check_live_projection_rows.py --rows live-rows.json validates schema_version, adapter identity, tenant/org/execution/projection scope, exported_at, complete canonical row surfaces, per-surface row_counts, and the canonical rows_checksum. scripts/build_live_projection_checksums.py also rejects nested row artifacts whose row_counts or rows_checksum drift from the actual rows, so a parity pass cannot be based on mutated live-row evidence. Validation manifests that include live-row evidence must contain live_rows_integrity before live_checksums; scripts/check_replay_validation_manifest.py enforces that order when artifacts.live_rows, config.live_rows, or config.export_live_rows_postgres=true is present.
  • A complete local-kind or GKE parity run can now produce all validation artifacts from one command when the current reference storage adapter is available:
python scripts/run_replay_validation.py \
--base-url "$NOETL_BASE_URL" \
--execution-id "$EXECUTION_ID" \
--tenant-id "$TENANT_ID" \
--organization-id "$ORGANIZATION_ID" \
--export-live-rows-postgres \
--resolve-payloads \
--output-dir "$ARTIFACT_DIR" \
--report-output "$ARTIFACT_DIR/validation-$EXECUTION_ID.json" \
--artifact-index-output "$ARTIFACT_DIR/artifact-index-$EXECUTION_ID.json"

That runner writes replay-$EXECUTION_ID.json, live-rows-$EXECUTION_ID.json, live-checksums-$EXECUTION_ID.json, validation-$EXECUTION_ID.json, and artifact-index-$EXECUTION_ID.json. The manifest records both the source live-row export and the derived live checksum artifact so --check-artifacts proves the full evidence chain exists. When --artifact-index-output is used, the runner records the artifact_index step in the final manifest before packaging, so the artifact index hashes the same manifest operators inspect. The artifact index records SHA-256 and size for each evidence file, and --check enforces exactly one manifest, replay, and report role plus paired live_rows / live_checksums roles when live evidence is present. Paths inside the index are relative to the artifact-index directory when the files live in the same bundle, while artifacts outside the bundle remain absolute; the checker validates both the path_base marker and the top-level manifest pointer before checking entry digests. The manifest checker also validates the referenced artifact index under --check-artifacts, verifies that the index points back to the same manifest, and requires artifact_index to be the final manifest step. This means the whole bundle can be moved out of the cluster and rechecked from any working directory for completeness as well as drift:

python scripts/check_replay_validation_manifest.py \
--manifest "$ARTIFACT_DIR/validation-$EXECUTION_ID.json" \
--check-artifacts

python scripts/package_replay_validation_artifacts.py \
--check "$ARTIFACT_DIR/artifact-index-$EXECUTION_ID.json"

python scripts/check_replay_validation_bundle.py \
--manifest "$ARTIFACT_DIR/validation-$EXECUTION_ID.json"

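For non-Postgres stores or offline adapter development, export the same live-row JSON shape separately and pass it to the runner. The first command below uses the reference Postgres exporter as a stand-in; another storage adapter would substitute its own exporter that writes the same shape: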

python scripts/export_live_projection_rows_postgres.py \
--execution-id "$EXECUTION_ID" \
--tenant-id "$TENANT_ID" \
--organization-id "$ORGANIZATION_ID" \
--output "$ARTIFACT_DIR/live-rows-$EXECUTION_ID.json"

python scripts/run_replay_validation.py \
--base-url "$NOETL_BASE_URL" \
--execution-id "$EXECUTION_ID" \
--tenant-id "$TENANT_ID" \
--organization-id "$ORGANIZATION_ID" \
--live-rows "$ARTIFACT_DIR/live-rows-$EXECUTION_ID.json" \
--resolve-payloads \
--output-dir "$ARTIFACT_DIR" \
--report-output "$ARTIFACT_DIR/validation-$EXECUTION_ID.json"

python scripts/check_replay_validation_manifest.py \
--manifest "$ARTIFACT_DIR/validation-$EXECUTION_ID.json" \
--check-artifacts
  • Replay validation now starts with state-report integrity: scripts/check_replay_state_report.py --report replay.json requires the canonical replay envelope, validates required string fields plus strict JSON integer fields, enforces checksum_algorithm=sha256, requires the complete canonical projection_checksums surface set, and recomputes the replay state's top-level checksum plus projection checksum bundles from the replay JSON. Mutated state, incomplete envelope data, malformed field types or snapshot metadata, missing/unknown checksum surfaces, or surface-level drift fail before parity or payload gates run.
  • Replay parity validation now treats bundle shape and checksum shape as part of the contract: scripts/check_replay_parity_report.py requires the complete canonical surface set (execution, stages, frames, commands, business_objects, loops) for replayed and live bundles, rejects unknown surfaces, and requires lowercase SHA-256 hex values unless --allow-invalid-checksum-shape is used for a legacy ad-hoc report.
  • Payload-resolution validation now treats row shape, resolved payload checksums, and summary shape as part of the replay contract: scripts/check_replay_payload_resolution_report.py requires every payload_resolution entry and its resolution value to be objects, requires lowercase SHA-256 hex values for every resolved payload checksum, and validates supplied payload_resolution_summary objects with exact fields, strict non-negative integer counters, boolean all_resolved, and lowercase SHA-256 checksum shape before accepting the summary.
  • Replay state-report validation now also checks registry digest shape: upcaster_registry_digest and any snapshot registry digest must be lowercase SHA-256 hex values before replay/snapshot compatibility is accepted.
  • Time-cutoff replay can now seed from projection_snapshot by first resolving as_of_time to the maximum visible event id at or before that timestamp, then applying the same version <= cutoff_event_id guard used by position-based replay. This keeps snapshots as accelerators only; events after the snapshot version are still folded through the requested cutoff.
  • Replay snapshot compatibility now checks upcaster_registry_digest before seeding. If a candidate snapshot was produced by a different registry, replay ignores it and reloads from canonical events instead of mixing reducer/schema versions.
  • Completed Phase 1 cleanup trackers were closed after merged validation: noetl/noetl#443 stage terminal projection, noetl/noetl#444 distributed workload override handling via merged CLI fix, noetl/noetl#446 frame recovery hardening, noetl/noetl#447 frame payload replay parity, and noetl/noetl#448 loop supervision event-id compatibility. Active hardening PRs are noetl/noetl#453 for the frame claim hot path and noetl/noetl#454 for execution status projection stability.
  • Full repository pytest collection currently has legacy collection blockers unrelated to Phase 0; tracked by noetl/noetl#440.
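
The parity notes above rely on deterministic checksums over normalized projection rows, compared surface by surface. A minimal sketch of that idea follows. The helper names are hypothetical and are not the repository's live_projection_checksum_bundle or projection_checksum_parity_report; the normalization here (sorted-key JSON, sorted rows) is an assumption chosen to make the digest independent of row and key order.

```python
import hashlib
import json

SURFACES = ("execution", "stages", "frames", "commands", "business_objects", "loops")


def surface_checksum(rows: list) -> str:
    """SHA-256 over canonical JSON rows, independent of row and key order."""
    lines = sorted(json.dumps(row, sort_keys=True, separators=(",", ":")) for row in rows)
    return hashlib.sha256("\n".join(lines).encode("utf-8")).hexdigest()


def checksum_bundle(rows_by_surface: dict) -> dict:
    """One named checksum per canonical surface; missing surfaces hash as empty."""
    return {name: surface_checksum(rows_by_surface.get(name, [])) for name in SURFACES}


def parity_report(replayed: dict, live: dict) -> dict:
    """Compare two bundles and keep surface-level diagnostics on divergence."""
    diverged = [name for name in SURFACES if replayed.get(name) != live.get(name)]
    return {"parity": not diverged, "diverged_surfaces": diverged}
```

Because the report names the diverging surfaces, a failed gate points at frames or commands specifically instead of only signalling that something differs.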

Deliverable: dashboard URL + memory entry with baseline numbers, plus a replay parity report for the baseline run. No code change to hot paths.
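
The snapshot rules in the validation notes (resolve as_of_time to a cutoff event id, seed only from snapshots with version <= cutoff_event_id, and ignore snapshots built with a different upcaster registry) can be sketched as follows. The event and snapshot dictionaries, field names such as created_at and event_count, and the assumption that event ids are positive are illustrative only.

```python
def resolve_cutoff(events: list, as_of_time: int):
    """Maximum visible event id at or before the requested timestamp."""
    visible = [e["event_id"] for e in events if e["created_at"] <= as_of_time]
    return max(visible, default=None)


def pick_snapshot(snapshots: list, cutoff_event_id: int, registry_digest: str):
    """Newest snapshot that is within the cutoff and matches the registry digest."""
    usable = [
        s for s in snapshots
        if s["version"] <= cutoff_event_id
        and s["upcaster_registry_digest"] == registry_digest
    ]
    return max(usable, key=lambda s: s["version"], default=None)


def replay_event_count(events: list, snapshots: list,
                       cutoff_event_id: int, registry_digest: str) -> int:
    """Fold a trivial state (an event counter) from a seed snapshot to the cutoff."""
    seed = pick_snapshot(snapshots, cutoff_event_id, registry_digest)
    count = seed["event_count"] if seed else 0
    start = seed["version"] if seed else 0  # assumes event ids are positive
    for event in events:
        if start < event["event_id"] <= cutoff_event_id:
            count += 1  # events after the snapshot are still folded
    return count
```

The result is the same with or without a usable snapshot, which is the sense in which snapshots act as accelerators only.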

Phase 1 — Frame-shaped cursor loops (2 weeks)

Status: implementation shipped; live revalidation pending in local kind/GKE after the current workspace Podman issue is resolved.

Goal: collapse N single-row cursor claims into N/50 multi-row frame claims and allow selected steps to run the declared task pipeline once per frame.

  • Extend cursor_worker.py to enforce the frame_policy payload now emitted alongside the existing cursor spec. The first implementation is in noetl/noetl#435: bounded claim windows are processed in-process and captured as Arrow IPC frame refs.
  • New POST /api/stages/{stage_id}/frames/claim endpoint; under the hood it calls existing claim_next_loop_indices with LIMIT = frame_policy.size.
  • Worker either iterates the claimed rows in-process (frame.process: row) or executes the task pipeline once with frame.rows (frame.process: frame). Both paths serialize the frame row window as Arrow IPC through TempStore. This pulls a narrow Tier 1.5 slice forward because serialization correctness is required before the frame API can be the sole path.
  • Worker commits the frame with one event per frame instead of one per row.
  • Migrate test_pft_flow_v2.yaml to opt in via loop.spec.frame on each step that uses mode: cursor. The high-volume PFT steps use process: frame to batch HTTP calls and Postgres writes from the declared playbook, rather than hiding the loop in Python.
  • loop.spec.max_in_flight now accepts either a positive integer or a renderable expression such as {{ workload.pft_http_concurrency }}. The rendered value is re-validated as a positive integer before collection/cursor dispatch and cursor recovery paths reuse the same resolver. This closes the catalog-registration workaround that previously forced PFT v2 to hard-code the resolved concurrency value.
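
The goal of collapsing N single-row claims into roughly N/50 frame claims, with one commit event per frame, can be illustrated with a small sketch. `split_into_frames` and `commit_events` are hypothetical helpers; the real claim path goes through the frames/claim endpoint and Arrow IPC serialization, which this sketch leaves out.

```python
def split_into_frames(rows: list, size: int) -> list:
    """Group claimed rows into consecutive frames of at most `size` rows."""
    if size < 1:
        raise ValueError("frame size must be a positive integer")
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def commit_events(stage_id: str, rows: list, size: int) -> list:
    """Emit one commit event per frame instead of one per row."""
    return [
        {"event": "frame.committed", "stage_id": stage_id,
         "frame_index": index, "row_count": len(frame)}
        for index, frame in enumerate(split_into_frames(rows, size))
    ]


events = commit_events("stage-1", list(range(120)), size=50)
```

With 120 rows and the default loop size of 50, this yields three commit events with row counts 50, 50, and 20, which is why average rows per frame in the validation runs sits slightly under 50.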

Verification:

  • Total command.* count drops from ~150k to < 20k on PFT v2.
  • Wall time should drop versus the default row-processing baseline. The 2026-05-18 local kind validations improved the full PFT v2 run from ~31m21s to ~4m30s-~8m08s, with the latest clean lock-order validation favoring correctness over optimistic concurrent frame event writes.
  • Replay parity stays green for frame state, loop progress, execution status, and frame-mode payload references. The parity gate compares live and replayed frame projection checksums, including Arrow IPC rows_ref digest and schema metadata.

Phase 2 — Decentralized projection (2 weeks)

Status: implementation shipped behind opt-in deployment/config; live mirrored-event projector validation against PFT v2 remains pending.

Goal: extract projection from server, run as a StatefulSet, scale independently.

  • New noetl-projector binary entrypoint reusing the existing projection worker code.
  • Helm chart adds the StatefulSet, NATS durable consumer per replica, projection-store-only DB user.
  • Remove the in-process projection loop from the server. Server now appends canonical events and mirrors frame lifecycle, direct event ingestion, async batch ingestion, execution-route command/cancel events, and generated command events to the distribution stream.
  • Add per-shard projection lag/checkpoint metrics from durable projection metadata. Initial gauges are in noetl/noetl#458; dashboard wiring and live mirrored-event validation remain.
  • Shard-skip counters from noetl/noetl#536 distinguish normal cross-shard traffic from malformed/unshardable envelopes during live mirrored-event validation. Stale-write counters from noetl/noetl#537 distinguish harmless duplicate/out-of-order delivery from missing projector output.
  • Invalid projector shard settings now fail at startup, and env/CLI loaders no longer mask bad values, so a misconfigured StatefulSet replica count or shard id cannot quietly become a no-op projector. Metrics labels include the resolved shard index/count and NATS stream/subject to make that topology auditable during local-kind and GKE validation.
  • Decode-error counters make malformed mirrored payloads visible even when the subscriber rejects the message before projection begins, while projection-error counters isolate failures after a notification reaches handle_notification; the aggregate error counter covers both. Terminal action counters show ACK throughput, immediate and delayed NAK pressure, and TERM drops before retry attempts re-enter the projector.
  • Last-action and last-batch gauges make the most recent projector decision inspectable without replaying logs, and /summary plus scripts/check_projector_metrics_summary.py expose the same action, batch, error, and lag state as stable validation payloads.
  • run_projector_phase2_validation.py is the preferred phase runner; use render_projector_phase2_command.py to produce a repeatable command for the current projector replica URLs, live-row/live-checksum source, and parity requirement.
  • noetl.outbox is now the canonical distribution bridge for server, frame, execution, broker, command-claim, backend-neutral event-store, and DSL executor events. The standalone python -m noetl.outbox publisher gives deferred rows a traffic-independent retry path, and projector subscribers decode both JSON envelopes and Arrow Feather payload bytes.
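
The outbox bridge described above can be illustrated with a minimal sketch. It assumes an in-memory SQLite database as a stand-in for Postgres, and the table names, `append_event`, and `drain_outbox` are hypothetical; the real `noetl.outbox` schema and publisher are not shown in this spec. The point is only that the canonical event and its outbox row commit together, and that a separate publisher pass retries deferred rows regardless of request traffic.

```python
import json
import sqlite3
from typing import Callable


def open_store() -> sqlite3.Connection:
    # In-memory SQLite stands in for Postgres; table names are illustrative only.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE event (id INTEGER PRIMARY KEY, body TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox ("
        "event_id INTEGER PRIMARY KEY, published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def append_event(conn: sqlite3.Connection, body: dict) -> int:
    # The canonical event and its outbox row commit (or roll back) together.
    with conn:
        cur = conn.execute("INSERT INTO event (body) VALUES (?)", (json.dumps(body),))
        event_id = cur.lastrowid
        conn.execute("INSERT INTO outbox (event_id) VALUES (?)", (event_id,))
    return event_id


def drain_outbox(conn: sqlite3.Connection, publish: Callable[[dict], None]) -> int:
    # Publisher pass: attempt every unpublished row; failures stay deferred.
    rows = conn.execute(
        "SELECT o.event_id, e.body FROM outbox o JOIN event e ON e.id = o.event_id "
        "WHERE o.published = 0 ORDER BY o.event_id"
    ).fetchall()
    sent = 0
    for event_id, body in rows:
        try:
            publish(json.loads(body))
        except Exception:
            continue  # leave the row unpublished so the next pass retries it
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE event_id = ?", (event_id,))
        sent += 1
    return sent
```

Because a row is marked published only after `publish` returns, a crash between the two steps can redeliver an event. That is consistent with the stale-write counters above, which treat duplicate delivery as harmless.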

Verification:

  • Postgres pool depth high-watermark drops by a factor of ~3 on PFT v2 (writer fan-out).
  • Server CPU stops spiking during MDS bursts.

Phase 3 — Arrow IPC Tier 1.5 (3 weeks)

Status: Python implementation shipped for cursor/frame payloads; colocated projector/worker hit-ratio validation remains pending.

Goal: add zero-copy data plane for colocated workers and projectors.

  • Add pyarrow and arrow-rs (arrow, arrow-ipc) to dependencies. pyarrow is required for Python cursor/frame serialization; a Rust storage crate is not present in this repository yet, so the Rust parity item remains future adapter work.
  • Implement Tier 1.5 in result_store.py (Python). TempStore.put_ipc_bytes writes durable bytes first, then admits an optional same-node IPC hint.
  • Extend PayloadReference with optional IpcHint, including node_id so foreign-node hints become deterministic cache misses.
  • Producer worker writes RecordBatch bytes to durable storage plus best-effort shared memory; consumers use TempStore.resolve() / get_ipc_bytes() to try IPC first and fall back to durable bytes before decoding rows at the consumer boundary.
  • Per-node IPC budget + lease expiry sweep are implemented in ArrowIpcSharedMemoryCache; broader node-broker/multi-process coordination remains future work.
  • Metrics: noetl_storage_ipc_read_hit_ratio and noetl_storage_ipc_* counters per server/worker process; dashboard rollups should group by worker pool, runtime, node, and tenant when those labels become available.
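
The write and read ordering in the bullets above can be sketched as follows. This is a stand-in under stated assumptions, not the `TempStore` implementation: `SketchStore`, the `region` field, and the dict-backed stores are hypothetical, and plain dictionaries replace durable storage and shared memory. It shows three properties the bullets describe: durable bytes are written before any hint is admitted, a hint from a foreign node is a deterministic miss, and a payload that exceeds the per-node budget gets no hint at all.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class IpcHint:
    node_id: str
    region: str  # hypothetical name of a node-local shared-memory region


@dataclass(frozen=True)
class PayloadReference:
    uri: str
    ipc_hint: Optional[IpcHint] = None


class SketchStore:
    def __init__(self, node_id: str, durable: Dict[str, bytes], ipc_budget_bytes: int) -> None:
        self.node_id = node_id
        self.durable = durable            # stand-in for durable storage shared by all nodes
        self.ipc: Dict[str, bytes] = {}   # stand-in for this node's shared memory
        self.ipc_budget_bytes = ipc_budget_bytes
        self.hits = 0
        self.misses = 0

    def put_ipc_bytes(self, uri: str, data: bytes) -> PayloadReference:
        self.durable[uri] = data  # durable copy first, so the hint is never the only copy
        used = sum(len(v) for v in self.ipc.values())
        if used + len(data) > self.ipc_budget_bytes:
            return PayloadReference(uri)  # over budget: skip the best-effort hint
        region = f"noetl-{len(self.ipc)}"
        self.ipc[region] = data
        return PayloadReference(uri, IpcHint(self.node_id, region))

    def get_ipc_bytes(self, ref: PayloadReference) -> bytes:
        hint = ref.ipc_hint
        if hint is not None and hint.node_id == self.node_id:
            data = self.ipc.get(hint.region)
            if data is not None:
                self.hits += 1
                return data
        # Foreign-node hint, expired region, or no hint: fall back to durable bytes.
        self.misses += 1
        return self.durable[ref.uri]
```

A hit ratio in the spirit of the metric above would be `hits / (hits + misses)` per process.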

Verification:

  • tier15_hit_ratio > 60% when projector is colocated with worker.
  • End-to-end PFT v2 wall time target: at most half the Phase 0 baseline (≤ baseline ÷ 2).

Phase 4 — Cloud OS surfaces (3 weeks)

Status: foundational identity pieces shipped; a first opt-in backlog scaler is available in the Helm chart; multi-cluster surfaces remain open.

Goal: lift the runtime from "well-behaved on one cluster" to "addressable, schedulable, autoscalable across clusters."

  • Implement unified resource locator across all subsystems. The core parser/builder is now present and adopted for frame durable-reference validation plus compact result-ref normalization. Worker metrics, frame claims, command claims, frame-dispatch metadata, and worker-reported runtime events now share the same topology helper; remaining work is broader server-side envelope injection and scheduler enforcement of locality hints.
  • StatefulSet identity for workers (not just projectors).
  • KEDA scaler with frame backlog signal. Initial Helm implementation is optional and reads noetl_frame_backlog_total; follow-up work should enrich the metric with tenant/stage-aware labels once those labels are present.
  • Multi-cluster supercluster docs + an ops playbook to provision two GKE regions feeding the same NATS supercluster. Initial opt-in playbook exists for rendering/deploying a gateway-enabled NATS member; GKE-specific load-balancer/TLS defaults remain pending.
  • Topology-aware scheduling via locality hint on claim. The claim payload and frame.dispatched event metadata now carry the hint plus replayable placement evaluation when source locality exists; selection enforcement remains pending.

Phase 5 — Pluggable event store / payload store / projection store (rolling, separate PRs per adapter)

Goal: every backend class in §§ 8–9 has a working adapter, language-paired, behind a feature flag. Product choices stay deployment-specific; the compliance contract is architectural.

Order (mirroring the existing distributed plan):

  1. NATS JetStream event store adapter (refactor existing to fit the new port). Python + Rust.
  2. S3 payload store adapter (refactor existing to fit the new port). Python + Rust.
  3. Postgres projection store adapter (refactor existing). Python + Rust.
  4. Kafka event store adapter.
  5. GCS payload store adapter.
  6. Cloud key-value/document projection adapter.
  7. Columnar analytical projection adapter.
  8. Streaming materialized-view adapter.
  9. … then the remainder in cloud-priority order.

Every adapter ships with:

  • Compliance test suite (language-agnostic spec, run against both implementations).
  • Replay parity suite for canonical events, payload references, snapshots, and projection checksums.
  • Docker-compose entry for local development.
  • Cloud-provisioning ops playbook in repos/ops/automation/.

Phase 6 — Stage planner for fan-out / reduce (4 weeks)

Goal: extend the stage/frame model from loops to fan-out and reduce, completing the map-reduce shape.

  • Stage kind='fanout': explodes a single input into N partitions, each handed to a frame.
  • Stage kind='reduce': consumes M partitions, emits one output. Reduce frame waits on partition availability events instead of cursor rows.
  • Replace the design in distributed_fanout_mode_spec.md with this materialized version.
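
The two stage kinds above can be sketched in a few lines. This is a toy model under assumptions: `fanout`, `ReduceFrame`, and `on_partition_available` are hypothetical names, partitions are assigned round-robin (the spec does not fix a partitioning scheme), and partition outputs are plain integers rather than payload references.

```python
from typing import Callable, Dict, List, Optional, Sequence


def fanout(rows: Sequence[int], partitions: int) -> List[List[int]]:
    # Explode one input into N partitions (round-robin is an assumption).
    return [list(rows[i::partitions]) for i in range(partitions)]


class ReduceFrame:
    """Waits on partition-availability events instead of cursor rows."""

    def __init__(self, expected: int, combine: Callable[[List[int]], int]) -> None:
        self.expected = expected
        self.combine = combine
        self.ready: Dict[int, int] = {}

    def on_partition_available(self, index: int, value: int) -> Optional[int]:
        if not 0 <= index < self.expected:
            raise ValueError(f"unknown partition {index}")
        # A redelivered event overwrites the same slot, so duplicates are harmless.
        self.ready[index] = value
        if len(self.ready) < self.expected:
            return None  # still waiting on at least one partition
        # Combine in partition order so the output does not depend on arrival order.
        return self.combine([self.ready[i] for i in range(self.expected)])
```

Usage: feed each partition's frame result to `on_partition_available`; the reduce output appears only once all M partitions have reported.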

13. Risks and open questions

  • Tier 1.5 GC under crash. A producer worker that crashes after writing shm but before committing the frame leaves shm regions that no one will unlink. Mitigation: per-node shm_unlink sweep on worker start (scans /dev/shm/noetl-* and unlinks regions older than tier_15_grace_seconds).
  • NATS supercluster cost. Cross-region replication of every event stream is not free. Mitigation: opt-in per execution; default is single-region.
  • Frame size tuning. A frame too big means more work lost on crash; too small means coordination dominates. Mitigation: start with the §5.2 default, expose per-step override, measure with the §4 dashboard. Operators should choose max_rows from lost-work tolerance: a 50-row frame means a crash can replay up to 50 claimed cursor rows as one unit; high-cost or externally side-effecting steps should use smaller frames until idempotency is proven. Increase lease_seconds to at least the p99 frame runtime plus one heartbeat interval, and keep max_attempts low enough that repeated whole-frame failures page an operator instead of silently cycling.
  • Reduce-side back-pressure. A reduce stage that is slower than its upstream fanout fills the projection store inbox. Mitigation: reuse the existing max_inflight concept at the stage level, not the worker level.
  • Multi-process per pod. Some MCPs prefer multiple processes. Tier 1.5 then needs a node-local broker. Out of scope for v1; revisit if a real MCP demands it.
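
The unlink sweep named in the Tier 1.5 GC mitigation can be sketched as below. It is an illustration under assumptions: `sweep_stale_regions` is a hypothetical helper, region age is taken from file modification time, and the directory is a parameter so the sketch can run against a temporary directory instead of /dev/shm.

```python
import time
from pathlib import Path
from typing import List, Optional


def sweep_stale_regions(
    root: Path, grace_seconds: float, now: Optional[float] = None
) -> List[str]:
    """Unlink noetl-* regions under root that are older than grace_seconds."""
    now = time.time() if now is None else now
    removed: List[str] = []
    for path in sorted(root.glob("noetl-*")):
        try:
            age = now - path.stat().st_mtime
            if path.is_file() and age > grace_seconds:
                path.unlink()
                removed.append(path.name)
        except FileNotFoundError:
            # Another sweeper (or the owning worker) removed it first; not an error.
            continue
    return removed
```

On worker start this would be called with `Path("/dev/shm")` and the configured `tier_15_grace_seconds`. Because durable bytes are always written first, unlinking a region can only cost a cache miss, never data.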

14. Source code anchors (current state)

For implementers. Existing code that this spec extends, not replaces:

| Concern | File | Key symbol |
| --- | --- | --- |
| Loop expansion (parallel mode) | noetl/core/dsl/engine/executor/commands.py | _create_command_for_step |
| Cursor dispatch | noetl/core/dsl/engine/executor/transitions.py | _issue_cursor_loop_commands |
| Cursor worker | noetl/worker/cursor_worker.py | execute_cursor_worker |
| Worker tool dispatch | noetl/worker/nats_worker.py | _execute_tool |
| TempStore tiers | noetl/core/storage/result_store.py | ResultStore.put / resolve |
| TempStore tier enum | noetl/core/storage/models.py | StoreTier |
| NATS client | noetl/core/messaging/nats_client.py | NatsClient.connect |
| NATS K/V cache | noetl/core/cache/nats_kv.py | NatsKv |
| Claim API | noetl/server/api/core/commands.py | claim_command |
| Command reaper | noetl/server/command_reaper.py | CommandReaper |
| Schema DDL | noetl/database/ddl/postgres/schema_ddl.sql | noetl.event, noetl.command |

PFT v2 driver: repos/e2e/fixtures/playbooks/pft_flow_test/test_pft_flow_v2.yaml — six cursor steps, ~120k HTTP calls per execution, the canonical benchmark for every phase above.


15. Out of scope (deferred to future specs)

  • Replacing the DSL with a different language. The DSL is fine; this spec rewires the execution layer underneath it.
  • Replacing noetl.event with a different source of truth. Event log stays Postgres-backed by default; the event store port lets other backends mirror it.
  • Replacing NoETL's existing MCP server architecture. Workers and MCPs are orthogonal.
  • A new container orchestrator. We assume Kubernetes; the locator scheme is friendly to other orchestrators but they are not a v1 target.

16. Decision log (what changed vs the original event-store-design-prompt)

The original event-store-design-prompt.md (now archived) framed the problem as "add an event store abstraction layer." That framing is necessary but not sufficient. This revision changes the framing:

  1. Event sourcing becomes the system kernel. The goal is not just to swap storage backends; it is to reproduce tenant/org system state at a requested time from canonical events plus immutable payloads.
  2. Shared memory becomes a product advantage, not just a cache. The runtime uses Arrow IPC, local disk, distributed K/V, materialized views, and distributed indexes as a rebuildable shared-state fabric around the event log.
  3. Worker-side loop interpretation remains the dominant immediate cost reduction. The PFT v2 baseline shows that even a perfect event store cannot save us from server-side per-row coordination overhead.
  4. Tier 1.5 gets a specific GC + admission story (budget, grace, unlink sweep). The original hand-waved this; it has to be concrete or the shm region count will run away.
  5. The rollout is staged and additive, with frame-shaped cursor loops in Phase 1, before Arrow IPC and before pluggable backends. This lets us ship a measurable performance win in two weeks rather than waiting for a multi-quarter abstraction overhaul.

Other elements (three-layer model, backend adapters, content-addressed payloads, configuration schema) carry over from the original with edits to match the existing TempStore and projection-worker code that has shipped since the original was drafted. Product-specific analytical or streaming engines are intentionally not named as dependencies; NoETL adopts the underlying algorithms and contracts.


17. Implementation Roadmap

This section complements §12 (the phased refactor plan) with a phase dependency graph, per-phase acceptance checklist, replay parity gate, and operational readiness criteria. Section 12 defines what each phase builds; this section defines when each phase is ready to ship and what to do if it needs to be rolled back.

17.1 Phase dependency graph

Phases 1 and 2 can run in parallel once the Phase 0 baseline is captured. Phase 3 requires both because it depends on frame-shaped Arrow output (Phase 1) and colocated projectors (Phase 2).
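
The ordering stated above can be expressed as a small dependency map. This sketch covers only the edges this subsection states (Phase 0 before Phases 1 and 2, both before Phase 3); edges for Phases 4 to 6 are not given here, so they are left out rather than guessed. The phase labels and `rollout_waves` are illustrative names.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each phase maps to the phases that must ship before it.
PHASE_DEPENDENCIES: Dict[str, Set[str]] = {
    "phase-0": set(),
    "phase-1": {"phase-0"},
    "phase-2": {"phase-0"},
    "phase-3": {"phase-1", "phase-2"},
}


def rollout_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group phases into waves; phases in the same wave can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

For the map above, `rollout_waves(PHASE_DEPENDENCIES)` yields three waves: Phase 0 alone, then Phases 1 and 2 together, then Phase 3.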

17.2 Pre-flight checklist (before Phase 0 begins)

These conditions must be true before work begins on Phase 0:

  • noetl.stage and noetl.frame DDL reviewed and approved by database owner.
  • Grafana / VictoriaMetrics dashboards provisioned with the §4 metric tile set (empty panels are acceptable at this stage).
  • A reproducible trigger for a full PFT v2 run exists as an ops runbook entry and has been tested by a team member who did not write it.
  • NOETL_TENANT_ID, NOETL_ORGANIZATION_ID, NOETL_CLUSTER_ID env vars injectable via Helm values in the target cluster.
  • Replay harness skeleton (even a no-op stub) runs to completion without error against the current noetl.event table.
  • Schema registry mount path (/etc/noetl/schemas) is provisionable from a ConfigMap.
  • On-call alert routing confirmed for frame_reaper_republish_rate and pg_pool_depth_highwatermark.

17.3 Per-phase acceptance criteria

| Phase | Ship gate | Replay parity required | Rollback procedure |
| --- | --- | --- | --- |
| 0 — Instrumentation | All §4 metrics non-null on dashboard from a complete PFT v2 run; baseline values pinned to memory | Replay harness exits 0; checksum report attached to memory entry | Drop noetl.stage / noetl.frame via inverse Alembic migration; zero runtime impact |
| 1 — Frame loops | Total command.* count < 20k on PFT v2; wall time not regressed vs Phase 0 baseline | Frame state, loop progress, and execution status checksums match live projection | Set frame_policy: null on PFT v2 steps; disable frame claim endpoint; legacy command path unchanged |
| 2 — Decentralized projection | Postgres pool depth < 30 sustained during MDS burst; per-shard projection lag metric visible on dashboard | Projection checksums match single-writer baseline | Re-enable in-process server projection loop; scale noetl-projector StatefulSet to 0 |
| 3 — Arrow IPC Tier 1.5 | tier15_hit_ratio > 60% on colocated consumer; PFT v2 wall time ≤ Phase 0 baseline ÷ 2 | Payload digest and projection checksums unchanged vs Phase 2 | Set tier_15_node_budget_mb: 0; admission control skips Tier 1.5 with no data-path change |
| 4 — Cloud OS surfaces | Unified resource locator present on all event envelopes; KEDA scaler responds to frame backlog signal within 30 s | Locator fields do not alter event content digest; replay checksum unchanged | Disable KEDA scaler; revert locator injection to no-op middleware; locator fields are additive |
| 5 — Pluggable adapters | Each adapter passes compliance + replay parity suite; Docker-compose local dev environment validated end-to-end | Python and Rust parity corpus checksums identical for each adapter under test | Revert backend config values to previous setting; no code removal required |
| 6 — Fan-out / reduce | Stage kind='fanout' and kind='reduce' pass all PFT v2 fan-out phases; distributed_fanout_mode_spec.md superseded | Reduce-frame output checksums match single-partition baseline | Revert fan-out DSL steps to mode: parallel; stage/frame tables remain intact |

17.4 Replay parity release gate

Every phase after Phase 0 must pass all of the following before any merge to main:

Run the fast deterministic local gate first:

scripts/check_replay_release_gate.sh

[ ] Event envelope validation passes for 100% of events emitted in the phase's PFT v2 run.
[ ] Every snapshot records: event position, projection code version, upcaster versions, payload digest set, tenant encryption context, and deterministic fold checksum.
[ ] Replay harness runs start-to-finish against the phase's full event log and payload refs.
[ ] Execution state checksum (frame rows committed, loop progress) matches live projection.
[ ] At least one configured business projection type checksum matches live projection.
[ ] Python and Rust replay paths produce identical fold checksums on the golden corpus.
[ ] No event references an IpcHint path as its only payload copy (grep for events missing a Tier 3 payload_ref.uri).
[ ] Idempotency key present on 100% of externally retried transitions in the phase's events.

Replay parity failures block the phase merge. They are not deferred to the next phase.
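
Two of the checklist items above (no IpcHint-only payloads, idempotency keys on externally retried transitions) lend themselves to a mechanical scan. The sketch below is hypothetical: the field names `payload_ref`, `ipc_hint`, `uri`, `externally_retried`, and `idempotency_key` are assumed for illustration and may not match the real event envelope, and `gate_violations` is not part of scripts/check_replay_release_gate.sh.

```python
from typing import Iterable, List, Mapping


def gate_violations(events: Iterable[Mapping]) -> List[str]:
    """Return one message per event that breaks either rule; empty means pass."""
    problems: List[str] = []
    for index, event in enumerate(events):
        ref = event.get("payload_ref") or {}
        # An IPC hint is best-effort; a durable URI must always accompany it.
        if ref.get("ipc_hint") and not ref.get("uri"):
            problems.append(f"event {index}: IpcHint is the only payload copy")
        # Externally retried transitions need a key so replays stay idempotent.
        if event.get("externally_retried") and not event.get("idempotency_key"):
            problems.append(f"event {index}: missing idempotency key")
    return problems
```

A gate script in this style would exit non-zero whenever the returned list is non-empty, which matches the rule that parity failures block the merge.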

17.5 Operational readiness (per phase shipped to production)

Before enabling any phase for a production tenant:

  • Runbook exists in repos/ops/automation/ covering: how to trigger a replay, how to compare checksums, how to roll back the phase, and how to scale the projector StatefulSet.
  • On-call alerts live for frame_reaper_republish_rate > 0.1/s and tier15_budget_exhaustion_total > 0.
  • Frame budget defaults reviewed against the tenant's actual execution profile (not only PFT v2).
  • Tenant encryption context set and verified for all payload refs in the phase's event schema version.
  • Retention and TTL policy for Tier 3 objects confirmed with legal / compliance for the tenant.
  • Cross-tenant projection isolation verified: no query returns rows from a different (tenant_id, organization_id) pair without explicit operator permission.
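
For the frame-budget review in the list above, the §13 sizing rule (lease_seconds at least the p99 frame runtime plus one heartbeat interval) can be checked against a tenant's measured runtimes. The sketch below is an assumption-laden helper, not NoETL code: `p99` uses the nearest-rank method, and `min_lease_seconds` is a hypothetical name.

```python
from typing import Sequence


def p99(samples: Sequence[float]) -> float:
    """Nearest-rank 99th percentile of the observed frame runtimes."""
    if not samples:
        raise ValueError("need at least one runtime sample")
    ordered = sorted(samples)
    # Integer ceiling of 0.99 * n, avoiding floating-point rounding.
    rank = (99 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def min_lease_seconds(frame_runtimes: Sequence[float], heartbeat_seconds: float) -> float:
    # Lower bound from §13: p99 frame runtime plus one heartbeat interval.
    return p99(frame_runtimes) + heartbeat_seconds
```

If a tenant's configured lease falls below this bound, slow but healthy frames risk being reclaimed and replayed as a unit, so the review should raise the lease or shrink max_rows.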