NoETL Distributed Runtime + Event-Sourced Shared Memory Spec
Status: staged implementation in progress.
Owner: NoETL core (repos/noetl).
Implementation tracker:
- Umbrella: noetl/noetl#434.
- Phase 0 PR: noetl/noetl#435.
- Phase 1 frame execution: noetl/noetl#436.
- Projectors: noetl/noetl#437.
- Shared payload cache / Arrow IPC hints: noetl/noetl#438.
- Event/projection store ports: noetl/noetl#439.
- Replay regression / serialization gates: noetl/noetl#440.
- Docs alignment: noetl/noetl#441.
- Batch event mirror: noetl/noetl#460.
- Remaining event append mirror audit: noetl/noetl#461.
- Transactional outbox: noetl/noetl#464.

Companion specs that this revision builds on (do not re-spec):
- `noetl_async_sharded_architecture.md`: sharded projection workers, epoch barriers.
- `noetl_cursor_loop_design.md`: cursor-driven worker loops (already in production for PFT v2).
- `noetl_data_plane_architecture.md`: control/data plane split, reference-only event envelopes, TempStore tiers.
- `noetl_distributed_processing_plan.md`: phased rollout (P0 shipped, P1–P3 partial).
- `noetl_pft_performance_optimization.md`: PFT-specific tuning.
- `distributed_fanout_mode_spec.md`: designed but not shipped; this spec absorbs it.
- `nats_kv_distributed_cache.md`: NATS K/V scope.
Not in scope here: DSL surface changes or the playbook authoring guide. Event-sourcing invariants are in scope because they are the contract that lets the distributed runtime reproduce system state at a requested point in time.
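The sketch below illustrates that point-in-time contract. It assumes state is a pure fold over events ordered by an integer `event_id`, and that `created_at` timestamps never decrease in that order. `Event`, `Snapshot`, `apply`, and `replay_state` are hypothetical names chosen for illustration; they are not NoETL's `ReplayService` API. The snapshot guard follows the rules the checkpoint log below attributes to noetl/noetl#512 and #513: a snapshot seeds the fold only if it sits at or before the cutoff and its registry digest matches.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass(frozen=True)
class Event:
    # Hypothetical minimal event shape; real NoETL events carry much more metadata.
    event_id: int          # assumed monotonically increasing
    created_at: datetime   # assumed non-decreasing in event_id order
    event_type: str
    status: str | None = None


@dataclass(frozen=True)
class Snapshot:
    # Hypothetical snapshot: the folded state as of event_id == version.
    version: int
    registry_digest: str
    state: dict[str, Any] = field(default_factory=dict)


def apply(state: dict[str, Any], event: Event) -> dict[str, Any]:
    """Pure reducer: returns a new dict and never mutates its input."""
    new_state = dict(state)
    new_state["event_count"] = state.get("event_count", 0) + 1
    new_state["last_event_id"] = event.event_id
    if event.status is not None:
        new_state["status"] = event.status
    return new_state


def replay_state(
    events: list[Event],
    *,
    as_of_event_id: int | None = None,
    as_of_time: datetime | None = None,
    snapshot: Snapshot | None = None,
    registry_digest: str = "",
) -> dict[str, Any]:
    ordered = sorted(events, key=lambda e: e.event_id)

    # Resolve a wall-clock cutoff to the last event id at or before that instant.
    cutoff = as_of_event_id
    if as_of_time is not None:
        eligible = [e.event_id for e in ordered if e.created_at <= as_of_time]
        time_cutoff = eligible[-1] if eligible else 0
        cutoff = time_cutoff if cutoff is None else min(cutoff, time_cutoff)

    # Snapshots only accelerate the fold: seed from one only if it is not past
    # the cutoff and was produced by the same upcaster/reducer registry.
    state: dict[str, Any] = {}
    start_after = 0
    if (
        snapshot is not None
        and (cutoff is None or snapshot.version <= cutoff)
        and snapshot.registry_digest == registry_digest
    ):
        state = dict(snapshot.state)
        start_after = snapshot.version

    for event in ordered:
        if event.event_id <= start_after:
            continue
        if cutoff is not None and event.event_id > cutoff:
            break
        state = apply(state, event)
    return state
```

Snapshots are only accelerators, so seeding from a valid snapshot must produce exactly the same state as a full fold. A replay parity gate can check that equality directly.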
Latest implementation checkpoint:
- Local kind full PFT v2 execution `629193644754337792` completed on 2026-05-18 in 487.855s (~8m08s) using `frame.process: frame`, per-stage frame-stream lock ordering, and the full frame lifecycle (`frame.dispatched` -> `frame.started` -> `frame.committed`).
- The previous local kind default-row-frame baseline was 1880.739s (~31m21s); the row-concurrency experiment took 2002.841s (~33m23s).
- The full lock-order validation produced 5,496 committed frames (49.94 average and 50 maximum rows per frame), and all 5,496 frames had claim, start, and terminal event links.
- Validation passed for all 10 facilities: every patient domain was 1000/1000 and MDS was 224,443/224,443.
- The canonical PFT v2 fixture shape is preserved for regression coverage. Page-continuation fidelity is covered by a companion fixture, `fixtures/playbooks/pft_flow_test/test_pft_page_continuation.yaml`, which follows `paging.hasMore` with one HTTP request per patient/data-type/page record instead of changing the canonical benchmark.
- 2026-05-18 hardening update:
  noetl/noetl#453 removes the remaining lazy frame-mint advisory lock from the cursor claim hot path by using an indexed stage/worker-slot/frame-index claim key with `INSERT ... ON CONFLICT DO NOTHING`; noetl/noetl#454 fixes execution-list status flicker when a late non-terminal projection event follows an already-terminal `noetl.execution.status`.
- 2026-05-18 serialization gate update: noetl/noetl#456 rejects frame commit `output_ref` envelopes that advertise an IPC/shared-memory hint without any durable `ref`, `uri`, or `locator`, preserving replay authority after cache GC.
- 2026-05-18 event mirror update: noetl/noetl#460 wires the async batch-ingestion path into the same opt-in event mirror used by direct `/api/events` and frame lifecycle routes. Batch item events, `batch.accepted`, batch status events, and batch-generated `command.issued` envelopes now carry command, stage, frame, and parent lineage into the distribution stream after the canonical database commit.
- 2026-05-18 execution-route mirror update: noetl/noetl#462 mirrors the `command.issued` envelopes generated by `/api/execute` and the `execution.cancelled` events from `/api/executions/{execution_id}/cancel` after commit. Remaining direct event append paths are tracked by noetl/noetl#461.
- 2026-05-18 outbox pivot: noetl/noetl#464 adds `noetl.outbox` as the preferred distribution bridge. Event writers should enqueue mirrored envelopes in the same transaction as `noetl.event`; a publisher drains committed outbox rows to NATS. The table stores JSONB for operator visibility plus pre-encoded payload bytes and a codec field, so the drain path can publish bytes without reserializing JSON when the subject is known.
- 2026-05-18 core event outbox update:
  noetl/noetl#466 moves direct `/api/events` lifecycle envelopes and generated `command.issued` envelopes onto the same-transaction outbox path. The route's HTTP semantics are unchanged, but distribution intent is now durable before any publish attempt.
- 2026-05-18 batch event outbox update: noetl/noetl#467 applies the same boundary to `/api/events/batch`: item events, `batch.accepted`, batch status events, and batch-generated `command.issued` chunks are enqueued to `noetl.outbox` before commit, and the committed rows are then drained.
- 2026-05-18 execution event outbox update: noetl/noetl#468 moves the initial `command.issued` envelopes from `/api/execute` and the `execution.cancelled` envelopes from `/api/executions/{execution_id}/cancel` to same-transaction outbox enqueue.
- 2026-05-18 auto-resume outbox update: noetl/noetl#469 moves recovery-generated `execution.cancelled` envelopes to the same-transaction outbox path.
- 2026-05-18 broker event outbox update: noetl/noetl#470 moves generic broker `EventService.emit_event` appends to same-transaction outbox enqueue.
- 2026-05-19 command-claim outbox update: noetl/noetl#471 moves `command.claimed` worker lifecycle events to same-transaction outbox enqueue.
- 2026-05-19 event-store outbox update: noetl/noetl#472 moves backend-neutral `PostgresEventStore.append` writes to same-transaction outbox enqueue.
- 2026-05-19 DSL executor outbox update: noetl/noetl#473 moves engine-owned lifecycle, `stage.opened`, and `stage.closed` events to same-transaction outbox enqueue with explicit caller-owned transaction handling.
- 2026-05-19 projector codec update: noetl/noetl#479 lets projector NATS consumers decode both JSON envelopes and Arrow Feather payloads, so `noetl.outbox` can keep the low-serialization bytes path and projector workers are no longer limited to JSON.
- 2026-05-19 outbox publisher update: noetl/noetl#480 adds `python -m noetl.outbox` as a standalone publisher loop. Route-local drains remain a latency optimization, while the standalone publisher gives failed or deferred outbox rows a retry path that does not depend on incoming traffic.
- 2026-05-20 autoscaling surface update:
  noetl/ops#107 adds an optional KEDA Prometheus scaler for `noetl-worker` that reads the NoETL runtime backlog metric `noetl_frame_backlog_total`. The scaler consumes the NoETL metric contract rather than the current storage implementation. It is opt-in: unless it is explicitly enabled, the default chart still renders the fixed replica count / existing HPA behavior.
- 2026-05-20 tenant backlog metric update: noetl/noetl#491 adds `noetl_frame_backlog_detail_total` with tenant, organization, stage-kind, and status labels. KEDA keeps using the global `noetl_frame_backlog_total` series, while policy engines and dashboards can inspect tenant/org pressure without coupling to a storage table.
- 2026-05-20 topology label update: noetl/noetl#492 adds `cluster_id`, `region`, and `zone` labels to worker-local metrics when the environment provides them; noetl/ops#108 exposes matching Helm values. This gives operators a locality signal for IPC/cache hit analysis before scheduler claim hints become enforceable.
- 2026-05-20 topology helper update: noetl/noetl#494 centralizes worker locality extraction, canonical worker-locator construction, and locality-distance comparison (`node`, `zone`, `region`, `cluster`, `any`) in `noetl.core.runtime.topology`, so worker metrics, frame claims, frame-dispatch metadata, and scheduler enforcement use the same identity rules.
- 2026-05-20 placement evaluation update: noetl/noetl#495 records `source_locality` and placement evaluation (`distance`, `max_distance`, `within_max_distance`) on `frame.dispatched` metadata when a frame cursor carries `source_locality`/`producer_locality`. This makes locality decisions replayable before frame selection is changed.
- 2026-05-20 runtime-event topology update: noetl/noetl#496 enriches worker-reported runtime events with `meta.locality` and a canonical `meta.worker_locator` from the shared topology helper. Explicit caller-provided locality or locator metadata remains authoritative, so event replay can distinguish observed worker identity from producer-owned placement context.
- 2026-05-20 command-claim topology update: noetl/noetl#497 lets command claim requests carry optional worker locality and records `locality`, `worker_locator`, `source_locality`, and placement evaluation on `command.claimed` metadata. Existing clients remain compatible, while topology-aware workers make command ownership replayable by placement distance.
- 2026-05-20 replay command projection update:
  noetl/noetl#498 folds `command.issued`, `command.claimed`, `command.started`, and terminal command events into the deterministic replay state. The reconstructed command view now carries stage/frame linkage, parent command id, worker id, locality, `worker_locator`, source locality, and placement evaluation, closing the command-ownership gap in point-in-time replay.
- 2026-05-20 business-object replay update: noetl/noetl#499 adds a generic `business_objects` replay projection for events that explicitly identify a domain object through `aggregate_type=business_object` or business-object metadata. The fold records object identity, status, version, event lineage, attributes, and payload references without introducing storage-specific logic.
- 2026-05-20 command replay parity update: noetl/noetl#500 adds normalized live/replayed command projection rows and `command_projection_checksum`, so release gates can compare command ownership, lifecycle event links, worker topology, and placement metadata the same way frame replay parity compares frame rows.
- 2026-05-20 business-object replay parity update: noetl/noetl#501 adds normalized live/replayed business-object projection rows and `business_object_projection_checksum`, covering object identity, lifecycle/event lineage, payload summaries, and folded attributes.
- 2026-05-20 loop replay parity update: noetl/noetl#502 adds normalized live/replayed loop projection rows and `loop_projection_checksum`, covering loop totals, done/failed counters, completion state, and last event linkage.
- 2026-05-20 execution replay parity update: noetl/noetl#503 adds normalized live/replayed execution projection rows and `execution_projection_checksum`, covering tenant/org scope, execution status, last node/event, event count, payload-reference summary, projection name, and upcaster registry digest.
- 2026-05-20 stage replay parity update: noetl/noetl#504 adds normalized live/replayed stage projection rows and `stage_projection_checksum`, covering stage status, kind, parent/loop linkage, open/close event ids, frame/row/emitted/failed counters, and last event linkage.
- 2026-05-20 replay checksum bundle update: noetl/noetl#505 adds `replay_projection_checksum_bundle(state)` and returns it as `projection_checksums` from replay state responses. Replay-state projector records also copy the bundle into metadata for cheap checkpoint inspection. The single deterministic checksum map covers execution, stages, frames, commands, business objects, and loops, so release gates and operators share one replay parity contract while each surface remains independently comparable. The same PR adds storage-neutral live-bundle and parity-report helpers plus an offline JSON comparator for future adapters and GKE validation, without depending on database routines.
- 2026-05-20 replay event-reader port update: noetl/noetl#506 introduces `ReplayEventReader` as the storage-neutral read port for canonical replay inputs. The current SQL reads move behind `PostgresReplayEventReader`, so `ReplayService` folds events and snapshots through configurable or per-call adapters instead of owning storage-specific routines.
- 2026-05-20 replay payload resolver update: noetl/noetl#507 introduces `ReplayPayloadResolver` as the storage-neutral read port for immutable replay payload references. Replay state remains cheap by default; callers opt in with `resolve_payloads=true` to receive bounded checksum/shape summaries rather than payload bodies. Duplicate payload references are resolved once per replay call and reported for every lineage occurrence, and `payload_resolution_summary` gives total/resolved/unresolved/unique-ref counts plus a deterministic checksum.
- 2026-05-20 replay payload-resolution gate update:
  noetl/noetl#508 adds `scripts/check_replay_payload_resolution_report.py`, an offline validator for replay-state JSON that recomputes `payload_resolution_summary`, rejects summary checksum drift, and fails when any payload reference is unresolved.
- 2026-05-20 replay state report fetcher update: noetl/noetl#509 adds `scripts/fetch_replay_state_report.py`, a small API fetcher for `/api/replay/state` JSON with tenant/org scope, cutoffs, and optional `resolve_payloads=true`, so local kind and GKE validation can produce the report consumed by the offline parity/payload gates.
- 2026-05-20 replay validation runner update: noetl/noetl#510 adds `scripts/run_replay_validation.py`, a single local-kind/GKE smoke command that fetches replay state and runs the offline projection parity and payload-resolution gates, emitting a JSON run report with per-step command output.
- 2026-05-20 replay upcaster registry injection update: noetl/noetl#511 makes `ReplayService` use a configurable or per-call `EventUpcasterRegistry` instead of hard-coding the process default. Validation tools and future storage adapters can now replay with an explicit schema registry and compare its digest without changing endpoint behavior.
- 2026-05-20 replay time-cutoff snapshot update: noetl/noetl#512 lets the reference replay reader use replay-state snapshots for `as_of_time` requests when the snapshot version is at or before the resolved cutoff event id, avoiding unnecessary full-history folds while keeping the `ReplayEventReader` port storage-neutral.
- 2026-05-20 replay snapshot registry guard update: noetl/noetl#513 makes `ReplayService` ignore snapshot seeds whose recorded upcaster registry digest does not match the registry used for the current fold. Snapshots remain accelerators only; when the schema reducers do not match, replay falls back to canonical events.
- 2026-05-20 replay state-integrity gate update: noetl/noetl#514 adds `scripts/check_replay_state_report.py` and wires it into `scripts/run_replay_validation.py`, so smoke runs validate replay JSON structure, projection checksum drift, and snapshot metadata before comparing against live projection checksums.
- 2026-05-20 replay state-checksum gate update: noetl/noetl#515 extends the state-integrity gate to recompute the replay state's top-level checksum, catching mutated state reports before the projection parity or payload gates run.
- 2026-05-20 replay parity checksum-shape update: noetl/noetl#516 makes `scripts/check_replay_parity_report.py` reject, by default, replay/live checksum bundles whose values are not lowercase SHA-256 hex digests, while keeping an explicit legacy escape hatch for ad-hoc reports.
- 2026-05-20 replay state-envelope gate update: noetl/noetl#517 makes `scripts/check_replay_state_report.py` require the canonical replay-state envelope fields and `checksum_algorithm=sha256`, so incomplete state reports fail before parity comparison.
- 2026-05-20 replay payload checksum-shape update: noetl/noetl#518 makes `scripts/check_replay_payload_resolution_report.py` reject resolved payload checksums that are not lowercase SHA-256 hex digests, with ref/scope diagnostics for the failing payload.
- 2026-05-20 replay registry digest-shape update: noetl/noetl#519 makes `scripts/check_replay_state_report.py` require `upcaster_registry_digest` and snapshot registry digests to be lowercase SHA-256 hex values before compatibility checks run.
- 2026-05-20 replay state type/range gate update: noetl/noetl#520 makes `scripts/check_replay_state_report.py` validate canonical replay envelope string fields, non-negative integer fields, and snapshot version typing before checksum recomputation. Malformed reports now fail with structured validation output instead of crashing the gate.
- 2026-05-20 replay payload row-shape gate update: noetl/noetl#521 makes `scripts/check_replay_payload_resolution_report.py` reject non-object `payload_resolution` rows and non-object row `resolution` values before checksum and summary validation, so malformed payload evidence cannot be silently filtered out.
- 2026-05-20 replay strict integer-shape update: noetl/noetl#522 makes `scripts/check_replay_state_report.py` require replay state counters, event ids, and snapshot versions to be JSON integers rather than coercible strings, keeping the serialized replay contract independent of Python-specific type coercion.
- 2026-05-20 replay parity required-surface update: noetl/noetl#523 makes `scripts/check_replay_parity_report.py` require the complete canonical projection checksum bundle (`execution`, `stages`, `frames`, `commands`, `business_objects`, `loops`) on both replayed and live inputs, and reject unknown extra surfaces.
- 2026-05-20 replay state projection-surface update: noetl/noetl#524 makes `scripts/check_replay_state_report.py` require the same complete canonical `projection_checksums` surface set and reject unknown extra surfaces before parity or payload validation runs.
- 2026-05-20 replay payload summary-shape update: noetl/noetl#525 makes `scripts/check_replay_payload_resolution_report.py` validate supplied `payload_resolution_summary` objects before checksum comparison, requiring the exact summary field set, strict non-negative integer counters, a boolean `all_resolved`, and a lowercase SHA-256 checksum.
- 2026-05-20 replay validation manifest phase update:
  noetl/noetl#526 upgrades `scripts/run_replay_validation.py` from a gate launcher into a reproducible validation manifest writer. Runs now include config, UTC timestamps, per-step duration, parsed JSON stdout when available, optional `--report-output`, runner-level cutoff validation, and fetch-artifact existence checks; `scripts/fetch_replay_state_report.py` also rejects non-object replay JSON before writing gate input.
- 2026-05-20 replay live checksum artifact phase update: noetl/noetl#527 adds `scripts/build_live_projection_checksums.py` so any storage adapter can export live projection rows as JSON and produce the canonical live checksum bundle. `scripts/run_replay_validation.py` now accepts `--live-rows`, builds `live-checksums-<execution_id>.json`, and runs parity from that artifact, rejecting runs that pass both `--live-checksums` and `--live-rows`.
- 2026-05-20 replay validation manifest gate phase update: noetl/noetl#528 adds `scripts/check_replay_validation_manifest.py` so local-kind/GKE validation manifests are independently gateable. The manifest checker validates config shape, timestamps, required step order, step return codes/durations, optional artifact existence, and mutually exclusive live parity inputs; the replay release gate now covers this checker.
- 2026-05-20 replay live-row export adapter phase update: noetl/noetl#529 adds `scripts/export_live_projection_rows_postgres.py` as the current reference adapter for producing the storage-neutral live-row JSON artifact from deployed clusters. The exporter uses plain reads against the current projection tables and the replay-state projection record. The validation contract remains the JSON row artifact consumed by `scripts/build_live_projection_checksums.py`, so future storage engines can replace the adapter without changing the replay parity gates.
- 2026-05-20 replay live-row artifact gate phase update: noetl/noetl#530 makes the exported live-row evidence self-describing and independently checkable. Live-row artifacts now carry `schema_version`, `exported_at`, per-surface `row_counts`, and a canonical `rows_checksum`; `scripts/check_live_projection_rows.py` validates those fields before checksum generation, and `scripts/run_replay_validation.py` inserts a `live_rows_integrity` step before `live_checksums`.
- 2026-05-20 replay validation artifact-index phase update: noetl/noetl#531 adds `scripts/package_replay_validation_artifacts.py` and `run_replay_validation.py --artifact-index-output ...` so local-kind/GKE evidence can be shipped as one storage-neutral index. The index records each validation artifact's role, path, existence, size, and SHA-256 digest, and can be checked later for missing files or digest drift.
- 2026-05-20 replay artifact-index manifest finalization update: noetl/noetl#532 makes `run_replay_validation.py --artifact-index-output ...` record the `artifact_index` step in the validation manifest before packaging. The index therefore hashes the final manifest content, including the artifact-index step and path, instead of an intermediate manifest.
- 2026-05-20 replay artifact-index role gate update: noetl/noetl#533 makes `scripts/package_replay_validation_artifacts.py --check` validate the evidence role contract in addition to file existence, size, and SHA-256 drift. A portable validation bundle must contain exactly one `manifest`, one `replay`, and one `report` artifact. Whenever either the live-row role or the derived live-checksum role is present, the other must be present too. `scripts/check_replay_validation_manifest.py --check-artifacts` now validates the referenced artifact index and rejects indexes that point at a different manifest. When `artifacts.artifact_index` is set, it also requires the `artifact_index` step to be present and final, so the manifest gate verifies the complete shipped evidence bundle.
- 2026-05-20 replay artifact-index portability update: noetl/noetl#534 stores artifact-index paths relative to the artifact-index directory when possible and resolves relative paths from that directory during `--check`. The checker also validates the top-level `path_base` marker and the manifest pointer. Validation bundles can now be copied and rechecked from a different working directory without rewriting paths; external artifacts outside the bundle remain absolute.
- 2026-05-20 replay validation bundle checker update: noetl/noetl#535 adds `scripts/check_replay_validation_bundle.py` as the operator entrypoint for shipped local-kind/GKE evidence. It runs the manifest gate with artifact checks, validates the referenced artifact index, and rejects mismatches between an explicit `--artifact-index` argument and the manifest's own index reference.
- 2026-05-20 projector shard-skip metric update:
  noetl/noetl#536 splits projector skip telemetry into `noetl_projector_events_unowned_total` for healthy other-shard traffic and `noetl_projector_events_unshardable_total` for events with missing or invalid shard keys. The existing empty/unowned notification counter remains for compatibility.
- 2026-05-20 projector stale-write metric update: noetl/noetl#537 adds `noetl_projector_projection_stale_records_total` for projection writes skipped by version monotonicity. The counter is based on attempted tenant/org/execution projection groups, not raw events, so same-execution multi-event batches are not reported as stale.
- 2026-05-20 projector shard settings validation update: noetl/noetl#538 makes projector workers fail fast when `shard_id` resolves to an index outside the `shard_count` range or when runtime limits are non-positive. Bad StatefulSet/shard-count wiring now exits at startup instead of running a shard that owns no events.
- 2026-05-20 projector strict settings loader update: noetl/noetl#539 removes env/CLI clamping for projector shard and runtime limits. Invalid values now flow into the same startup validation path instead of being silently repaired into defaults.
- 2026-05-20 projector metrics identity label update: noetl/noetl#540 adds stable `shard_index`, `shard_count`, `stream`, and `subject` labels to projector metrics, alongside `shard_id` and `consumer`, so dashboards can verify the live shard topology they are scraping.
- 2026-05-20 projector decode-error metric update: noetl/noetl#541 adds `noetl_projector_decode_errors_total` for projector notification payload decode failures. Malformed JSON/Arrow payloads are now visible in projector metrics before the subscriber NAKs the message.
- 2026-05-20 projector projection-error metric update: noetl/noetl#542 adds `noetl_projector_projection_errors_total` for failures inside the projection callback while keeping `noetl_projector_errors_total` as the aggregate compatibility counter.
- 2026-05-20 projector aggregate error metric update: noetl/noetl#543 makes `noetl_projector_errors_total` count both decode failures and projection callback failures, while the split counters continue to identify where the failure occurred.
- 2026-05-20 projector redelivery metric update: noetl/noetl#544 adds subscriber action observation and the projector counters `noetl_projector_redelivery_requests_total` and `noetl_projector_delayed_redelivery_requests_total`, making NAK/retry intent visible before JetStream redelivery.
- 2026-05-20 projector terminal action metric update: noetl/noetl#545 adds projector ACK and TERM counters from the same subscriber action observer: `noetl_projector_acknowledged_notifications_total` and `noetl_projector_terminated_notifications_total`.
- 2026-05-20 projector action/batch metric bundle update: noetl/noetl#546 adds terminal-action recency gauges, the last redelivery delay, last-batch shape gauges for extracted/unowned/unshardable/stale counts, and an action-summary helper with ACK/redelivery/termination ratios.
- 2026-05-20 projector metrics summary helper bundle: noetl/noetl#547 adds in-process summaries for last-batch shape, the decode/projection error split, and the combined projector runtime state. Operators and validation tools can consume the same stable ratio payloads without scraping Prometheus text.
- 2026-05-20 projector metrics summary endpoint bundle: noetl/noetl#548 exposes the summary payload at `/summary` on the projector metrics server, adds `scripts/check_projector_metrics_summary.py`, and wires that checker into the replay release gate. Phase 2 validation can now capture scrape-independent projector state evidence from live kind/GKE runs.
- 2026-05-20 Phase 2 projector evidence bundle: noetl/noetl#549 adds `scripts/fetch_projector_metrics_summary.py`, extends `scripts/run_replay_validation.py` with projector summary artifacts, validates those artifacts in replay manifests, and adds `scripts/check_projector_phase2_evidence.py` as the phase gate for live projector evidence plus optional projection parity.
- 2026-05-20 Phase 2 projector validation runner bundle: noetl/noetl#550 adds `scripts/run_projector_phase2_validation.py` as the operator entrypoint for replay + projector-summary evidence, lets bundle checks require Phase 2 projector evidence, and adds `scripts/render_projector_phase2_command.py` for reproducible StatefulSet projector validation commands.
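The 2026-05-18/19 entries converge on a same-transaction outbox boundary. The sketch below illustrates it, with the standard-library `sqlite3` module standing in for Postgres. The table layouts, `append_event`, and `drain` are hypothetical; they are not the real `noetl.event`/`noetl.outbox` schema or the `python -m noetl.outbox` publisher. Only the shape of the guarantee is meant to carry over: the event row and its distribution intent commit together, and only committed rows are published.

```python
from __future__ import annotations

import json
import sqlite3
from typing import Any, Callable

# Hypothetical stand-ins for noetl.event and noetl.outbox, not their real columns.
SCHEMA = """
CREATE TABLE event (
    event_id     INTEGER PRIMARY KEY,
    execution_id TEXT NOT NULL,
    event_type   TEXT NOT NULL,
    payload      TEXT NOT NULL
);
CREATE TABLE outbox (
    outbox_id    INTEGER PRIMARY KEY,
    event_id     INTEGER NOT NULL REFERENCES event(event_id),
    subject      TEXT NOT NULL,
    codec        TEXT NOT NULL,
    body         BLOB NOT NULL,
    published_at TEXT
);
"""


def append_event(conn: sqlite3.Connection, execution_id: str, event_type: str,
                 payload: dict[str, Any], subject: str) -> int:
    """Write the canonical event and its distribution intent in ONE transaction."""
    doc = json.dumps(payload, sort_keys=True)
    # Encode the envelope once at write time so the drain never re-serializes it.
    body = json.dumps({"event_type": event_type, "execution_id": execution_id,
                       "payload": payload}, sort_keys=True).encode("utf-8")
    with conn:  # commits both inserts on success, rolls both back on any error
        cur = conn.execute(
            "INSERT INTO event (execution_id, event_type, payload) VALUES (?, ?, ?)",
            (execution_id, event_type, doc),
        )
        event_id = cur.lastrowid
        conn.execute(
            "INSERT INTO outbox (event_id, subject, codec, body) VALUES (?, ?, 'json', ?)",
            (event_id, subject, body),
        )
    return event_id


def drain(conn: sqlite3.Connection, publish: Callable[[str, bytes], None],
          limit: int = 100) -> int:
    """Publish committed, unpublished rows in order; stop at the first failure."""
    rows = conn.execute(
        "SELECT outbox_id, subject, body FROM outbox "
        "WHERE published_at IS NULL ORDER BY outbox_id LIMIT ?",
        (limit,),
    ).fetchall()
    sent = 0
    for outbox_id, subject, body in rows:
        try:
            publish(subject, bytes(body))  # publish the stored bytes as-is
        except Exception:
            break  # the row stays pending; a later drain retries it
        with conn:
            conn.execute(
                "UPDATE outbox SET published_at = datetime('now') WHERE outbox_id = ?",
                (outbox_id,),
            )
        sent += 1
    return sent
```

Delivery here is at-least-once. If the process crashes after `publish` but before the `UPDATE`, that row is republished on the next drain. Consumers therefore have to tolerate duplicates, for example through version-monotonic projection writes like those counted by `noetl_projector_projection_stale_records_total`.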
0. Status — all seven phases done (2026-05-23 close-out)
The v2 distributed-runtime spec is complete across all seven phases (0–6), and NoETL reached v2.100.2 during the closing session. The per-phase landing PRs:
| Phase | Topic | Closing PR(s) |
|---|---|---|
| 0 | Instrumentation + stage/frame tables + replay API | #435 … #550 |
| 1 | Frame-shaped cursor loops | #585 |
| 2 | Projector StatefulSet behind NATS durable consumers | predates the close-out session |
| 3 | Apache Arrow IPC Tier 1.5 | #587 |
| 4 | URN + KEDA + NATS supercluster | #593 (URN) · #594 (KEDA) · #595 (NATS topology) · #596 (supercluster runtime fixes) · #597 (KEDA NATS-account fix) |
| 5 | Port/adapter event/projection/payload | #582 … #592 (rounds 1–5) |
| 6 | Stage planner for fanout/reduce | #588 |
The architectural shape the spec described is now operational:
events flow through durable JetStream consumers; projectors are
sharded StatefulSets; Arrow IPC Tier 1.5 is wired; KEDA scales
worker pools off NATS consumer lag; the URN scheme encodes
locality and supports per-segment subject derivation; PayloadStore
has a Protocol + adapters for filesystem + S3 + GCS + Azure +
SeaweedFS-via-S3; EventRecord.payload_ref accepts a typed
PayloadReference and serializes through a canonical dict
discriminator; a multi-cluster NATS supercluster generator emits
gateway-meshed JetStream topologies.
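The sketch below illustrates the typed-reference binding described above. It assumes a reference needs only a store name and a content key. `PayloadRef`, `serialize_payload_ref`, and `deserialize_payload_ref` are hypothetical stand-ins for `PayloadReference` and `payload_ref_to_dict()`, whose real fields and behaviour may differ. What it shows is a canonical dict carrying a `kind: "payload_store"` discriminator, while legacy dicts pass through unchanged.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional, Union


@dataclass(frozen=True)
class PayloadRef:
    # Hypothetical fields; NoETL's real PayloadReference may differ.
    store: str                 # e.g. "filesystem", "s3", "gcs", "azure"
    key: str                   # content address, e.g. a SHA-256 hex digest
    size: Optional[int] = None
    content_type: Optional[str] = None


def serialize_payload_ref(ref: Union[PayloadRef, dict, None]) -> Optional[dict[str, Any]]:
    """Typed refs get a canonical shape with a discriminator; legacy dicts pass through."""
    if ref is None:
        return None
    if isinstance(ref, PayloadRef):
        out: dict[str, Any] = {"kind": "payload_store", "store": ref.store, "key": ref.key}
        # Omit unset optional fields so equal references serialize identically.
        if ref.size is not None:
            out["size"] = ref.size
        if ref.content_type is not None:
            out["content_type"] = ref.content_type
        return out
    if isinstance(ref, dict):
        return dict(ref)  # legacy dict[str, Any] shape, left untouched
    raise TypeError(f"unsupported payload_ref type: {type(ref).__name__}")


def is_payload_store_ref(value: Any) -> bool:
    """Readers dispatch on the discriminator instead of guessing from field names."""
    return isinstance(value, dict) and value.get("kind") == "payload_store"


def deserialize_payload_ref(value: Optional[dict[str, Any]]) -> Union[PayloadRef, dict, None]:
    if is_payload_store_ref(value):
        return PayloadRef(store=value["store"], key=value["key"],
                          size=value.get("size"), content_type=value.get("content_type"))
    return value  # legacy or absent reference
```

Dispatching on `kind` lets replay code tell a typed store reference from a legacy dict without relying on heuristics over field names.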
Phase 4 — three rounds, all merged
- Round 1, #593: URN extension. `KNOWN_RESOURCE_KINDS` taxonomy, `NoetlResourceLocator.to_nats_subject()` / `from_nats_subject()`, `locality()` extraction, and `dataset_locator` / `stream_locator` / `partition_locator` builders. `WorkerLocatorParts` gains `region` / `zone`; `worker_locator()` emits segments coarse-to-fine (tenant → org → region → zone → cluster → node → worker).
- Round 2, #594: KEDA scaler. `ScaledObjectSpec` + `build_worker_scaledobject()` + `dump_scaledobject_yaml()`. Sample manifest at `ci/manifests/keda/scaledobject-worker-cpu-01.yaml` (now in `noetl/ops`; see "Scope A/B" below). Live kind validation drove `noetl-worker` 1 → 12 → 1 under a 200-message lag burst.
- Round 3, #595 + #596: NATS supercluster. Multi-cluster topology generator + 2-cluster sample at `ci/manifests/nats-supercluster/`. Three live-validation fixes are baked in (#596). Each pod gets a unique `server_name` via the downward API. The `/healthz` checks are split: `?js-server-only=true` for liveness, `?js-enabled-only=true` for readiness, plus a startupProbe with a long `failureThreshold`. Finally, `publishNotReadyAddresses: true` is set on the headless Service so gateway DNS does not deadlock on cluster startup.
- Round 3 follow-up, #597: KEDA NATS account fix. A live scale-up smoke test caught that `ScaledObjectSpec.nats_account` defaulted to the global `$G` account, but NoETL's `noetl` user lives in the `NOETL` account, so the scaler was silently reading 0 lag. The default is now `"NOETL"`, pinned by an assertion. With the fix, the end-to-end scale-up loop validated: burst → `TARGETS=200/10 (avg)` → replicas 4 → 8 → 16 → 20 (cap) → drain → 1 via HPA stabilization.
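The sketch below illustrates the coarse-to-fine ordering from Round 1. `LocatorParts`, `build_locator`, and the `urn:noetl:worker` prefix with `name=value` segments are all hypothetical; this spec does not define the real `worker_locator()` string format. The `shared_depth` helper only shows why coarse-to-fine order is convenient: the shared locality of two workers is a common leading run of segments. It is not the `node`/`zone`/`region`/`cluster`/`any` distance rule that `noetl.core.runtime.topology` implements.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

# Coarse-to-fine segment order described for worker_locator() in Round 1.
SEGMENT_ORDER = ("tenant", "org", "region", "zone", "cluster", "node", "worker")


@dataclass(frozen=True)
class LocatorParts:
    # Hypothetical stand-in for WorkerLocatorParts.
    tenant: str
    org: str
    region: Optional[str] = None
    zone: Optional[str] = None
    cluster: Optional[str] = None
    node: Optional[str] = None
    worker: Optional[str] = None


def build_locator(parts: LocatorParts, prefix: str = "urn:noetl:worker") -> str:
    """Join known segments coarse-to-fine; unknown locality is omitted, not padded."""
    segments = [f"{name}={getattr(parts, name)}" for name in SEGMENT_ORDER
                if getattr(parts, name)]
    return ":".join([prefix, *segments])


def shared_depth(a: LocatorParts, b: LocatorParts) -> int:
    """Count leading segments two locators share; a larger value means closer."""
    depth = 0
    for name in SEGMENT_ORDER:
        value = getattr(a, name)
        if value is None or value != getattr(b, name):
            break
        depth += 1
    return depth
```

Two workers in the same region but in different zones share three leading segments (tenant, org, region). That shared prefix is the kind of coarse-grained signal a placement check can compare cheaply.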
Phase 5 — five rounds, port + adapters + envelope binding
- Round 1, #582: PayloadStore Protocol + FilesystemPayloadStore. Sharded layout (`<root>/<sha[:2]>/<sha[2:4]>/<sha>`), atomic writes via tempfile + fsync + `os.replace`, optional metadata sidecar.
- Round 2, #589: S3PayloadStore + parametrized compliance suite. Sync boto3 + `asyncio.to_thread` to keep `moto` compatibility; documented as the canonical cloud-adapter pattern (every subsequent adapter follows the same shape).
- Round 3, #590: GCSPayloadStore. Sync `google-cloud-storage` + `unittest.mock` tests. There is no in-process emulator equivalent of `moto`, and `fake-gcs-server` is a process-based binary, so it is deferred to a future test-infra round.
- Round 4, #591: AzureBlobPayloadStore + SeaweedFS-via-S3 docs. Adds the `azure-storage-blob>=12.20.0` runtime dependency.
- Round 5, #592: typed `EventRecord.payload_ref` binding. The `payload_ref` field accepts either a `PayloadReference` or the legacy `dict[str, Any]`; the envelope serializes through `payload_ref_to_dict()` to a canonical shape carrying a `kind: "payload_store"` discriminator. The `replay_payload_ref_locator` helper recognizes the discriminator. Closes Phase 5.
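The sketch below illustrates the Round 1 filesystem layout and atomic write using only the standard library. It assumes the payload key is the SHA-256 hex digest of the payload bytes. `shard_path`, `put_bytes`, and `get_bytes` are hypothetical names, and the metadata sidecar is omitted.

```python
import hashlib
import os
import tempfile
from pathlib import Path


def shard_path(root: Path, sha: str) -> Path:
    """<root>/<sha[:2]>/<sha[2:4]>/<sha>: two fan-out levels keep directories small."""
    return root / sha[:2] / sha[2:4] / sha


def put_bytes(root: Path, data: bytes) -> str:
    """Store `data` under its SHA-256 digest with an atomic write; return the digest."""
    sha = hashlib.sha256(data).hexdigest()
    target = shard_path(root, sha)
    if target.exists():
        return sha  # content-addressed: identical bytes are already stored
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so os.replace is a same-filesystem rename.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(data)
            fh.flush()
            os.fsync(fh.fileno())  # make the bytes durable before they become visible
        # With POSIX rename semantics, readers see either no file or the complete file.
        os.replace(tmp_name, target)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return sha


def get_bytes(root: Path, sha: str) -> bytes:
    return shard_path(root, sha).read_bytes()
```

A production store may also fsync the parent directory after `os.replace` so that the rename itself survives a crash. The sketch stops at tempfile + fsync + `os.replace`, as listed for Round 1.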
0.1 Post-closeout — manifest consolidation (Scopes A + B)
After the seven phases closed, two follow-up rounds consolidated
the operational artifacts that had been drifting in parallel
between noetl/noetl/ci/manifests/ and noetl/ops/ci/manifests/:
- Scope A (noetl/ops#112 + noetl/noetl#598): moved the Phase 4 KEDA + NATS supercluster sample manifests from `noetl/noetl/ci/manifests/` to `noetl/ops/ci/manifests/`.
- Scope B (noetl/ops#113 + noetl/noetl#599): completed the move for the remaining 11 directories (analytics, clickhouse, gateway, jupyterlab, nats, noetl, postgres, qdrant, test-server, tshoot, etc.) and updated `noetl/ops/automation/development/noetl.yaml` to read from local `ci/manifests/...` paths. `noetl/noetl` now carries only a `ci/MOVED.md` breadcrumb where its `ci/manifests/` used to be.
- Scope B operator follow-ups (noetl/ops#114): adds `action=reset` to the deploy playbook, which clears the NoETL namespaces and patches static PV `claimRef`s so the next deploy rebinds cleanly. It also adds retry/backoff around the test-server contract verifier's first `/health` call. Finally, it targets a Ready paginated-api pod by name, so an exec cannot land on a pod mid-rollout and get SIGKILLed.
After Scopes A + B, noetl/ops/ci/manifests/ is the sole
home for NoETL operational manifests. The Python generators
(noetl/core/runtime/keda.py, noetl/core/runtime/nats_topology.py)
stay in noetl/noetl; only the committed sample YAML moved.
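The sketch below shows roughly what such a generator emits: a KEDA `ScaledObject` with a `nats-jetstream` trigger, built as a plain dict. The function name, stream, consumer, and monitoring endpoint are hypothetical placeholders, not the `keda.py` API. The `NOETL` account default, the lag threshold of 10, and the cap of 20 replicas follow the Phase 4 notes above. `target_replicas` applies the Kubernetes HPA arithmetic for an `AverageValue` external metric, ceil(lag / threshold), clamped to the replica bounds.

```python
import math
from typing import Any


def build_worker_scaled_object(
    *,
    name: str = "noetl-worker",
    namespace: str = "noetl",
    stream: str = "NOETL_COMMANDS",        # hypothetical stream name
    consumer: str = "noetl-worker",        # hypothetical durable consumer name
    monitoring_endpoint: str = "nats.nats.svc.cluster.local:8222",  # hypothetical
    account: str = "NOETL",  # not "$G": the noetl user lives in the NOETL account (#597)
    lag_threshold: int = 10,
    min_replicas: int = 1,
    max_replicas: int = 20,
) -> dict[str, Any]:
    """Return a KEDA ScaledObject manifest as a plain dict."""
    return {
        "apiVersion": "keda.sh/v1alpha1",
        "kind": "ScaledObject",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "scaleTargetRef": {"name": name},
            "minReplicaCount": min_replicas,
            "maxReplicaCount": max_replicas,
            "triggers": [
                {
                    "type": "nats-jetstream",
                    # KEDA trigger metadata values are strings.
                    "metadata": {
                        "natsServerMonitoringEndpoint": monitoring_endpoint,
                        "account": account,
                        "stream": stream,
                        "consumer": consumer,
                        "lagThreshold": str(lag_threshold),
                    },
                }
            ],
        },
    }


def target_replicas(lag: int, lag_threshold: int, min_replicas: int, max_replicas: int) -> int:
    """HPA AverageValue arithmetic: ceil(lag / threshold), clamped to [min, max]."""
    desired = math.ceil(lag / lag_threshold)
    return max(min_replicas, min(max_replicas, desired))
```

`target_replicas(200, 10, 1, 20)` returns 20, the destination of the 200-message burst. The observed 4 → 8 → 16 → 20 climb presumably reflects HPA scale-up rate limiting on the way there, which this arithmetic does not model. Serializing the dict with `json.dumps` or a YAML dumper yields a committable sample manifest.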
0.2 Documentation conventions (for AI agents + humans)
"Where to look" for everything the v2-spec implementation touches:
| What you need | Where it lives |
|---|---|
| Application code | noetl/noetl |
| Operational manifests, deployment playbooks, infra automation | noetl/ops |
| Design docs, feature specs (this doc) | noetl/docs |
| Python API + DSL semantics reference | noetl/noetl wiki |
| Operational guides (install / verify / tuning) | noetl/ops wiki |
| AI session state, handoffs, rules | noetl/ai-meta |
Wiki entry points for the v2 work:
- `noetl/wiki/resource_locator`: URN scheme + NATS subject derivation (Phase 4 round 1).
- `noetl/wiki/keda` + `ops/wiki/manifests-keda`: KEDA scaler generator (Python API + operational guide).
- `noetl/wiki/nats_supercluster` + `ops/wiki/manifests-nats-supercluster`: supercluster topology (generator + operational guide).
- `noetl/wiki/payload_store`: `PayloadStore` Protocol + adapters + the `EventRecord.payload_ref` binding.
The two-wiki convention is codified in `ai-meta/agents/rules/wiki-maintenance.md`, Rule 0:

- Python API / DSL / core architecture docs live in the noetl/noetl wiki.
- Operational / deployment / infra docs live in the noetl/ops wiki.
- Hybrid topics are split across both, with prominent cross-link callouts.
0.3 Local-kind validation evidence (2026-05-23)
After the Scope B consolidation merged, the full lifecycle was
exercised end-to-end in a local kind cluster (kind-noetl,
single control-plane node, podman backend):
- Teardown + provision: namespaces were deleted. Then `noetl run automation/development/noetl.yaml --runtime local --set action=deploy --set image_tag=2026-05-22-09-37`, run from `repos/ops/`, provisioned cleanly. (The "PV claimRef" + "test-server contract" friction was caught here and is fixed in noetl/ops#114.)
- NoETL API: `/api/health` returned HTTP 200 in 2.8 ms. 415 playbooks were listable from the catalog; the Postgres PV is `Retain`, so state survived the teardown.
- DB connection footprint: 15 idle connections in total, within normal pool variance of the pre-Scope-B baseline of 11–12. No connection-count regression.
  - Server pool: 8 idle.
  - Projector: 5 idle.
  - Outbox-publisher: 2 idle.
  - Workers: 0, since they are NATS-only by design.
- Test playbook latency: 5× `test/simple_python` runs took 1.20s → 0.97s → 0.73s → 0.67s → 0.66s. Steady state is ~0.66s warm.
- KEDA scale-up loop: a 200-message lag burst against the paused `noetl_worker_pool` consumer drove HPA `TARGETS` from `0/10` to `200/10`. Replicas went 1 → 4 → 8 → 16 → 20 (cap), then drained back to 1 via HPA `stabilizationWindowSeconds`.
- Supercluster gateway mesh: `cluster_size=1` per cluster, to fit the CPU budget of a single kind node. The gateways are URN-derived and bidirectional. `nats-cluster-a-0` reports:
  - `cluster.name=a`
  - `gateway.outbound_gateways=['b']`
  - `inbound_gateways=['b']`
  - JetStream `domain=tenant_default_org_default_region_us_east_1_cluster_a`
- Node resource pressure: 78% CPU reservation post-validation on a 4-vCPU podman VM. Adding more pods exceeds capacity, for example the full 3-replica supercluster plus KEDA-driven worker scale-up. This is documented in `ops/wiki/manifests-nats-supercluster` under "Resource footprint".
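The 20-replica ceiling in the KEDA loop is consistent with the Kubernetes HPA core formula:

desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric), clamped to the min/max bounds.

This sketch makes two assumptions:

- A per-replica target of 10, read from `TARGETS=…/10`.
- `maxReplicas=20`, the observed cap.

The intermediate 4 → 8 → 16 steps are most likely shaped by the HPA's scale-up rate policies, which this sketch does not model.

```python
import math


def desired_replicas(current_replicas: int, current_avg: float, target_avg: float,
                     min_replicas: int = 1, max_replicas: int = 20,
                     tolerance: float = 0.1) -> int:
    """HPA core formula: ceil(current * currentMetric / targetMetric), clamped to bounds."""
    ratio = current_avg / target_avg
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas  # inside the default 10% tolerance band: no change
    else:
        desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


# 200 pending messages on 1 replica against a 10-per-replica target.
print(desired_replicas(1, current_avg=200, target_avg=10))  # 20 (the cap)
```

When the lag drains, the same formula returns 1. `stabilizationWindowSeconds` only delays when that lower value is applied.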
1. Why this revision exists
The PFT v2 workload (fixtures/playbooks/pft_flow_test/test_pft_flow_v2.yaml) is the canonical stress test for the distributed runtime. It exercises:
- 10 facilities × 1000 patients × (assessments × 4 pages + conditions × 3 + medications × 3 + vital signs + demographics + MDS detail fan-out) ≈ 120k HTTP requests + ~26k claim cycles per execution.
- A successful 2026-05-15 local-kind run completed in 3h 54m 21s. It wrote 26,199 commands to `noetl.event` with `event_type='command.*'` and required two automatic recoveries by the in-process command reaper.
Even after the cursor-loop refactor (noetl_cursor_loop_design.md) eliminated per-iteration collection materialization, three structural costs remain:
| Cost | Where it shows up | Order of magnitude in PFT v2 |
|---|---|---|
| Server-side per-claim coordination | claim_next_loop_indices and _issue_cursor_loop_commands in repos/noetl/noetl/core/dsl/engine/executor/ | one HTTP claim round-trip per cursor row |
| Per-fragment event amplification | each worker claim emits command.issued / claimed / started / call.done / step.exit / command.completed | 6× events × ~26k fragments ≈ 150k event rows |
| Server-centric projection | only the server folds events into projection state | single writer to projection store; saturates Postgres pool under MDS bursts |
The 2026-05-15 GKE amber run (memory/archive/2026/05/20260515-193100-gke-runtime-reaper-pft-v2-amber.md) saw both the NoETL API DB pool and NATS get into pressure during facility-1 MDS, confirming the central path is the bottleneck.
This spec proposes the next reduction step, but with one correction to the mental model: the event log is not an implementation detail or a backend adapter concern. It is the temporal kernel of the NoETL business operating system. Frames, Arrow IPC, distributed caches, local indexes, streaming materialized views, analytical projections, and object storage are accelerators around that kernel. They must never become the only source of state.
That means every durable state transition must be representable as an append-only event with a deterministic serialization contract. Projections, caches, materialized views, local shared memory regions, analytical tables, streaming state, and distributed indexes are all rebuildable derived state. Given an execution id, tenant id, and timestamp or event position, NoETL must be able to replay the canonical event stream and reconstruct the system's command state, frame state, loop progress, projection outputs, and payload references as they existed then.
The implementation priorities are:
- Event-sourced state reproduction. `noetl.event` remains the canonical append-only ledger. Every frame, lease, cursor, payload reference, projection checkpoint, and tenant-visible business state transition is replayable from event data plus immutable payload objects.
- Shared memory as a core advantage. The runtime uses a tiered shared-state fabric. This fabric makes colocated execution fast without weakening replay. The tiers are:
  - in-process memory;
  - node-local Arrow IPC;
  - local disk/NVMe;
  - NATS K/V for small distributed coordination state;
  - durable object storage.
- Worker-side batched loop execution. Workers claim and execute frames (multi-row windows) instead of single cursor rows. Per-fragment events drop by a factor equal to the frame size.
- Decentralized projection. Projection workers become pluggable, sharded, and horizontally scalable, with NATS consumer groups for fan-out and a stable identity scheme so they can re-attach to their shard after restart.
- Cloud-agnostic event / payload / projection store abstractions. Port/adapter architecture for NATS JetStream / Kafka / Pub/Sub / Event Hubs / Kinesis; for S3 / GCS / Azure Blob / SeaweedFS; and for operational, analytical, search, vector, and streaming projection backends.
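Here is a back-of-the-envelope check of the frame-size reduction, using the figures from this section. As a simplification, it assumes that a frame-shaped claim emits the same six lifecycle events as one row-shaped claim (`command.issued` / `claimed` / `started` / `call.done` / `step.exit` / `command.completed`). Real frame lifecycles also add heartbeats.

```python
import math

# Assumption: six lifecycle events per claim, taken from the row-shaped path in the cost table.
EVENTS_PER_CLAIM = 6


def lifecycle_events(rows: int, rows_per_frame: int, events_per_claim: int = EVENTS_PER_CLAIM) -> int:
    """Events emitted when `rows` cursor rows are claimed `rows_per_frame` at a time."""
    claims = math.ceil(rows / rows_per_frame)
    return claims * events_per_claim


row_shaped = lifecycle_events(26_000, rows_per_frame=1)     # one claim per row
frame_shaped = lifecycle_events(26_000, rows_per_frame=50)  # one claim per 50-row frame
print(row_shaped, frame_shaped, row_shaped // frame_shaped)  # 156000 3120 50
```

When the row count divides evenly, the reduction factor equals the frame size. Otherwise the last partial frame costs one extra claim.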
The end state is a distributed business operating system in the NoETL sense: every tenant, organization, execution, compute actor, queue, stream, cache object, projection, and payload is addressable via a unified resource locator; workers scale on real backlog signals; loops collapse to map-reduce style stages whose data plane uses shared memory whenever colocated and durable storage otherwise; and auditors/operators can reproduce the state of the organization at a given event position.
2. Non-goals
- This is not a rewrite of the NoETL DSL. Existing `next.arcs[]`, `set:`, and `mode:` semantics stay.
- This does not replace Postgres as the default projection store. Postgres remains the reference; new backends are opt-in.
- This is not a new Arrow distribution mechanism. We use `pyarrow` and `arrow-rs` as-is.
- This is not a green-field event sourcing platform. We keep `noetl.event` as the canonical source of truth. New layers wrap, distribute, cache, mirror, and project it, but they do not replace it.
3. Architecture overview
```text
┌─────────────────────────────────────────────┐
│ Server (planner)                            │
│ - schedules stages, not iterations          │
│ - mints frame leases on each loop           │
│ - exposes claim/heartbeat/commit endpoints  │
└────────────┬──────────────────┬─────────────┘
             │ control events   │ frame leases
             ▼                  ▼
┌─────────────────────────┐ ┌──────────────────────────┐
│ Event Store (Tier A)    │ │ Projection Store (B)     │
│ NATS JS / Kafka /       │ │ Postgres / DynamoDB /    │
│ Pub/Sub / Event Hubs /  │ │ Firestore / Cassandra /  │
│ Kinesis                 │ │ analytical / search /    │
│                         │ │ operational backends     │
│ (append-only ledger +   │ │ (derived, replayable     │
│  stream adapters)       │ │  state)                  │
└──────────┬──────────────┘ └────────────┬─────────────┘
           │ subscribe                   │ project
           ▼                             ▼
┌─────────────────────────────────────────────┐
│ Workers (runners)                           │
│ - claim a frame (N rows / N seconds /       │
│   bounded memory)                           │
│ - run inner DSL block in-process            │
│ - accumulate results in Arrow RecordBatch   │
│ - flush to IPC + Tier 3 + emit one event    │
│ - heartbeat lease; on crash, frame is       │
│   handed off via cursor checkpoint          │
└────────────┬──────────────────┬─────────────┘
             │                  │
             ▼                  ▼
┌─────────────────────────┐ ┌──────────────────────────┐
│ Tier 1.5: Arrow IPC     │ │ Tier 3: Payload Store    │
│ shared memory           │ │ S3 / GCS / Azure Blob /  │
│ (same host only)        │ │ SeaweedFS (durable,      │
│                         │ │ content-addressed)       │
└─────────────────────────┘ └──────────────────────────┘
```
Four core moves vs today:
- Event stream as the kernel. Append-only events plus immutable payload objects are the durable operating-system journal. Everything else is an index, cache, materialized view, or delivery stream derived from that journal.
- Stage-shaped scheduling. The server no longer thinks in cursor rows; it thinks in frames. A frame is a unit of work a worker can execute end-to-end, of bounded duration and bounded result size, and is independently recoverable.
- Worker as the loop interpreter. The inner DSL block of a loop step is interpreted inside the worker that owns the frame, not via N round-trips to the server.
- Data plane separate from control plane. Frame outputs flow as Arrow record batches over shared memory when possible and as content-addressed objects in Tier 3 always. The event store only ever carries lightweight manifests.
3.1 Event-sourced state contract
NoETL has one canonical state rule:
If it must be reproduced, audited, routed, billed, retried, or explained later, it must be represented by an event and any referenced immutable payload objects.
The event stream is the durable timeline for a multitenant organization. Projections are allowed to be fast, denormalized, and backend-specific, but they must be disposable. A projection rebuild from (tenant_id, organization_id, execution_id, from_position, to_position) must produce the same state as the original run, modulo explicitly declared non-deterministic external side effects.
Required event envelope fields:
```json
{
  "event_id": "snowflake-or-ulid",
  "tenant_id": "tenant-123",
  "organization_id": "org-456",
  "execution_id": "987654321",
  "stream_id": "execution/987654321/stage/fetch_patients",
  "aggregate_id": "frame/123",
  "aggregate_type": "frame",
  "event_type": "frame.committed",
  "schema_name": "noetl.frame.committed",
  "schema_version": 1,
  "event_time": "2026-05-16T12:34:56.789Z",
  "ingest_time": "2026-05-16T12:34:56.812Z",
  "producer": "noetl://tenant/tenant-123/org/org-456/cluster/prod-1/.../worker/cpu-01",
  "causation_id": "event-that-caused-this-event",
  "correlation_id": "execution-or-business-flow-id",
  "idempotency_key": "tenant/org/execution/frame/event-kind/attempt",
  "payload_ref": {
    "uri": "noetl://tenant/tenant-123/org/org-456/payloads/sha256/...",
    "sha256": "...",
    "media_type": "application/vnd.apache.arrow.stream"
  },
  "meta": {}
}
```
Envelope invariants:
- `tenant_id` and `organization_id` are mandatory on every event and payload reference. They are the isolation boundary for routing, replay, retention, encryption, and billing.
- `event_id` is globally unique and monotonic enough for local ordering. Replay correctness, however, depends on `(stream_id, stream_version)` or backend position, not wall-clock time.
- `expected_version` is enforced per aggregate/stream where the backend supports it. Where the transport cannot enforce it directly, a side index records stream versions.
- `idempotency_key` is mandatory for externally retried transitions. Duplicate delivery may occur; duplicate durable effects must not.
- `payload_ref` points to immutable content. Events never point to mutable cache paths as their only copy.
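The invariants above can be checked mechanically before an append. This sketch makes several assumptions:

- It validates only an illustrative subset of the required fields.
- It treats a `payload_ref` as "immutable content" when it carries both `uri` and `sha256`.
- `build_idempotency_key` and `envelope_violations` are hypothetical helpers, not NoETL APIs.

```python
from typing import Any, Dict, List

# Illustrative subset of the required envelope fields.
REQUIRED_FIELDS = ("event_id", "tenant_id", "organization_id", "execution_id",
                   "stream_id", "event_type", "schema_name", "schema_version")


def build_idempotency_key(tenant: str, org: str, execution: str, frame: str,
                          event_kind: str, attempt: int) -> str:
    # Follows the "tenant/org/execution/frame/event-kind/attempt" pattern in the envelope example.
    return "/".join([tenant, org, execution, frame, event_kind, str(attempt)])


def envelope_violations(event: Dict[str, Any], externally_retried: bool = False) -> List[str]:
    """Return human-readable invariant violations; an empty list means the envelope passes."""
    problems = []
    for field in REQUIRED_FIELDS:
        if event.get(field) in (None, ""):
            problems.append(f"missing {field}")
    if externally_retried and not event.get("idempotency_key"):
        problems.append("missing idempotency_key for retried transition")
    ref = event.get("payload_ref")
    if ref is not None and (not ref.get("uri") or not ref.get("sha256")):
        problems.append("payload_ref must name immutable content (uri + sha256)")
    return problems
```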
3.2 Time-travel and replay
The replay API is a first-class implementation target, not a diagnostic afterthought:
```text
GET /api/replay/state
  query:
    tenant_id
    organization_id
    execution_id
    as_of_event_id | as_of_position | as_of_time
    projection = execution | frame | loop | business_object | all
```
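The following client-side helper assembles a request against the replay endpoint. It reads `as_of_event_id | as_of_position | as_of_time` as "exactly one cutoff". That reading is an assumption, as is the base URL. `replay_state_url` is a hypothetical helper, not part of NoETL.

```python
from typing import Optional
from urllib.parse import urlencode

CUTOFFS = ("as_of_event_id", "as_of_position", "as_of_time")
PROJECTIONS = {"execution", "frame", "loop", "business_object", "all"}


def replay_state_url(base_url: str, tenant_id: str, organization_id: str,
                     execution_id: str, projection: str = "all",
                     **cutoff: Optional[str]) -> str:
    chosen = {k: v for k, v in cutoff.items() if v is not None}
    unknown = set(chosen) - set(CUTOFFS)
    if unknown:
        raise ValueError(f"unknown cutoff parameter(s): {sorted(unknown)}")
    if len(chosen) != 1:
        raise ValueError("pass exactly one of as_of_event_id, as_of_position, as_of_time")
    if projection not in PROJECTIONS:
        raise ValueError(f"unsupported projection: {projection}")
    params = {"tenant_id": tenant_id, "organization_id": organization_id,
              "execution_id": execution_id, **chosen, "projection": projection}
    return f"{base_url.rstrip('/')}/api/replay/state?{urlencode(params)}"


print(replay_state_url("http://localhost:8082/", "default", "default", "987654321",
                       projection="frame", as_of_position="1200"))
```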
Implementation status:
- `GET /api/replay/state` shipped in Phase 0 (noetl/noetl#435).
- Phase 0 folds execution, frame, stage, loop, and payload-reference lineage from `noetl.event`.
- The endpoint returns a deterministic `sha256` checksum over the folded state.
- The live reader tolerates pre-migration event tables during rolling rollout. If the additive tenant/org envelope columns are not present yet, replay treats only `tenant_id=default` and `organization_id=default` as visible legacy events.
- A core replay upcaster registry now exists in Phase 0 (`noetl.core.replay.EventUpcasterRegistry`) and is wired into `ReplayService.replay_state`. The default registry preserves legacy events as `schema_name=noetl.event`, `schema_version=1`.
- `ReplayService` can use either its configured registry or a per-call `upcaster_registry`. This keeps schema migration validation storage-neutral and deterministic without mutating process-global state.
- Replay state includes `upcaster_registry_digest`, a stable SHA-256 digest of the registered `(schema_name, from_version)` transitions used for the fold.
- A golden replay corpus now lives under `tests/fixtures/replay/` in `repos/noetl`. It asserts a stable fold checksum for execution, command, business-object, stage, frame, payload-reference, and registry-digest state.
- Event-position snapshot selection now exists for `projection_snapshot` rows with `aggregate_type=replay_state` and `aggregate_id=execution/<execution_id>/<projection>`. Replay can seed from the snapshot, skip earlier events, and fold the remaining stream.
- Snapshot seeds are accepted only when their recorded upcaster registry digest matches the registry used for the current replay fold.
- Time-based snapshot selection now resolves the time cutoff to an event position before selecting a safe snapshot.
- Still open, tracked by noetl/noetl#440: payload resolution, registered production upcasters for future schema revisions, and domain-specific business-object reducers.
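A deterministic checksum needs a canonical encoding, so that dict ordering or registration order cannot change the digest. This sketch assumes canonical JSON (sorted keys, fixed separators) as that encoding. NoETL's actual canonicalization for the fold checksum and `upcaster_registry_digest` may differ. The function names are illustrative.

```python
import hashlib
import json
from typing import Any, Iterable, Tuple


def canonical_json(value: Any) -> bytes:
    # Sorted keys and fixed separators make the bytes independent of dict insertion order.
    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


def fold_checksum(state: Any) -> str:
    return hashlib.sha256(canonical_json(state)).hexdigest()


def registry_digest(transitions: Iterable[Tuple[str, int]]) -> str:
    # Sort the (schema_name, from_version) pairs so registration order does not matter.
    ordered = sorted(set(transitions))
    return hashlib.sha256(canonical_json([list(t) for t in ordered])).hexdigest()
```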
Replay reconstructs state by:
- Loading the latest validated snapshot at or before the requested position.
- Reading canonical events from that snapshot position through the requested cutoff.
- Resolving immutable payload references through the payload store.
- Applying schema upcasters in deterministic order. The Phase 0 registry enforces monotonic version advancement so an upcaster cannot silently rewrite an event without moving the schema version forward.
- Folding events with the same projection code used by live projectors.
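The following is a minimal in-memory sketch of the replay steps above. It omits payload resolution (step 3) and assumes the following:

- Integer event positions.
- A snapshot passed as a `(position, state)` tuple.
- A caller-supplied pure reducer.

`UpcasterRegistry` here is illustrative and is not the `noetl.core.replay.EventUpcasterRegistry` API. It does mirror the monotonic-version rule: an upcaster that does not advance `schema_version` is rejected.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Event = Dict[str, Any]
Upcaster = Callable[[Event], Event]


class UpcasterRegistry:
    """Illustrative registry keyed by (schema_name, from_version)."""

    def __init__(self) -> None:
        self._upcasters: Dict[Tuple[str, int], Upcaster] = {}

    def register(self, schema_name: str, from_version: int, fn: Upcaster) -> None:
        self._upcasters[(schema_name, from_version)] = fn

    def upcast(self, event: Event) -> Event:
        # Apply upcasters until no transition is registered for the current version.
        while (event["schema_name"], event["schema_version"]) in self._upcasters:
            before = event["schema_version"]
            event = self._upcasters[(event["schema_name"], before)](dict(event))
            if event["schema_version"] <= before:
                raise ValueError(f"upcaster for {event['schema_name']} v{before} did not advance the version")
        return event


def replay(events: List[Event], cutoff: int, registry: UpcasterRegistry,
           reducer: Callable[[Dict[str, Any], Event], Dict[str, Any]],
           snapshot: Optional[Tuple[int, Dict[str, Any]]] = None) -> Dict[str, Any]:
    start, state = snapshot if snapshot else (0, {})
    state = dict(state)  # never mutate the snapshot seed
    for event in sorted(events, key=lambda e: e["position"]):
        if event["position"] <= start or event["position"] > cutoff:
            continue  # covered by the snapshot, or beyond the requested cutoff
        state = reducer(state, registry.upcast(event))
    return state
```

Because the same reducer folds both the full stream and the post-snapshot tail, a snapshot-seeded replay should produce the same state as a replay from position 0.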
Snapshots are performance accelerators. They are never the authority. A snapshot must record:
- event position covered;
- projection code version;
- schema upcaster versions and registry digest, matching the replay state's `upcaster_registry_digest`;
- payload digest set or Merkle root;
- tenant encryption context;
- deterministic fold checksum.
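Snapshot selection combines these recorded fields with the digest rule from the implementation status above. Field names in this sketch are illustrative, not a real table schema. Requiring a matching `projection_code_version` is an assumption layered on top of the documented digest check.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class SnapshotRecord:
    # Hypothetical field names mirroring the list above.
    event_position: int
    projection_code_version: str
    upcaster_registry_digest: str
    payload_merkle_root: str
    encryption_context: str
    fold_checksum: str


def select_snapshot(snapshots: Iterable[SnapshotRecord], cutoff: int,
                    registry_digest: str, projection_code_version: str) -> Optional[SnapshotRecord]:
    """Latest snapshot at or before `cutoff` that was folded with the same code and upcasters."""
    usable = [s for s in snapshots
              if s.event_position <= cutoff
              and s.upcaster_registry_digest == registry_digest
              and s.projection_code_version == projection_code_version]
    return max(usable, key=lambda s: s.event_position, default=None)
```

A `None` result simply means replay folds from the beginning of the stream. Snapshots only shorten the fold; they never change its result.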
Replay verification becomes a release gate for every phase after Phase 0: run live execution, rebuild projections from events, and compare checksums for execution state, frame state, loop progress, and configured business projections.
The first golden checksum gate is intentionally small. It does not replace live PFT replay parity; it gives every future serializer/upcaster/projection change a cheap deterministic test that fails before a full cluster run.
Implementation status:
noetl/noetl#481 adds the local replay/serialization release gate at `scripts/check_replay_release_gate.sh`. The gate runs the following checks:

- replay golden corpus checksums;
- upcaster behavior;
- replay route folding;
- projector folding/metrics;
- outbox publishing;
- command/broker/executor outbox coverage;
- frame event serialization;
- Arrow Feather serialization;
- IPC durability/cache tests.

Use `PYTHON_BIN=/path/to/python scripts/check_replay_release_gate.sh` when the default `.venv/bin/python` is not available.
3.3 Visual component diagram
The diagram below maps all named components to their roles, storage tiers, and primary data flows. Solid arrows are runtime data paths; dashed arrows are control or fallback paths.
Key observations from the diagram:
- Event Kernel is the single append authority. The canonical log is the only target for durable writes. The distribution stream mirrors it for low-latency fan-out but carries no replay authority.
- Payload Store and Canonical Log are the correctness pair. Losing either breaks replay. Every other tier and store can be dropped and rebuilt from these two.
- Shared-state fabric follows a strict miss chain. Tier 1 → 1.5 → 2 → Tier 3 (Payload Store). Writers always write Tier 3 first; higher tiers are best-effort and admission-controlled.
- Projectors are the sole writers to projection stores. After Phase 2, the server never writes projection state directly.
- Port/Adapter boundaries (labeled on subgraphs) mark where backend substitutions happen at deploy time; the runtime image does not change.
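The miss chain and the write-durable-first rule can be modelled in a few lines. The following toy sketch uses plain dicts for every tier. Tier names are labels only, and admission control and eviction are not modelled.

```python
from typing import Dict, List, Optional


class TieredStore:
    """Toy model of the miss chain: tier 1 -> 1.5 -> 2 -> tier 3 (durable payload store)."""

    def __init__(self, cache_tiers: List[str]) -> None:
        # Dicts preserve insertion order, so list order is the lookup order.
        self.caches: Dict[str, Dict[str, bytes]] = {name: {} for name in cache_tiers}
        self.durable: Dict[str, bytes] = {}  # stands in for Tier 3

    def put(self, key: str, data: bytes) -> None:
        self.durable[key] = data            # durable write first: this is the replay copy
        for cache in self.caches.values():  # best-effort warm-up of the faster tiers
            cache[key] = data

    def get(self, key: str) -> Optional[bytes]:
        missed: List[str] = []
        data: Optional[bytes] = None
        for name, cache in self.caches.items():
            if key in cache:
                data = cache[key]
                break
            missed.append(name)
        else:
            # Every cache tier missed: fall through to the durable store.
            data = self.durable.get(key)
            if data is None:
                return None
        for name in missed:  # backfill the faster tiers that missed
            self.caches[name][key] = data
        return data
```

Wiping every cache tier loses no data, which is the property the diagram's "correctness pair" observation depends on.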
4. Workload baseline (instrument before refactor)
Before any code change ships, baseline the PFT v2 run with telemetry the team can reason from. This is the metric set every later phase is judged against.
The PFT fixture API must not become the benchmark bottleneck. The
paginated-api test server enforces the PFT endpoint request limit in each
process, so local kind and GKE deployments should run at least three replicas
for PFT v2 validation. That gives enough pod-level capacity for the target
30-50 requests/second workload while preserving the fixture's per-process
50 requests/second contract and keeping 429s meaningful when a single worker
bursts too hard.
The playbook's pft_http_concurrency variable documents the client-side
pressure target. loop.spec.max_in_flight now accepts either a positive
integer or a renderable expression, with the rendered value re-validated as a
positive integer before dispatch. PFT v2 cursor loops should use that workload
variable directly so catalog registration and runtime scheduling share one
source of truth.
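The `max_in_flight` rule is "render if it is an expression, then re-validate as a positive integer". This sketch injects the renderer as a callable, because NoETL's real template engine is not reproduced here; the test uses `str.format` purely as a stand-in. `resolve_max_in_flight` is a hypothetical name.

```python
from typing import Any, Callable, Mapping, Union


def resolve_max_in_flight(raw: Union[int, str], context: Mapping[str, Any],
                          render: Callable[[str, Mapping[str, Any]], Any]) -> int:
    """Render an expression (if given) and insist on a positive integer before dispatch."""
    value = render(raw, context) if isinstance(raw, str) else raw
    if isinstance(value, bool):  # bool is a subclass of int, so reject it explicitly
        raise ValueError("max_in_flight must be a positive integer, got a boolean")
    if isinstance(value, str):
        value = value.strip()
        if not value.isdigit():
            raise ValueError(f"max_in_flight rendered to non-integer {value!r}")
        value = int(value)
    if not isinstance(value, int) or value <= 0:
        raise ValueError(f"max_in_flight must be a positive integer, got {value!r}")
    return value
```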
Each PFT v2 cursor loop must also opt into loop.spec.frame with
max_rows: 50 so the stage/frame runtime is exercised. A run that leaves
noetl.stage and noetl.frame empty is still on the legacy cursor-command
path and is not a valid Phase 1 benchmark.
Per execution, capture:
| Metric | How | Measured value (PFT v2 GKE 2026-05-15 unless noted) |
|---|---|---|
| total `command.*` event count | `SELECT count(*) FROM noetl.event WHERE execution_id = $1 AND event_type LIKE 'command.%'` | ≈ 26k × 6 ≈ 150k |
| frame count | sum of cursor claims that returned > 0 rows | 5,498 in local kind frame-mode run 629145120213828019; older row-shaped baselines were ~5.5k committed frames but still ran one task pipeline per claimed row |
| mean rows per frame | total rows / frame count | 49.92 in frame-mode run 629145120213828019 |
| server CPU on `/claim` hot path | request-log percentiles from gateway | dominated by GKE pool pressure |
| Postgres pool depth high-watermark | `pg_stat_activity` poll | hit 50 waiters |
| NATS reschedule events | `kubectl get events -n nats` | 1 during facility-1 MDS |
| payload bytes written to Tier 3 | TempStore counter | not currently instrumented |
| projection store write rate | counter on `mark_step_completed` | single writer |
| execution wall time | `noetl.execution.end_time - start_time` | 279.785s local kind frame-mode run; earlier local kind default-row-frame baseline was 1880.739s |
Target after Phases 1–3:
| Metric | Target | Mechanism |
|---|---|---|
| total `command.*` event count | ÷10 | frame-shaped claims, mean rows/frame ≥ 50 |
| server `/claim` requests per execution | ÷50 | one claim per frame |
| Postgres pool depth high-watermark | < 20 sustained | claim path narrower + projection sharded |
| Tier 3 bytes written | unchanged | data goes to Tier 3 either way |
| Tier 1.5 cache hit ratio | > 60% on colocated consumers | new metric, see Phase 3 |
| execution wall time | ÷2 | parallelism + reduced coordination |
A separate dashboard tile per metric is mandatory before merging any phase that claims an improvement.
5. Control plane: stage and frame model
5.1 New tables (additive, no breaking change)
```sql
-- Stage describes a unit of orchestration the planner cares about.
-- One stage per loop step or per fan-out step. Tool steps keep using
-- the existing noetl.command path.
CREATE TABLE IF NOT EXISTS noetl.stage (
  stage_id BIGINT PRIMARY KEY, -- snowflake id
  execution_id BIGINT NOT NULL REFERENCES noetl.execution(execution_id),
  parent_event_id BIGINT REFERENCES noetl.event(event_id),
  parent_stage_id BIGINT REFERENCES noetl.stage(stage_id),
  loop_event_id TEXT,
  opened_event_id BIGINT,
  closed_event_id BIGINT,
  kind TEXT NOT NULL CHECK (kind IN ('loop','fanout','reduce')),
  step_name TEXT NOT NULL,
  dsl_ref TEXT NOT NULL, -- pointer to playbook step
  status TEXT NOT NULL DEFAULT 'OPEN',
  frame_policy JSONB NOT NULL, -- size, time, memory bounds
  started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  ended_at TIMESTAMPTZ
);

-- Frame is a worker-claimable window of work inside a stage.
CREATE TABLE IF NOT EXISTS noetl.frame (
  frame_id BIGINT PRIMARY KEY, -- snowflake id
  stage_id BIGINT NOT NULL REFERENCES noetl.stage(stage_id),
  execution_id BIGINT NOT NULL REFERENCES noetl.execution(execution_id),
  parent_frame_id BIGINT REFERENCES noetl.frame(frame_id),
  command_id BIGINT,
  claimed_event_id BIGINT,
  terminal_event_id BIGINT,
  cursor JSONB NOT NULL, -- driver-specific resume hint
  row_count INTEGER NOT NULL DEFAULT 0,
  status TEXT NOT NULL DEFAULT 'PENDING',
  owner_worker TEXT,
  lease_until TIMESTAMPTZ,
  output_ref JSONB, -- {tier3_sha, ipc_handle?}
  events_emitted INTEGER NOT NULL DEFAULT 0,
  attempts INTEGER NOT NULL DEFAULT 0,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMPTZ
);

CREATE INDEX IF NOT EXISTS frame_open_idx
  ON noetl.frame (stage_id, status, lease_until)
  WHERE status IN ('PENDING','CLAIMED','RUNNING');

CREATE INDEX IF NOT EXISTS idx_stage_execution_step_loop
  ON noetl.stage (execution_id, step_name, loop_event_id)
  WHERE loop_event_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_frame_stage_frame
  ON noetl.frame (stage_id, frame_id);

CREATE INDEX IF NOT EXISTS idx_frame_command
  ON noetl.frame (command_id)
  WHERE command_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_command_stage_worker_slot
  ON noetl.command (execution_id, stage_id, ((meta->>'worker_slot_id')))
  WHERE stage_id IS NOT NULL AND meta ? 'worker_slot_id';

CREATE INDEX IF NOT EXISTS idx_event_exec_stage_event_id_desc
  ON noetl.event (execution_id, stage_id, event_id DESC)
  WHERE stage_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_event_exec_frame_event_id_desc
  ON noetl.event (execution_id, frame_id, event_id DESC)
  WHERE frame_id IS NOT NULL;
```
These coexist with noetl.event and noetl.command. Legacy tool steps are not migrated.
5.1.1 Replay lineage graph
The stage/frame tables are operational projections, but their keys must make replay and audit cheap without scraping JSON. The current implementation uses this graph:
```text
execution
└─ stage
   ├─ parent_stage_id   -> parent stage for nested/fan-out chains
   ├─ loop_event_id     -> loop epoch that opened the stage
   ├─ opened_event_id   -> noetl.event(stage.opened)
   ├─ closed_event_id   -> noetl.event(stage.closed)
   ├─ command.stage_id  -> worker commands issued for the stage
   └─ frame
      ├─ parent_frame_id   -> upstream frame for derived/reduce work
      ├─ command_id        -> noetl.command that owns the worker slot
      ├─ claimed_event_id  -> noetl.event(frame.dispatched)
      └─ terminal_event_id -> noetl.event(frame.committed|frame.failed)
```
noetl.event also carries direct stage_id and frame_id columns for stage/frame lifecycle events and command events where the producer knows the projection key. Those columns are indexed for replay scans by (execution_id, stage_id, event_id DESC) and (execution_id, frame_id, event_id DESC). noetl.command carries stage_id and frame_id; the cursor-worker path resolves frame.command_id from the command context and has a server-side fallback indexed by (execution_id, stage_id, meta->>'worker_slot_id').
parent_stage_id and parent_frame_id are the explicit traversal edges for runtime projections. Cursor loops currently use loop_event_id as the direct stage correlation key and mint one root frame per stage; subsequent frames in that stage carry parent_frame_id to form a deterministic stage-local chain. Fan-out/reduce will extend the same parent ids to preserve the execution tree without replaying every event payload.
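A cursor loop's stage-local chain is one root frame followed by frames linked through `parent_frame_id`. This means the frames can be ordered without reading event payloads. The sketch below makes the following assumptions:

- The frame rows are already loaded as a `frame_id -> parent_frame_id` mapping.
- The string ids are illustrative.
- `stage_frame_chain` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def stage_frame_chain(parents: Dict[str, Optional[str]]) -> List[str]:
    """Order a stage's frames root-first by following parent_frame_id edges.

    `parents` maps frame_id -> parent_frame_id (None for the root frame).
    """
    roots = [f for f, p in parents.items() if p is None]
    if len(roots) != 1:
        raise ValueError(f"expected exactly one root frame, found {len(roots)}")
    children: Dict[str, str] = {}
    for frame, parent in parents.items():
        if parent is None:
            continue
        if parent in children:
            raise ValueError(f"frame {parent} has more than one child; not a linear chain")
        children[parent] = frame
    chain = [roots[0]]
    while chain[-1] in children:
        chain.append(children[chain[-1]])
    if len(chain) != len(parents):
        raise ValueError("chain does not reach every frame (dangling parent or cycle)")
    return chain
```

Fan-out/reduce would turn this chain into a tree. The same edges still apply, but a single walk no longer suffices.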
5.2 Frame policy
frame_policy is the configurable bound that decides how much work a single frame contains. Phase 0 adds a typed runtime model as FramePolicy and accepts it on cursor loops as loop.spec.frame (or loop.spec.frame_policy internally). Defaults preserve today's one-row cursor behavior until a playbook opts into batching.
```yaml
loop:
  cursor: ...
  spec:
    mode: cursor
    max_in_flight: 100        # worker slots
    frame:
      max_rows: 50            # max rows per frame
      max_seconds: 30         # max wall-time per frame
      max_bytes: 67108864     # max in-flight serialized output bytes
      lease_seconds: 120      # initial lease window
      heartbeat_seconds: 30   # expected heartbeat cadence
      process: frame          # row = run body once per row; frame = run body once per frame
      retry_mode: whole_frame
      max_attempts: 3
```
For PFT v2, the validated opt-in frame is { max_rows: 50, max_seconds: 30, max_bytes: 64MB, lease_seconds: 120, heartbeat_seconds: 30, process: frame }. process: row remains the default for backwards-compatible semantics: the worker claims a frame but runs the declared task pipeline once per row. process: frame exposes frame.rows, frame.row_count, frame.index, and iter.<iterator>_rows to the task pipeline and runs it once per claimed frame. That is the mode that produced the 2026-05-18 full PFT v2 local kind validations in ~4m30s to ~5m40s.
row_concurrency remains available as an opt-in tuning knob for process: row, but the 2026-05-17 local kind validation showed row_concurrency: 4 regressed the full PFT v2 run from ~31m21s to ~33m23s. Leave it at the default 1 unless a workload explicitly needs concurrent row execution inside one frame.
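The three size bounds act as "close the frame when any bound is hit". This sketch is one interpretation of how a worker could enforce them while accumulating output. `FrameBounds` and `FrameWindow` are hypothetical names and are not NoETL's `FramePolicy` class. The defaults mirror the one-row default and the 64 MiB bound in the YAML.

```python
import time
from dataclasses import dataclass, field


@dataclass(frozen=True)
class FrameBounds:
    # Illustrative mirror of the loop.spec.frame size bounds.
    max_rows: int = 1
    max_seconds: float = 30.0
    max_bytes: int = 64 * 1024 * 1024


@dataclass
class FrameWindow:
    bounds: FrameBounds
    started: float = field(default_factory=time.monotonic)
    rows: int = 0
    bytes_out: int = 0

    def add_row(self, serialized_size: int) -> None:
        self.rows += 1
        self.bytes_out += serialized_size

    def should_close(self, now: float) -> bool:
        # The frame closes as soon as any single bound is reached.
        return (self.rows >= self.bounds.max_rows
                or now - self.started >= self.bounds.max_seconds
                or self.bytes_out >= self.bounds.max_bytes)
```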
retry_mode: whole_frame is the only supported Phase 1 recovery mode. If a
worker crashes or its lease expires before terminal commit, reclaim emits
frame.abandoned and then a new frame.dispatched attempt for the same frame
boundary. Row-split retry is intentionally not supported yet: splitting a
failed frame would create a second event-sourcing boundary and requires a
future subframe model. A task failure that reaches terminal commit emits
frame.failed with the frame cursor, output reference summary, row count,
events_emitted, and recovery policy so an operator can audit the lost-work
unit and decide whether to requeue at the business cursor layer.
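A whole-frame reclaim can be expressed as a small function that returns the events it appends. The recovery fields follow the metadata listed in §5.3. The following parts are assumptions:

- Leases are represented as plain floats.
- The frame is a dict-free dataclass.
- An exhausted frame is parked in a hypothetical `ABANDONED` status for operator review. The spec does not define that state.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FrameLease:
    frame_id: str
    status: str            # PENDING | CLAIMED | RUNNING | COMMITTED | FAILED
    owner: Optional[str]
    lease_until: float
    attempts: int


def reclaim(frame: FrameLease, reclaimer: str, now: float, lease_seconds: float,
            max_attempts: int = 3) -> List[Dict[str, Any]]:
    """Return the events a whole-frame reclaim appends, updating the lease in place."""
    if frame.status not in ("CLAIMED", "RUNNING") or frame.lease_until >= now:
        return []  # nothing to recover: terminal, pending, or lease still live
    recovery = {"retry_mode": "whole_frame", "row_split_retry": False,
                "max_attempts": max_attempts, "previous_owner": frame.owner,
                "reclaimer": reclaimer, "prior_lease_until": frame.lease_until,
                "previous_attempts": frame.attempts}
    events = [{"event_type": "frame.abandoned", "frame_id": frame.frame_id, **recovery}]
    if frame.attempts >= max_attempts:
        # Hypothetical: park exhausted frames for operator review instead of redispatching.
        frame.status, frame.owner = "ABANDONED", None
        return events
    frame.status, frame.owner = "CLAIMED", reclaimer
    frame.lease_until = now + lease_seconds
    frame.attempts += 1
    events.append({"event_type": "frame.dispatched", "frame_id": frame.frame_id,
                   "attempt": frame.attempts, **recovery})
    return events
```

The same frame boundary is redispatched and no row is split. This is why the abandon/dispatch pair is enough for replay to explain the retry.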
Implementation status:
- `FramePolicy` now exists in `noetl.core.dsl.engine.models.workflow`.
- Cursor loop dispatch includes the resolved frame policy in both the `cursor_worker` tool config and command metadata.
- The `cursor_worker` runtime passes `__frame_max_rows` and `__frame_policy` into cursor claim rendering, then asks the driver for a frame-sized row window. Drivers without a batched path fall back to repeated single-row claims.
- The Postgres cursor driver now implements `claim_many(...)`. If the playbook claim SQL uses `LIMIT {{ __frame_max_rows | default(1) | int }}`, one database round-trip claims a whole frame instead of one row. PFT v2 uses this form for all patient-domain and MDS-detail cursor queues.
- The `cursor_worker` runtime supports both row-mode and frame-mode execution.
  - Row mode runs the existing task pipeline once per claimed row in-process and returns frame metadata in the terminal command result.
  - Frame mode runs the task pipeline once per claimed frame with `frame.rows` in context. Playbooks can then batch HTTP, database writes, and reducer-style work while preserving one replayable frame boundary.
- `frame.row_concurrency` is an opt-in bound for processing rows within one frame concurrently in row mode. The default remains `1` to preserve legacy ordering and side-effect behavior.
- Cursor frame commits now carry profiling metadata under `frame.cursor.metrics`: row outcome counts, row concurrency, total task/render/tool milliseconds, and rollups by task kind/name. This is the release-gate instrumentation for deciding whether the next PFT improvement should target HTTP calls, Postgres saves, serialization, or control-plane commits.
- Claimed frame rows are serialized as Apache Arrow IPC stream bytes through `TempStore.put_ipc_bytes`. The command result carries a durable `rows_ref` with schema digest, row count, media type, and optional Tier 1.5 IPC hint. The default `max_rows=1` preserves existing one-row behavior unless a playbook opts into batching.
- Frame commit validation now treats IPC metadata as a cache-only hint. If `output_ref` contains `ipc`, the same envelope must also contain a durable `ref`, `uri`, or `locator`, including nested `rows_ref` shapes. IPC-only frame events are rejected because they cannot be replayed once shared memory is swept.
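The IPC-only rejection rule can be sketched as a recursive check over `output_ref`. This sketch treats each nested `rows_ref` as its own envelope, which is an assumption about how nesting is interpreted. The function names are hypothetical.

```python
from typing import Any, Mapping

DURABLE_KEYS = ("ref", "uri", "locator")


def has_durable_ref(envelope: Mapping[str, Any]) -> bool:
    if any(envelope.get(k) for k in DURABLE_KEYS):
        return True
    nested = envelope.get("rows_ref")
    return isinstance(nested, Mapping) and has_durable_ref(nested)


def validate_output_ref(output_ref: Mapping[str, Any]) -> None:
    """Reject IPC-only outputs: shared memory may be swept before anyone replays the frame."""
    if "ipc" in output_ref and not has_durable_ref(output_ref):
        raise ValueError("output_ref carries an ipc hint but no durable ref/uri/locator")
    nested = output_ref.get("rows_ref")
    if isinstance(nested, Mapping):
        validate_output_ref(nested)
```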
5.3 Claim / heartbeat / commit API
New endpoints on the NoETL server, additive to the existing /api/commands/*:
```text
POST /api/stages/{stage_id}/frames/claim
  body:    { worker_id, command_id?, requested_count, lease_seconds, cursor, frame_policy, locality? }
  returns: [ { frame_id, cursor, lease_until, dsl_ref, frame_policy } ... ]

POST /api/frames/{frame_id}/heartbeat
  body:    { worker_id, lease_seconds, status }
  returns: { lease_until }

POST /api/frames/{frame_id}/commit
  body:    { worker_id, cursor, output_ref, row_count, status, events_emitted }
  returns: { ok, next_action }   # may immediately hand the worker the next frame
```
Implementation status:
- The frame control-plane endpoints are now present in
repos/noetlbehind the main API router:POST /api/stages/{stage_id}/frames/claimPOST /api/frames/{frame_id}/heartbeatPOST /api/frames/{frame_id}/commit
- Phase 1 starts with explicit replayable frame leases. Claim either takes a pending/expired frame or lazily creates a frame row for the requested stage.
- Each lifecycle transition emits a canonical frame event:
frame.dispatchedframe.startedframe.heartbeatframe.abandonedframe.committedframe.failed
- Frame lifecycle events now populate stream_version and envelope_checksum, so replay can order stage-local frame transitions and detect envelope drift with the same checksum contract as the event-store port. Frame telemetry streams use the Snowflake event id as a sparse, monotonic stream_version; this removes the former per-frame stream advisory lock and the max(stream_version)+1 scan from heartbeat and commit writes.
- Frame claims now persist frame.command_id from the worker command context. If an older worker omits it, the server resolves it from (execution_id, stage_id, worker_slot_id) using the indexed command metadata path.
- Lazily minted runtime frames use the cursor claim key (stage_id, cursor.worker_slot_id, cursor.frame_index) as their idempotency boundary. The hardening patch in noetl/noetl#453 replaces the remaining stage-scoped frame-mint advisory lock with a partial unique index and conflict reload, keeping duplicate retries deterministic without serializing frame claimers.
- Cursor workers now send a RUNNING heartbeat immediately after a successful frame claim and before frame execution. The heartbeat endpoint records the first CLAIMED -> RUNNING transition as frame.started; later RUNNING lease extensions are recorded as frame.heartbeat. This keeps the normal replay sequence clear: frame.dispatched -> frame.started -> frame.heartbeat* -> frame.committed|frame.failed.
- When a worker reclaims an expired CLAIMED/RUNNING frame, the server emits frame.abandoned before the new frame.dispatched event. Replay therefore sees the recovery chain as append-only history rather than inferring it from a mutable row overwrite.
- Recovery metadata is persisted on frame.abandoned and frame.dispatched: retry_mode=whole_frame, row_split_retry=false, max_attempts, previous owner, reclaimer, prior lease, and previous attempt count. This is enough to audit whether a frame was retried as a whole unit and whether operator intervention is needed after repeated attempts.
- Duplicate or late terminal commits are rejected with 409 frame_already_terminal and the existing terminal_event_id, so a retrying worker cannot emit a second frame.committed/frame.failed event for the same frame. Heartbeats against terminal frames receive the same conflict response.
- Worker-side cursor integration uses the existing cursor_worker path with frame policy and batched driver claims. The remaining Phase 1 work is validation, telemetry, and removing any residual per-row server coordination from non-PFT cursor shapes.
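The frame replay sequence can be checked mechanically. The sketch below is a hypothetical validator, not NoETL code. It assumes one frame's events arrive as an ordered list of event-type strings. It treats frame.abandoned as closing an expired CLAIMED or RUNNING attempt, so the next frame.dispatched starts a new attempt.

```python
from typing import Iterable

# Hypothetical transition table derived from the documented sequence:
# frame.dispatched -> frame.started -> frame.heartbeat* -> frame.committed|frame.failed,
# with frame.abandoned emitted before a reclaim re-dispatches the frame.
ALLOWED = {
    None: {"frame.dispatched"},
    "frame.dispatched": {"frame.started", "frame.abandoned"},
    "frame.started": {"frame.heartbeat", "frame.committed", "frame.failed", "frame.abandoned"},
    "frame.heartbeat": {"frame.heartbeat", "frame.committed", "frame.failed", "frame.abandoned"},
    "frame.abandoned": {"frame.dispatched"},
    "frame.committed": set(),  # terminal: a second terminal event maps to 409
    "frame.failed": set(),
}


def validate_frame_history(events: Iterable[str]) -> str:
    """Return the final frame state, or raise ValueError on an illegal transition."""
    state = None
    for event in events:
        if event not in ALLOWED.get(state, set()):
            raise ValueError(f"illegal transition {state!r} -> {event!r}")
        state = event
    if state is None:
        raise ValueError("empty frame history")
    return state
```

A replay regression test could run a check like this over every frame of a PFT execution and flag any frame that lacks a claim, start, or terminal link.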
The HTTP surface is the operational fallback. The primary path uses NATS JetStream pull consumers (see §6) for lower-latency claim and built-in lease semantics.
5.4 Server's narrower role
After the refactor:
- For a loop step, the server emits one stage.opened event, mints frames lazily as workers ask, and emits one stage.closed event when all frames commit.
- The server does not touch per-row state. Cursors are opaque; the server only records the latest committed cursor per frame.
- The server does not issue per-row command.issued events. Frame claims are observable via stage and frame rows plus a single frame.dispatched event per claim.
The current command_reaper is repurposed as a frame reaper: it scans noetl.frame for lease_until < now() AND status IN ('CLAIMED','RUNNING'), marks the lease abandoned, and republishes the frame. It keeps the same correctness guarantees with a smaller scan surface.
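The reaper predicate and the recovery metadata from §5.3 can be sketched in memory as shown below. The Frame dataclass, the "PENDING" status name, the default max_attempts of 3, and the needs_operator flag are all hypothetical. This is not the reaper's actual query or schema.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional

ACTIVE = {"CLAIMED", "RUNNING"}


@dataclass
class Frame:
    """Hypothetical in-memory stand-in for a noetl.frame row."""
    frame_id: str
    status: str
    lease_until: Optional[datetime]
    owner: Optional[str]
    attempts: int = 1


def reap_expired(frames: List[Frame], now: datetime, max_attempts: int = 3) -> List[Dict]:
    """Python mirror of: status IN ('CLAIMED','RUNNING') AND lease_until < now()."""
    events: List[Dict] = []
    for frame in frames:
        if frame.status not in ACTIVE or frame.lease_until is None or frame.lease_until >= now:
            continue
        events.append({
            "event_type": "frame.abandoned",
            "frame_id": frame.frame_id,
            "previous_owner": frame.owner,
            "prior_lease": frame.lease_until.isoformat(),
            "previous_attempts": frame.attempts,
            "retry_mode": "whole_frame",
            "row_split_retry": False,
            "max_attempts": max_attempts,
            "needs_operator": frame.attempts >= max_attempts,  # hypothetical flag
        })
        # Release the lease so the frame can be claimed (republished) again.
        # "PENDING" is an assumed name for a claimable status.
        frame.status, frame.owner, frame.lease_until = "PENDING", None, None
    return events
```

Only expired active frames are touched. Committed frames and frames with live leases pass through unchanged, which is what keeps the scan surface small.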
6. Data plane: Arrow IPC zero-copy (Tier 1.5)
Tier 1.5 is part of a broader shared-state fabric, not a one-off optimization. The design target is a proven distributed-data pattern: keep durable state in an append-only log and immutable payload objects; keep hot execution state in rebuildable shared caches, indexes, and materialized views.
| Layer | Backends | Purpose | Rebuildable from events? |
|---|---|---|---|
| Canonical log | noetl.event Postgres partitions; optional mirrored JetStream/Kafka/Pub/Sub/Event Hubs/Kinesis streams | Durable timeline and replay authority | Source |
| Immutable payloads | S3 / GCS / Azure Blob / SeaweedFS / local durable store | Large event data, Arrow batches, files | Referenced by log |
| Hot shared memory | in-process LRU + Arrow IPC shm/memfd | Same-process and same-node zero-copy reads | Yes |
| Warm node cache | local NVMe/PVC disk cache | Reuse payload blocks after restart or reschedule | Yes |
| Small distributed cache | NATS K/V | Lease hints, loop counters, small coordination state | Yes |
| Streaming materialization | source/table/materialized-view/sink engine with barriers | Incremental state over event streams and work queues | Yes |
| Analytical materialization | columnar analytical projection store | High-volume queryable facts, metrics, audit/event lake views | Yes |
Only the first two layers are required for correctness. The rest are latency, throughput, and serving-shape advantages. This is the core product advantage: NoETL can run like a low-latency distributed shared-memory system while remaining reproducible from an append-only event log.
6.1 Where it fits in TempStore
The existing repos/noetl/noetl/core/storage/result_store.py already tiers payloads as MEMORY → KV → DISK → S3/GCS/DB. We insert a new tier between MEMORY and DISK:
| Tier | Mechanism | Scope | Lifetime |
|---|---|---|---|
| 1 | in-process LRU (existing) | per process | configurable bytes |
| 1.5 | Apache Arrow IPC over POSIX shm / memfd | per host (all processes on the node) | frame lease + 30s grace |
| 2 | local NVMe disk cache (existing) | per node | configurable GB |
| 3 | S3 / GCS / Azure Blob / SeaweedFS (existing) | global, content-addressed | retention policy |
Tier 1.5 is the zero-copy hop for colocated workers. Tier 3 is always written; Tier 1.5 is best-effort fast path.
6.2 Format
Workers materialize loop results as pyarrow.RecordBatch (Python) / arrow_array::RecordBatch (Rust). The on-disk and on-wire format is the Arrow IPC streaming format. The shared-memory carrier is one of:
- POSIX shm_open + mmap (Linux/macOS): simplest, ubiquitous.
- memfd_create (Linux only): preferred where available; no path collisions, and the anonymous file descriptor is passed via SCM_RIGHTS over a Unix domain socket from a node-local broker.
For cross-runtime parity (Python ↔ Rust), the SHM region carries:
- A 20-byte header: an 8-byte magic NOETLIPC, a u32 format version, and a u64 payload length.
- The Arrow IPC stream bytes.
This is intentionally simpler than the Plasma object store: NoETL workers are co-tenants of a single Kubernetes pod (one process per container today; if we ever go multi-process per pod we add a small node-local broker — see §6.5). No Plasma client/server fan-out, no shared catalog.
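The three header fields listed above pack into 20 bytes (8 + 4 + 8) when stored without padding. The sketch below assumes little-endian byte order, no alignment padding, and an initial format version of 1. All three are assumptions for illustration, not a published layout. The reader slices by the declared length because a shared-memory region may be larger than the payload.

```python
import struct
from typing import Union

# Assumed layout: 8-byte magic, u32 format version, u64 payload length,
# little-endian, no padding ("<" disables native alignment).
HEADER = struct.Struct("<8sIQ")
MAGIC = b"NOETLIPC"
FORMAT_VERSION = 1  # assumed initial version


def frame_ipc_region(ipc_stream: bytes) -> bytes:
    """Prefix Arrow IPC stream bytes with the region header."""
    return HEADER.pack(MAGIC, FORMAT_VERSION, len(ipc_stream)) + ipc_stream


def read_ipc_region(region: Union[bytes, bytearray, memoryview]) -> memoryview:
    """Validate the header and return a zero-copy view of the IPC stream bytes."""
    view = memoryview(region)
    if len(view) < HEADER.size:
        raise ValueError("region shorter than header")
    magic, version, length = HEADER.unpack_from(view, 0)
    if magic != MAGIC:
        raise ValueError("bad magic")
    if version != FORMAT_VERSION:
        raise ValueError(f"unsupported format version {version}")
    end = HEADER.size + length
    if end > len(view):
        raise ValueError("truncated payload")
    return view[HEADER.size:end]
```

read_ipc_region also accepts the .buf memoryview of a multiprocessing.shared_memory.SharedMemory block, so a Python consumer can hand the returned view to an Arrow stream reader without copying.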
6.3 Reference shape
PayloadReference (existing in result_store.py) gains optional IPC metadata:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class IpcHint:
    node_id: str            # noetl://tenant/<tenant>/org/<org>/cluster/<id>/node/<id>
    shm_name: str           # /noetl-<execution>-<frame>-<seq>
    schema_digest: str      # quick sanity check before attach
    valid_until: datetime   # writer-promised minimum lifetime


@dataclass
class PayloadReference:
    tier3_uri: str          # noetl://tenant/<tenant>/org/<org>/payloads/sha256/<sha256>
    sha256: str
    media_type: str         # "application/vnd.apache.arrow.stream"
    rows: int
    bytes: int              # payload size in bytes
    # Tier 1.5 fast-path hint; may be None or stale.
    # IpcHint is declared first so this annotation resolves at class creation.
    ipc: Optional[IpcHint] = None
```
Implementation status:
- repos/noetl now exposes IpcHint on ResultRef/TempRef.
- ResultRefMeta records Arrow/replay-relevant payload metadata: media_type, schema_digest, and row_count.
- IpcHint is explicitly best-effort. It carries shm_name, schema_digest, byte_length, optional row_count, producer, node_id, lease_expires_at, and media type.
- NOETL_NODE_ID is populated from Kubernetes spec.nodeName in the worker/projector/outbox publisher pods when deployed through Helm.
- ArrowIpcSharedMemoryCache now provides the first Tier 1.5 implementation surface in noetl.core.storage.ipc_cache: budget enforcement, POSIX shared-memory allocation, IpcHint creation, read/attach, delete, and expired-lease sweep.
- TempStore.put_ipc_bytes/get_ipc_bytes now provide an explicit raw Arrow IPC path: durable bytes are always written, IPC admission is optional, and reads fall back to the durable tier when the shared-memory hint is expired or missing.
- TempStore.ipc_stats() exposes local counters for admission attempts/successes/failures, read attempts/hits/misses, and durable fallback reads.
- /metrics now exports the server process's TempStore IPC counters as noetl_storage_ipc_* Prometheus metrics, plus noetl_storage_ipc_read_hit_ratio.
- Worker processes can expose their own TempStore IPC counters with NOETL_WORKER_METRICS_PORT. The endpoint is intentionally lightweight (/health, /metrics), suppresses scrape access logs, and labels counters by worker_id, worker_pool, and runtime so pod-level cache admission/read behavior is visible independently of server metrics. The Helm chart publishes this as the noetl-worker-metrics headless service by default.
- noetl.core.storage.arrow_ipc now provides the runtime serialization primitives rows_to_arrow_ipc and arrow_ipc_to_rows. Schema digests are computed from Arrow's serialized schema, not from row values, so replay/projector code can compare logical frame shape independently of payload content.
- cursor_worker now writes multi-row frame captures through this path when frame_policy.max_rows > 1 or NOETL_CURSOR_FRAME_CAPTURE_ENABLED=true. NOETL_CURSOR_FRAME_IPC_ENABLED=false disables only the same-node IPC admission; the durable payload write remains authoritative.
- TempStore.resolve(result_ref) is Arrow-aware for application/vnd.apache.arrow.stream references: it attempts the same-node IPC hint first, falls back to durable bytes, then decodes to row dictionaries only at the consumer boundary. Generic JSON result refs continue through the existing JSON resolver.
- The durable ResultRef remains authoritative; consumers must treat expired, missing, or foreign-node IPC hints as cache misses and fall back through durable storage.
- Worker-side metrics aggregation into dashboards and colocated worker/projector validation remain tracked by noetl/noetl#438.
6.4 Producer / consumer protocol
Producer (any worker that emits a record batch):
- Serialize the batch to Arrow IPC stream bytes.
- Write to Tier 3 keyed by sha256 (idempotent, exists-first check).
- Attempt the Tier 1.5 write: create/open shm, copy the buffer (one memcpy from the IPC stream), and set valid_until = lease_until + 30s (frame lease expiry plus grace).
- Emit one event whose envelope carries a PayloadReference with both the Tier 3 URI and the optional IpcHint.
Consumer (frame commit handler, reducer, projection worker):
- Read the PayloadReference from the event envelope.
- Try the IpcHint if present and node_id == self.node_id and valid_until > now(): open shm, mmap, and wrap as pyarrow.RecordBatchStreamReader. Zero-copy.
- If the hint is missing, stale, or from another node: read from the Tier 1 cache; on miss, Tier 2; on miss, Tier 3.
The consumer never trusts the IPC hint blindly. It validates schema_digest (cheap), bumps a tier metric, and on any error falls through to the durable read path. The durable read is the source of truth.
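The producer/consumer protocol can be modeled in memory to show the fallback order. In the sketch below, dictionaries stand in for Tier 3 and for shared memory, Tiers 1 and 2 are omitted, and the Hint and TieredPayloads names are hypothetical. The durable write always happens first. The consumer uses the hint only when the node matches, the hint has not expired, and the schema digest agrees.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

GRACE = timedelta(seconds=30)


@dataclass
class Hint:  # simplified, hypothetical stand-in for IpcHint
    node_id: str
    shm_name: str
    schema_digest: str
    valid_until: datetime


class TieredPayloads:
    """Hypothetical in-memory model of the Tier 3 + Tier 1.5 protocol."""

    def __init__(self) -> None:
        self.tier3: Dict[str, bytes] = {}                # sha256 -> durable bytes
        self.shm: Dict[str, Tuple[str, bytes]] = {}      # shm_name -> (schema_digest, bytes)
        self.stats = {"ipc_hit": 0, "durable_read": 0}

    def produce(self, data: bytes, schema_digest: str, node_id: str,
                lease_until: datetime, shm_name: str) -> dict:
        sha = hashlib.sha256(data).hexdigest()
        self.tier3.setdefault(sha, data)              # durable first; idempotent by digest
        self.shm[shm_name] = (schema_digest, data)    # best-effort fast path
        hint = Hint(node_id, shm_name, schema_digest, lease_until + GRACE)
        return {"sha256": sha, "ipc": hint}

    def consume(self, ref: dict, node_id: str, now: datetime) -> bytes:
        hint: Optional[Hint] = ref.get("ipc")
        if hint and hint.node_id == node_id and hint.valid_until > now:
            entry = self.shm.get(hint.shm_name)
            if entry is not None and entry[0] == hint.schema_digest:
                self.stats["ipc_hit"] += 1
                return entry[1]
        # Missing, stale, foreign-node, or mismatched hint: the durable read wins.
        self.stats["durable_read"] += 1
        return self.tier3[ref["sha256"]]
```

Every consumer path returns the same bytes. The hint changes only which tier serves the read, so correctness never depends on the hint.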
6.5 Garbage collection and back-pressure
- Each shm region is owned by the producer worker for the duration of its frame lease + 30s grace. After the grace period, the worker calls shm_unlink on the region.
- Workers track a per-node tier15_bytes_in_use counter. New shm writes are admission-controlled by a configurable budget (default 1 GB per node). On budget exhaustion, the writer skips Tier 1.5 and emits only the durable Tier 3 reference. No data plane stall.
- For multi-process per pod (future): introduce a noetl-node-broker sidecar that owns the shm namespace and brokers lifetimes via Unix-socket RPC. Not required for the current one-process-per-pod deployment shape.
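Admission control can be sketched as a byte-budget reservation. Tier15Budget is a hypothetical name, and the default of 1_000_000_000 bytes reads "1 GB" as decimal gigabytes. The counter here lives in one process. A true per-node counter shared by several processes would need the node broker described above.

```python
import threading
from typing import Dict


class Tier15Budget:
    """Hypothetical admission controller for Tier 1.5 shm writes."""

    def __init__(self, budget_bytes: int = 1_000_000_000) -> None:  # "1 GB" default
        self.budget_bytes = budget_bytes
        self.tier15_bytes_in_use = 0
        self._regions: Dict[str, int] = {}
        self._lock = threading.Lock()

    def try_admit(self, shm_name: str, size: int) -> bool:
        """Reserve space, or return False so the writer emits only the Tier 3 reference."""
        with self._lock:
            if shm_name in self._regions:
                return True  # already admitted; idempotent retry
            if self.tier15_bytes_in_use + size > self.budget_bytes:
                return False
            self._regions[shm_name] = size
            self.tier15_bytes_in_use += size
            return True

    def release(self, shm_name: str) -> None:
        """Call after shm_unlink, once lease + grace has elapsed."""
        with self._lock:
            self.tier15_bytes_in_use -= self._regions.pop(shm_name, 0)
```

A rejected admission is not an error. The caller skips the shared-memory copy and still publishes the durable reference, which is why budget exhaustion cannot stall the data plane.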
6.6 Serialization contract
Serialization is part of the replay contract. If two runtimes read the same event stream and payload objects, they must fold the same state.
Rules:
- Event envelopes: canonical JSON for the persisted envelope metadata. Writers must emit stable field names, UTC timestamps, explicit nulls only where schema allows them, and deterministic map key ordering before signing/checksumming.
- Large tabular payloads: Apache Arrow IPC stream, media type application/vnd.apache.arrow.stream. Every payload records schema_digest, row_count, byte_length, compression codec, and content SHA-256.
- Nested business payloads: JSON Schema / OpenAPI-compatible payloads for small objects; Arrow struct, list, and map types for large repeated data. Avoid pickle, language-native binary serialization, or runtime-specific object graphs.
- Decimals and time: decimals carry precision/scale; timestamps are UTC with an explicit unit; local time zones are data fields, not implicit runtime settings.
- Schema evolution: every event has schema_name and schema_version. Upcasters are pure functions registered by (schema_name, from_version) and must advance to a later schema_version before the next upcaster can run. The registry is deterministic and covered by replay tests; production releases must also record the registry digest in replay/projection snapshots.
- Cross-language parity: Python and Rust compliance tests read the same golden event/payload corpus and compare fold checksums.
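The upcaster rules can be sketched as a small registry keyed by (schema_name, from_version). UpcasterRegistry is hypothetical, and so is hashing function qualified names into the digest. The sketch shows the two properties the rules require: every step must advance schema_version, which guarantees the chain terminates, and the set of registered edges has a stable digest.

```python
import hashlib
from typing import Callable, Dict, Tuple

Event = Dict[str, object]
Upcaster = Callable[[Event], Event]


class UpcasterRegistry:
    """Hypothetical registry of pure upcasters keyed by (schema_name, from_version)."""

    def __init__(self) -> None:
        self._upcasters: Dict[Tuple[str, int], Tuple[int, Upcaster]] = {}

    def register(self, schema_name: str, from_version: int, to_version: int, fn: Upcaster) -> None:
        if to_version <= from_version:
            raise ValueError("upcasters must advance schema_version")
        key = (schema_name, from_version)
        if key in self._upcasters:
            raise ValueError(f"duplicate upcaster for {key}")
        self._upcasters[key] = (to_version, fn)

    def upcast(self, event: Event) -> Event:
        """Apply upcasters until no edge leaves the event's current version."""
        name = event["schema_name"]
        while (name, event["schema_version"]) in self._upcasters:
            to_version, fn = self._upcasters[(name, event["schema_version"])]
            event = dict(fn(dict(event)), schema_version=to_version)  # copy in, stamp version
        return event

    def digest(self) -> str:
        """Stable digest of registered edges, suitable for recording in snapshots."""
        edges = sorted(f"{n}:{f}->{t}:{fn.__qualname__}"
                       for (n, f), (t, fn) in self._upcasters.items())
        return hashlib.sha256("\n".join(edges).encode("utf-8")).hexdigest()
```

The upcaster receives a copy of the event, so a function that accidentally mutates its input cannot corrupt the caller's event.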
The durable payload digest is computed over the exact serialized bytes. The projection checksum is computed over a canonical projection serialization, not over backend-specific storage bytes.
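A canonical checksum can be sketched as SHA-256 over sorted-key, compact-separator JSON. That is the approach §8 describes for canonical_event_checksum(), but the code below is an illustration, not that function. Three choices are assumptions: rejecting naive timestamps, encoding Decimal values as strings to keep their written scale, and using ensure_ascii=False.

```python
import hashlib
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any


def _canonical_default(value: Any) -> Any:
    # Assumed normalizations for types that JSON cannot encode directly.
    if isinstance(value, datetime):
        if value.tzinfo is None:
            raise ValueError("timestamps must be timezone-aware UTC")
        return value.astimezone(timezone.utc).isoformat()
    if isinstance(value, Decimal):
        return str(value)  # keeps scale as written, e.g. "10.50"
    raise TypeError(f"not canonically serializable: {type(value).__name__}")


def canonical_checksum(obj: Any) -> str:
    """SHA-256 over sorted-key, compact-separator UTF-8 JSON."""
    text = json.dumps(obj, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False, default=_canonical_default)
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
```

Key order in the input no longer affects the checksum. Decimal("10.50") and Decimal("10.5") still hash differently, so a change in precision or scale shows up as drift.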
6.7 Why Arrow, not raw bytes
- Columnar layout is the right shape for the heavy paths (Postgres bulk reads, DuckDB joins, fanout reducers, projection writes to analytical stores).
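- Zero-copy exchange comes for free with Arrow: within one process (for example, Rust code embedded in a Python worker), the Arrow C Data Interface shares buffers without copying, and across processes on the same pod the Arrow IPC format over shared memory (Tier 1.5) plays that role. This matters once we run a mix of pyarrow-using Python workers and arrow-rs Rust workers on the same pod.
- The wire schema is self-describing enough for payload reads, while the event envelope still records schema identity/version for governance and replay.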
7. Decentralized projection
7.1 Current state
noetl_async_sharded_architecture.md already specifies async projection workers and epoch barriers, and the projection worker skeleton exists. What is not yet decentralized: the projection still runs inside the server process, and the projection write rate is capped by the server's single-writer Postgres connection.
7.2 Refactor
- Extract projection into its own deployable: noetl-projector. Same image, different entrypoint.
- Keep the reducer transport-neutral: the same deterministic projector core must accept events from tests, replay, NATS JetStream, Kafka, or cloud stream adapters. Transport code owns delivery and checkpointing; reducer code owns ordering, folding, lineage, checksum, and idempotent writes.
- Each projector instance owns one or more shards via a NATS JetStream pull consumer group (noetl.projection.shard.<n>).
- Shard assignment is sticky: a projector keeps a shard as long as it heartbeats. On stop, NATS reassigns the shard. This is the standard JetStream durable consumer pattern.
- Each shard has its own Postgres connection (or its own backend entirely; see §7.4 for the projection store abstraction). Total projection throughput is expected to scale roughly linearly with shard count until the projection backend becomes the bottleneck.
- The projector reads events from NATS, resolves each PayloadReference via the cache hierarchy (Tier 1 → 1.5 → 2 → 3), and writes projection state. Tier 1.5 is the hot path when the projector and the producing worker are colocated.
7.3 Stable identity
To get stable identity (required for Tier 2 disk cache continuity and for NATS durable consumer affinity), projectors and workers run as StatefulSets rather than Deployments:
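```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: noetl-projector
spec:
  serviceName: noetl-projector
  replicas: 4
  selector:                # required for apps/v1; must match the template labels
    matchLabels:
      app: noetl-projector
  template:
    metadata:
      labels:
        app: noetl-projector
    spec:
      containers:
        - name: projector
          image: <noetl-image>   # placeholder: same image as the server, projector entrypoint
          env:
            - name: NOETL_SHARD_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name   # noetl-projector-0..3
            - name: NOETL_NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName   # node identity for Tier 1.5 colocation
```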
NOETL_SHARD_ID = noetl-projector-0..3 is stable across restarts and maps directly to a NATS durable consumer name.
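StatefulSet pods are named <statefulset-name>-<ordinal>, so a projector can derive its shard index from NOETL_SHARD_ID. The sketch below is hypothetical and is not the noetl.projector code. It shows that derivation, the startup check that rejects shard_index >= shard_count, and modulo ownership by numeric (Snowflake) execution id. Missing or non-numeric ids are treated as unshardable.

```python
import re

_ORDINAL = re.compile(r"-(\d+)$")


def shard_index_from_pod(pod_name: str) -> int:
    """Extract the StatefulSet ordinal, e.g. noetl-projector-2 -> 2."""
    match = _ORDINAL.search(pod_name)
    if match is None:
        raise ValueError(f"no ordinal in pod name {pod_name!r}")
    return int(match.group(1))


def validate_shard(shard_index: int, shard_count: int) -> None:
    # Fail at startup rather than subscribe as a shard that can never own events.
    if shard_count <= 0 or not 0 <= shard_index < shard_count:
        raise ValueError(f"shard_index={shard_index} invalid for shard_count={shard_count}")


def owns(execution_id: str, shard_index: int, shard_count: int) -> bool:
    """Modulo ownership by numeric execution id; other ids cannot be sharded."""
    if not execution_id.isdigit():
        return False
    return int(execution_id) % shard_count == shard_index
```

Scaling replicas without updating shard_count is exactly the misconfiguration that validate_shard catches. Replica noetl-projector-4 with shard_count=4 fails at startup instead of idling.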
7.4 Projection store abstraction (port/adapter)
```python
from __future__ import annotations  # lazy annotations; Pagination is assumed to be defined elsewhere

from typing import Any, AsyncIterator, Mapping, Optional, Protocol, Tuple


class ProjectionStorePort(Protocol):
    async def save_projection(
        self, projection_id: str, state: bytes, version: int
    ) -> None: ...

    async def load_projection(
        self, projection_id: str
    ) -> Optional[Tuple[bytes, int]]: ...

    async def save_snapshot(
        self, aggregate_id: str, aggregate_type: str,
        snapshot: bytes, version: int
    ) -> None: ...

    async def load_snapshot(
        self, aggregate_id: str
    ) -> Optional[Tuple[bytes, int]]: ...

    async def query(
        self, projection_type: str, filters: Mapping[str, Any],
        pagination: Pagination
    ) -> AsyncIterator[Mapping[str, Any]]: ...
```
Adapters: Postgres (reference), cloud/serverless document and key-value stores, wide-column stores, columnar analytical stores, search stores, vector stores, and streaming materialized-view engines. Specific products are deployment choices, not architectural dependencies.
Implementation status:
- The first projection-store port is present in repos/noetl/noetl/core/projection_store.
- ProjectionRecord and ProjectionSnapshot define idempotent projection and snapshot writes.
- projection_checksum() provides deterministic JSON checksums for replay parity.
- PostgresProjectionStore is the reference adapter and writes to additive noetl.projection and noetl.projection_snapshot tables.
- The first transport-neutral reducer is present in repos/noetl/noetl/core/projector.
- ReplayStateProjector groups events by tenant, organization, and execution, orders them by event position, folds them with the same replay-state reducer used by GET /api/replay/state, and writes lineage-rich ProjectionRecord rows with checksum and upcaster-registry metadata.
- The first projector process entrypoint is present at python -m noetl.projector. It wraps the reducer with a NATS durable pull consumer, stable shard identity, and modulo execution-id shard filtering so a StatefulSet replica only projects the events it owns.
- noetl/noetl#457 adds optional projector /metrics and /health endpoints (NOETL_PROJECTOR_METRICS_PORT / --metrics-port) with counters for notifications, extracted events, shard-owned events, projection records, empty/unowned notifications, and errors, plus last-batch gauges.
- noetl/noetl#536 adds separate counters for events assigned to another shard and events that cannot be sharded because the execution id is missing or invalid.
- noetl/noetl#537 adds a stale projection-write counter for duplicate/out-of-order delivery that loses the version race against an already-written projection. Access logging is suppressed for scrape paths.
- noetl/noetl#458 exports durable checkpoint gauges from successful projector writes: last source event id, event-time watermark, projected-at timestamp, latest projection lag, and max projection lag since process start.
- NATSEventPublisher is present in repos/noetl/noetl/core/messaging. It mirrors canonical event envelopes to subjects shaped as noetl.events.<tenant>.<org>.<execution>.<shard>, creates the event stream with a wildcard subject, and retries once after reconnect on publish failure.
- noetl/noetl#464 adds noetl.outbox plus noetl.core.outbox. The outbox is the preferred Phase 2 commit boundary: append to noetl.event and enqueue the distribution envelope in one database transaction, then let a publisher drain only committed rows to NATS. Rows include JSONB payloads for debugging and pre-encoded payload bytes with a codec, so the publisher can avoid repeated JSON serialization when a subject is known.
- noetl/noetl#479 completes the matching consumer side for that codec choice. Projector subscribers keep JSON compatibility for direct envelopes and decode Arrow Feather payload bytes from the outbox path into the same event dictionaries before sharding and replay-state folding.
- noetl/noetl#480 adds the standalone outbox publisher entrypoint (python -m noetl.outbox). It initializes the database pool, ensures the outbox table exists, publishes ready rows in bounded batches, and supports a --once mode for smoke tests and Kubernetes Jobs.
- noetl/noetl#466 applies that boundary to direct /api/events: command lifecycle events, workflow/playbook lifecycle events, and engine-generated command.issued rows enqueue their mirrored envelopes into noetl.outbox before the database commit. After commit, the route asks the outbox publisher to drain committed rows. This removes the direct post-commit publish hook from the core event path without changing the external worker API.
- noetl/noetl#467 applies the same boundary to /api/events/batch: item events, batch.accepted, batch.processing, batch.completed, batch.failed, and batch-generated command.issued rows enqueue in the writer transaction or insert chunk. The route preserves the existing external batch-request contract; it only changes how distribution intent becomes durable.
- noetl/noetl#468 applies the boundary to execution routes: /api/execute initial command.issued rows, /api/executions/{execution_id}/cancel execution.cancelled rows, and cleanup-stuck execution.cancelled rows enqueue before commit, then drain committed outbox rows. Command recovery publish remains separate because it is worker dispatch, not projector/event distribution.
- noetl/noetl#469 applies the boundary to auto-resume recovery cancellations, so interrupted parent executions marked execution.cancelled by startup recovery participate in the same projector distribution bridge as API-generated cancellations.
- noetl/noetl#470 applies the boundary to generic broker event emission. EventService.emit_event stores the sanitized event row and outbox envelope in one transaction, preserving the broker API response while making generic emits projector-visible.
- noetl/noetl#471 applies the boundary to command claims. claim_command now stores command.claimed and updates the mutable command row in the same transaction as the outbox envelope, then drains committed rows after commit.
- noetl/noetl#472 applies the boundary to the backend-neutral event-store port. PostgresEventStore.append writes the canonical envelope and outbox envelope in the same expected-version transaction, so future event-store users inherit durable projector fan-out without adding route-local hooks.
- noetl/noetl#473 applies the boundary to DSL executor-owned events. _persist_event, stage.opened, and stage.closed enqueue before commit; outbox drain is skipped for caller-owned transactions and runs only after executor-owned commits.
- Command-event idempotency keeps command identifiers lossless as strings at the event boundary while still using the numeric projection column when available. Duplicate guards compare both the typed command_id column and meta.command_id, preserving replay compatibility with older worker events and newer Snowflake command rows.
- noetl/noetl#478 applies the boundary to frame lifecycle events. Frame claim, heartbeat/start, abandon, commit, and failure routes enqueue replay envelopes into noetl.outbox in the same transaction as noetl.frame and noetl.event, then drain only committed rows after the route commit.
- Frame lifecycle mirroring remains opt-in via NOETL_EVENT_MIRROR_ENABLED=true, but the route no longer publishes directly; distribution is mediated by noetl.outbox.
- noetl/noetl#459 originally extended the same opt-in mirroring to persisted /api/events lifecycle events and generated command.issued events via post-commit direct publish. noetl/noetl#466 supersedes that route-local hook with same-transaction outbox enqueue.
- noetl/noetl#460 closes the async batch-ingestion gap by mirroring /api/events/batch item events, batch.accepted, batch.processing, batch.completed, batch.failed, and batch-generated command.issued events after their canonical commits. The mirrored envelopes preserve command_id, stage_id, frame_id, parent_event_id, and parent_execution_id where available, so projector replay sees the same lineage as direct event ingestion.
- noetl/noetl#462 covers execution-route appends: initial /api/execute generated command.issued events and API execution.cancelled events are mirrored after their canonical commits.
- Deployment wiring has started in noetl/ops#102 and noetl/ops#103: the Helm chart gains an opt-in noetl-projector StatefulSet, a projector ConfigMap, a headless service, stable shard identity from the StatefulSet pod name, automatic server event-mirror env wiring when projector.enabled=true, and a named metrics port on 9090.
- Projector startup validates the resolved shard settings. A projector replica whose stable pod identity maps to shard_index >= shard_count, or whose fetch/ack/metrics runtime limits are non-positive, fails at process initialization instead of silently subscribing as a shard that can never own events. Environment and CLI loaders preserve invalid values for that validation step; they do not clamp bad shard counts, inflight limits, timeouts, or metrics ports into defaults. Projector metrics expose shard_id, shard_index, shard_count, consumer, stream, and subject as stable labels so validation dashboards can confirm the running topology. Payload decode failures and projection callback failures have separate counters; noetl_projector_errors_total is the aggregate counter over both categories. Projector ACKs, NAKs, delayed NAKs, and TERMs are counted separately, sourced from the subscriber action observer, with recency gauges and the last redelivery delay available for validation dashboards. In-process summary helpers now report action ratios, last-batch shape ratios, decode/projection error ratios, and the combined projector runtime state; the projector metrics server exposes the same payload at /summary for local-kind/GKE evidence capture, and run_projector_phase2_validation.py wraps replay validation plus projector evidence gating into the current Phase 2 operator workflow.
- Projector checkpoint metadata landed in noetl/noetl#455: each replay-state projection write records event_count, source_event_id, event_time_watermark, projected_at, and projection_lag_ms in ProjectionRecord.meta. noetl/noetl#458 exposes those fields as per-shard Prometheus gauges.
- The adapter currently supports:
  - save_projection(record) with version-monotonic upsert semantics;
  - load_projection(projection_id);
  - query_projections(query) by tenant, organization, projection type, execution id, and limit;
  - save_snapshot(snapshot) with version-monotonic upsert semantics;
  - load_snapshot(aggregate_id, aggregate_type=...).
- Live mirrored-event projector validation and remaining direct event append paths remain tracked by noetl/noetl#437 and noetl/noetl#461. The next migration step is deploying the full outbox path in kind/GKE and validating projector replay against PFT v2.
- Non-Postgres projection adapters remain tracked by noetl/noetl#439.
Projection backend roles:
- Operational projection stores serve execution status, current frame state, user-facing API reads, and transactional admin surfaces.
- Columnar analytical projection stores are the high-volume analytical memory of the system: event lake tables, tenant/org audit timelines, PFT metrics, cost/billing facts, and queryable business histories. They are append-friendly and rebuildable from the event stream; they must not own unreplayable state.
- Streaming materialized-view engines maintain source/table/materialized-view/sink pipelines, incremental aggregations, work-queue state, and barrier-aligned derived state. They consume canonical events or table sources and emit derived updates, never replacing the canonical log.
- Search/vector stores serve retrieval shapes only. They are rebuilt from event/payload/projected source data and may lag without compromising correctness.
Every projection row should include enough lineage to support audit and replay comparison: tenant_id, organization_id, execution_id where applicable, source event_id or event-position range, projection version, and checksum.
Each projection type can target a different backend. The projector dispatches by the projection's configured backend (see §11 for the YAML).
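Version-monotonic upserts are what make duplicate and out-of-order delivery harmless. The sketch below is a hypothetical in-memory store, not PostgresProjectionStore. Its Projection row carries the lineage fields listed above, and it rejects any write whose version does not exceed the stored one, counting the rejection as stale. A Postgres adapter could express the same rule as INSERT ... ON CONFLICT ... DO UPDATE ... WHERE the incoming version is greater than the stored version.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Projection:
    """Hypothetical lineage-carrying projection row."""
    projection_id: str
    tenant_id: str
    organization_id: str
    execution_id: Optional[str]
    source_event_id: int
    version: int
    checksum: str


class MonotonicProjectionStore:
    """In-memory sketch of version-monotonic upsert: stale writes lose."""

    def __init__(self) -> None:
        self.rows: Dict[str, Projection] = {}
        self.stale_writes = 0

    def save(self, row: Projection) -> bool:
        current = self.rows.get(row.projection_id)
        if current is not None and current.version >= row.version:
            self.stale_writes += 1  # duplicate or out-of-order delivery
            return False
        self.rows[row.projection_id] = row
        return True
```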
8. Event store abstraction (port/adapter)
```python
from __future__ import annotations  # lazy annotations; envelope/position types are assumed defined elsewhere

from typing import AsyncIterator, Optional, Protocol, Sequence


class EventStorePort(Protocol):
    async def append(
        self, stream_id: StreamId, events: Sequence[EventEnvelope],
        expected_version: Optional[int]
    ) -> int: ...

    async def read(
        self, stream_id: StreamId, from_version: int = 0
    ) -> AsyncIterator[EventEnvelope]: ...

    async def subscribe(
        self, stream_pattern: str, consumer_group: str,
        from_position: ConsumerPosition
    ) -> Subscription: ...
```
Adapters (priority order):
- Postgres noetl.event: canonical ledger and replay source by default. Partitioned by tenant/time/execution as volumes grow.
- NATS JetStream: reference distribution stream. Subject = noetl.events.<tenant>.<org>.<execution>.<shard>. Durable consumers per projector / per worker. Events are persisted to the canonical ledger and mirrored to JetStream for low-latency fan-out.
- Apache Kafka / Confluent / MSK: partition key = aggregate id; offset checks for expected_version.
- Google Pub/Sub: topic per category, ordering key per aggregate, side store (Spanner / Firestore) for expected_version.
- Azure Event Hubs: Kafka-compat mode reuses the Kafka adapter; native mode uses the Event Hubs SDK + Blob checkpoint store.
- Amazon Kinesis Data Streams: partition key per aggregate, DynamoDB for version tracking, KCL for consumer coordination.
Implementation status:
- The first backend-neutral event-store port is present in repos/noetl/noetl/core/event_store.
- EventRecord defines the canonical append envelope used by adapters.
- canonical_event_checksum() serializes JSON deterministically with sorted keys and compact separators.
- PostgresEventStore is the reference adapter for noetl.event.
- The adapter currently supports:
  - append(stream_id, events, expected_version=...);
  - read(stream_id, from_version=..., limit=...);
  - per-stream expected-version conflict detection;
  - mapping to additive event envelope columns (stream_id, stream_version, aggregate/schema/tenant fields, payload_ref, envelope_checksum).
- NATS/Kafka/cloud-native stream adapters and projection-store ports remain tracked by noetl/noetl#439. The default NATS publisher now has an outbox-backed path tracked by noetl/noetl#464; direct /api/events adopts it in noetl/noetl#466. Future adapters should drain the same noetl.outbox contract rather than adding writer-local publish hooks.
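Per-stream expected-version conflict detection is optimistic concurrency. The sketch below is a hypothetical in-memory adapter, not PostgresEventStore. It assumes that stream_version starts at 1, that a stream's version equals the number of events appended so far (so an empty stream has version 0), and that read(from_version=n) yields events with stream_version > n.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional, Sequence


class WrongExpectedVersion(Exception):
    """Raised when a writer's expected_version does not match the stream head."""


class InMemoryEventStore:
    """Hypothetical sketch of append(stream_id, events, expected_version) semantics."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[dict]] = {}
        self._lock = asyncio.Lock()

    async def append(self, stream_id: str, events: Sequence[dict],
                     expected_version: Optional[int]) -> int:
        async with self._lock:
            stream = self._streams.setdefault(stream_id, [])
            current = len(stream)  # assumed: version == number of events so far
            if expected_version is not None and expected_version != current:
                raise WrongExpectedVersion(
                    f"{stream_id}: expected {expected_version}, at {current}")
            for offset, event in enumerate(events, start=1):
                stream.append({**event, "stream_version": current + offset})
            return len(stream)

    async def read(self, stream_id: str, from_version: int = 0) -> AsyncIterator[dict]:
        for event in list(self._streams.get(stream_id, [])):
            if event["stream_version"] > from_version:
                yield event
```

Passing expected_version=None skips the check, which suits append-only telemetry streams. Aggregate writers pass the version they last read.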
Adapter design constraints:
- Idempotent handlers as baseline; assume at-least-once delivery. Do not try to abstract exactly-once differences.
- Per-aggregate ordering only. No global ordering guarantee.
- Backend-specific retention / compaction / DLQ / monitoring stays out of the abstraction.
- Event schema evolution: every event carries schema_version. Adapters do not transform; upcasting lives in the projection layer.
- Configuration selects the backend at deploy time; the same image runs anywhere.
- The adapter boundary separates canonical append from distribution. A deployment may use Postgres as the canonical append log and JetStream/Kafka as the fan-out transport, or a cloud-native log plus a compacted replay archive, but it must expose the same replay semantics.
9. Payload store abstraction (port/adapter)
```python
from __future__ import annotations  # lazy annotations; EventEnvelope and PayloadReference are defined elsewhere

from typing import AsyncIterator, Mapping, Protocol


class PayloadStorePort(Protocol):
    async def store(
        self, data: bytes | AsyncIterator[bytes],
        content_type: str, metadata: Mapping[str, str]
    ) -> PayloadReference: ...

    async def fetch(
        self, reference: PayloadReference
    ) -> bytes | AsyncIterator[bytes]: ...

    async def exists(self, reference: PayloadReference) -> bool: ...

    async def delete(self, reference: PayloadReference) -> None: ...

    async def resolve_envelope(
        self, envelope: EventEnvelope
    ) -> EventEnvelope: ...

    async def externalize_envelope(
        self, envelope: EventEnvelope, threshold: int
    ) -> EventEnvelope: ...
```
Adapters: S3 (reference, covers SeaweedFS via S3-compat), GCS, Azure Blob, local filesystem.
Constraints:
- Content-addressed by SHA-256. Two events with identical large payloads share a reference. No duplicate uploads.
- Immutable on write. Versioning happens through new references, not in-place updates.
- Transparent to handlers via resolve_envelope / externalize_envelope middleware.
- Retention follows event-store retention through reference counting, with a configurable TTL as a safety net.
- Encryption at rest is per adapter.
- Streaming upload / download. No buffering full payloads in memory.
The cache hierarchy described in §6 (Tier 1 → 1.5 → 2 → 3) sits on top of this interface, transparent to user code.
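The content-addressing and externalize/resolve constraints can be sketched together. ContentAddressedStore, the payload_ref key layout, and JSON as the externalized encoding are hypothetical choices for illustration, not the adapter's contract. Identical payloads map to one object. Fetches re-verify the digest, and envelopes whose encoded payload is larger than the threshold carry a reference instead of inline data.

```python
import hashlib
import json
from typing import Dict


class ContentAddressedStore:
    """Hypothetical in-memory payload store: SHA-256 keys, immutable writes."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def store(self, data: bytes) -> str:
        digest = hashlib.sha256(data).hexdigest()
        self._objects.setdefault(digest, data)  # identical payloads share one object
        return digest

    def fetch(self, digest: str) -> bytes:
        data = self._objects[digest]
        if hashlib.sha256(data).hexdigest() != digest:
            raise ValueError("payload digest mismatch")
        return data


def externalize_envelope(envelope: dict, store: ContentAddressedStore, threshold: int) -> dict:
    """Replace an inline payload larger than `threshold` bytes with a reference."""
    payload = envelope.get("payload")
    if payload is None:
        return envelope
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    if len(raw) <= threshold:
        return envelope
    ref = {"sha256": store.store(raw), "bytes": len(raw), "media_type": "application/json"}
    return {**{k: v for k, v in envelope.items() if k != "payload"}, "payload_ref": ref}


def resolve_envelope(envelope: dict, store: ContentAddressedStore) -> dict:
    """Inverse of externalize_envelope: inline the referenced payload again."""
    ref = envelope.get("payload_ref")
    if ref is None:
        return envelope
    payload = json.loads(store.fetch(ref["sha256"]))
    return {**{k: v for k, v in envelope.items() if k != "payload_ref"}, "payload": payload}
```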
10. Cloud distributed OS surfaces
This layer is what lets NoETL behave like a distributed business operating system for multitenant organizations. It provides identity, addressability, locality, replay, and resource accounting across clusters without coupling application logic to a specific cloud.
10.1 Unified resource locator
Every addressable thing in the system gets a stable URI:
```
noetl://tenant/<tenant>/org/<org>/cluster/<cluster>/region/<region>/zone/<zone>/node/<node>/process/<pid>/<kind>/<id>
```
Examples:
```
noetl://tenant/acme/org/care-network/cluster/prod-1/region/us-central1/zone/us-central1-a/
  node/gke-noetl-pool-a-7b/process/1/worker/cpu-01
noetl://tenant/acme/org/care-network/cluster/prod-1/region/global/none/none/none/
  stream/events/<execution>/<shard>
noetl://tenant/acme/org/care-network/payloads/sha256/<sha256>
```
Tenants, organizations, workers, projectors, MCP servers, JetStream streams, Tier 3 payloads, cache entries, projection shards, and frame leases are all referenceable. The locator is the join key across the event store, projection store, and observability layer.
Implementation status:
- repos/noetl/noetl/core/resource_locator.py provides the first side-effect-free parser/builder for canonical noetl://... locators. It validates the scheme, rejects query/fragment data, percent-encodes path segments, extracts alternating key/value pairs, and supports existing execution result refs such as noetl://execution/<id>/result/<step>/<ref>.
- Initial adoption is intentionally non-invasive. Frame commit durable-reference validation, DSL compact result-ref normalization, and agent compact result-ref normalization now parse noetl:// locators with this utility. Remaining follow-up PRs should replace ad hoc startswith("noetl://") parsing where topology or tenant/org extraction is required.
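The parse/build behavior described for resource_locator.py can be illustrated with urllib.parse from the standard library. build_locator and parse_locator are hypothetical names, not that module's API. The sketch rejects query and fragment data, percent-encodes every segment, and returns alternating key/value pairs plus any trailing unpaired segment. The trailing case covers shapes like payloads/sha256/<sha256>.

```python
from typing import List, Optional, Tuple
from urllib.parse import quote, unquote, urlsplit


def build_locator(pairs: List[Tuple[str, str]]) -> str:
    """Join key/value pairs into noetl://k1/v1/k2/v2, percent-encoding each segment."""
    segments = [quote(part, safe="") for pair in pairs for part in pair]
    return "noetl://" + "/".join(segments)


def parse_locator(uri: str) -> Tuple[List[Tuple[str, str]], Optional[str]]:
    """Return alternating key/value pairs plus any trailing unpaired segment."""
    parts = urlsplit(uri)
    if parts.scheme != "noetl":
        raise ValueError("not a noetl:// locator")
    if parts.query or parts.fragment:
        raise ValueError("query and fragment are not allowed")
    # urlsplit puts the first segment in netloc; the rest are in the path.
    raw = [parts.netloc] + [s for s in parts.path.split("/") if s]
    segments = [unquote(s) for s in raw]
    pairs = list(zip(segments[0::2], segments[1::2]))
    trailing = segments[-1] if len(segments) % 2 else None
    return pairs, trailing
```

With safe="", characters such as "/", "?", and "#" inside a value are encoded. Tenant or organization names cannot break the segment structure.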
10.1.1 Multitenant isolation
Multitenancy is enforced at the event/payload boundary first, then projected outward:
- Event partitions include tenant_id, organization_id, and time/execution keys.
- Payload object keys include tenant/org prefixes and content digests; encryption context is tenant-scoped.
- Cache keys include tenant/org/execution. Shared cache layers may share infrastructure, not key space.
- Projection stores use row-level security or physical database/schema separation depending on the tenant isolation tier.
- Replay APIs require tenant/org scope and never scan a global stream without an explicit operator permission.
- Cross-tenant materialized analytics are allowed only through curated, governed projections with redaction rules encoded as projection code.
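The sketch below shows one way "shared infrastructure, not shared key space" could apply to payload and cache keys. The tenants/<t>/orgs/<o>/... layout and the helper names are hypothetical. The spec only requires tenant/org prefixes plus content digests for payload keys, and tenant/org/execution in cache keys.

```python
import hashlib
import re

_SAFE = re.compile(r"[A-Za-z0-9._-]+")


def _check(value: str) -> str:
    # Reject anything that could escape or blur a key prefix ("/", "..", empty).
    if not _SAFE.fullmatch(value or "") or value in (".", ".."):
        raise ValueError(f"invalid key component: {value!r}")
    return value


def _scope(tenant_id: str, organization_id: str) -> str:
    return f"tenants/{_check(tenant_id)}/orgs/{_check(organization_id)}"


def payload_object_key(tenant_id: str, organization_id: str, payload: bytes) -> str:
    # Content-addressed inside the tenant/org prefix: identical bytes from two
    # tenants still land under two different keys.
    digest = hashlib.sha256(payload).hexdigest()
    return f"{_scope(tenant_id, organization_id)}/sha256/{digest}"


def cache_key(tenant_id: str, organization_id: str, execution_id: str, ref: str) -> str:
    return f"{_scope(tenant_id, organization_id)}/executions/{_check(execution_id)}/{ref}"
```

Because the digest sits below the tenant/org prefix, deduplication happens only within a tenant/org. Identical payloads from different tenants still get separate keys.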
10.2 Topology-aware scheduling
Frame claim has an optional locality preference:
POST /api/stages/{stage_id}/frames/claim
body:
worker_id: noetl://tenant/.../org/.../cluster/.../worker/cpu-01
locality:
prefer_node: <node_id> # for Tier 1.5 colocation
prefer_zone: us-central1-a # for Tier 2 cache locality
max_distance: zone | region | any
Initial implementation: cursor workers include best-effort node_id, cluster_id, region, zone, worker_pool, and runtime locality in the frame claim body, and the server persists that object on the frame.dispatched event metadata. When possible, the server also records worker_locator as a canonical noetl://tenant/.../org/.../cluster/.../node/.../worker/... identity. If the frame cursor includes source_locality or producer_locality, the server records a placement evaluation (distance, requested max_distance, and within_max_distance) on the dispatch event. The scheduler still claims the closest available frame using the existing indexed frame predicates; enforcing prefer_node / prefer_zone against selection is the next step after producer locality is persisted consistently.
Target scheduler behavior: try the closest match. Frames produced by a worker prefer to be reduced by a worker on the same node (Tier 1.5 hit) or in the same zone (Tier 2 hit). Cross-region claims happen only when local capacity is exhausted.
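The placement evaluation recorded on frame.dispatched and the target closest-match behavior can be sketched together. The Locality fields mirror the claim body above. The distance ordering node < zone < region < any and the pick_frame helper are assumptions for illustration; they are not the scheduler's indexed claim query.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable

# Distance classes from closest to farthest; "any" means a cross-region claim.
DISTANCES = ("node", "zone", "region", "any")


@dataclass(frozen=True)
class Locality:
    node_id: str | None = None
    zone: str | None = None
    region: str | None = None


def distance(worker: Locality, source: Locality) -> str:
    # Unknown (None) fields never count as a match.
    if worker.node_id and worker.node_id == source.node_id:
        return "node"
    if worker.zone and worker.zone == source.zone:
        return "zone"
    if worker.region and worker.region == source.region:
        return "region"
    return "any"


def evaluate_placement(worker: Locality, source: Locality, max_distance: str) -> dict:
    d = distance(worker, source)
    return {
        "distance": d,
        "max_distance": max_distance,
        "within_max_distance": DISTANCES.index(d) <= DISTANCES.index(max_distance),
    }


def pick_frame(
    worker: Locality,
    candidates: Iterable[tuple[str, Locality]],
    max_distance: str = "any",
) -> str | None:
    # Return the closest pending frame id within max_distance; ties keep queue order.
    limit = DISTANCES.index(max_distance)
    best: tuple[int, str] | None = None
    for frame_id, source in candidates:
        rank = DISTANCES.index(distance(worker, source))
        if rank <= limit and (best is None or rank < best[0]):
            best = (rank, frame_id)
    return None if best is None else best[1]
```

A worker in us-central1-a would take a same-node frame first, then a same-zone frame. It would reach a frame produced in another region only when max_distance is any.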
10.3 Autoscaling
The current deployable KEDA scaler reads the NoETL runtime backlog metric:
- Source: noetl_frame_backlog_total{stage_kind="all",status="all"} via a Prometheus-compatible metrics endpoint.
- Target: one backlog unit per desired worker by default, clamped by worker.autoscaling.minReplicas/maxReplicas.
- Enablement: worker.autoscaling.enabled=true plus worker.autoscaling.keda.enabled=true in the Helm chart. When this mode is on, the chart renders a ScaledObject and suppresses the CPU/memory HPA to avoid competing scale controllers.
- Detail signal: noetl_frame_backlog_detail_total{tenant_id=...,organization_id=...,stage_kind=...,status=...} exposes the same runtime backlog grouped by tenant and organization for dashboards and future policy engines. It is intentionally separate from the global KEDA metric so tenant cardinality does not affect the default scaler query.
No orchestration component should query the runtime database directly for scaling. Postgres is the current storage/projection implementation; the autoscaling contract is the NoETL metric stream so the backing store can change without rewriting cluster control surfaces.
The next scheduler-aware version should use two richer signals without changing replay semantics:
- noetl_frame_backlog_detail_total{tenant_id=...,organization_id=...,stage_kind=...,status=...}: pending frames waiting for a worker, partitioned by stage kind and tenant/org.
- frame_p95_lease_duration: moving p95 of how long frames take.
Autoscale formula: desired_workers = max(1, ceil(frame_backlog_total / target_concurrent_frames_per_worker)), clamped by the per-cluster max. The formula needs no provisioned-capacity number.
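Here is the formula as a function, useful for checking scaler settings by hand. target_per_worker and cluster_max are plain inputs. Mapping them onto the KEDA target and worker.autoscaling.maxReplicas is an assumption of this sketch.

```python
import math


def desired_workers(frame_backlog_total: int, target_per_worker: int, cluster_max: int) -> int:
    if target_per_worker <= 0 or cluster_max <= 0:
        raise ValueError("target_per_worker and cluster_max must be positive")
    # Never scale below one worker, then clamp to the per-cluster maximum.
    wanted = max(1, math.ceil(frame_backlog_total / target_per_worker))
    return min(wanted, cluster_max)
```

With 10 pending frames and 4 frames per worker, ceil(10 / 4) gives 3 workers. An empty backlog still keeps one worker.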
For multi-cluster, the NATS JetStream supercluster routes the same noetl.events.* subjects across clusters. The scheduler can claim frames from peer clusters when local backlog has cleared. This is opt-in and gated by an egress cost budget.
Initial operations surface: noetl/ops#109 adds automation/infrastructure/nats_supercluster.yaml, an opt-in playbook that renders and deploys a gateway-enabled NATS JetStream member for the active Kubernetes context. The existing single-cluster NATS manifest remains unchanged. Operators pass a logical cluster_name, externally reachable gateway_advertise address, and explicit remote_gateways entries. This matches the NATS gateway model: gateways use a dedicated gateway port, connect clusters into a supercluster, and each gateway node needs bidirectional reachability to remote gateway nodes. The playbook also exposes a gatewayz action for inspecting gateway links.
10.4 Resilience
- Frame lease + cursor checkpoint = recovery primitive. Worker crash within a frame loses at most one frame's worth of work; the next worker resumes from the last committed cursor.
- Tier 3 is the durable read. Tier 1.5 / 2 are optimizations; their failure modes are graceful degradation, not data loss.
- Projection store writes are idempotent by (stage_id, frame_id, partition_id). Replays are safe.
- Frame reaper (rebranded command reaper) republishes stale leases.
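Replays are safe because a commit keyed by (stage_id, frame_id, partition_id) gives the same result no matter how many times it is applied. The minimal sketch below uses SQLite's INSERT OR IGNORE as a stand-in for the projection store. The table and its columns are hypothetical.

```python
import sqlite3


def open_projection_store() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE frame_projection (
            stage_id     TEXT    NOT NULL,
            frame_id     TEXT    NOT NULL,
            partition_id INTEGER NOT NULL,
            row_count    INTEGER NOT NULL,
            PRIMARY KEY (stage_id, frame_id, partition_id)
        )
        """
    )
    return conn


def apply_frame_commit(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply one frame commit; return False if that key was already projected."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO frame_projection VALUES (?, ?, ?, ?)",
        (event["stage_id"], event["frame_id"], event["partition_id"], event["row_count"]),
    )
    conn.commit()
    return cur.rowcount == 1
```

The first write for a key wins, so this sketch assumes a key only ever carries one committed value. A redelivered commit with different contents would be ignored, not merged.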
10.5 Quantum-cloud posture
"Quantum cloud" here means cloud infrastructure that can place work across heterogeneous compute and storage fabrics while preserving one event-sourced timeline per tenant organization. The runtime should assume future worker classes beyond CPU/GPU, including confidential compute, specialized accelerators, and quantum-provider task queues.
The contract stays the same:
- Specialized workers receive frames with typed input payload references.
- Provider submissions, callbacks, measurements, and result ingests are events with immutable payload references.
- External provider state is modeled as side effects with correlation/idempotency keys, not as hidden runtime state.
- Replay can reconstruct what was submitted, what was observed, and what state NoETL derived from it, even when the physical computation cannot be rerun deterministically.
11. Configuration
YAML, shared between Python and Rust runtimes:
runtime:
  tenant_id: ${NOETL_TENANT_ID}
  organization_id: ${NOETL_ORGANIZATION_ID}
  node_id: ${POD_IP} # mandatory, used in locator + IPC hint
  cluster_id: ${NOETL_CLUSTER_ID}
  region: ${NOETL_REGION}
  zone: ${NOETL_ZONE}
  shard_id: ${NOETL_SHARD_ID} # for StatefulSet projectors / workers
scheduler:
  locality_preference: zone # node | zone | region | any
  max_inflight_frames_per_worker: 4
event_store:
  canonical_backend: postgres # append-only replay ledger
  distribution_backend: nats-jetstream # kafka | google-pubsub | azure-event-hubs | aws-kinesis | aws-msk
  connection:
    url: nats://nats.nats.svc.cluster.local:4222
  stream_prefix: noetl.events
replay:
  snapshot_interval_events: 10000
  checksum_algorithm: blake3
  require_projection_parity: true
serialization:
  envelope_format: canonical-json
  tabular_payload_format: arrow-ipc-stream
  schema_registry:
    backend: file # file | db | external
    path: /etc/noetl/schemas
  compression: zstd
  timezone: UTC
payload_store:
  backend: s3 # gcs | azure-blob | seaweedfs | local
  connection:
    endpoint: https://storage.googleapis.com
    bucket: noetl-payloads-prod
  threshold_bytes: 262144 # 256 KB inline cap
  content_addressing: sha256
  encryption_at_rest: true
cache:
  tier_1_memory_mb: 512
  tier_15_node_budget_mb: 1024 # Arrow IPC budget per node
  tier_15_grace_seconds: 30
  tier_2_disk_gb: 10
  tier_2_disk_path: /var/cache/noetl/payloads
gc:
  strategy: reference-count # | ttl
  ttl_days: 90
projection_stores:
  default:
    backend: postgres
    connection:
      dsn: postgresql://noetl:***@pg.noetl.svc:5432/noetl
  search:
    backend: search-store
    connection:
      hosts: ["http://search.search.svc:9200"]
  analytics:
    backend: columnar-analytics
    connection:
      dsn: ${NOETL_ANALYTICS_DSN}
  streaming:
    backend: streaming-materialized-view
    connection:
      dsn: ${NOETL_STREAMING_MV_DSN}
materializations:
  event_lake:
    backend: columnar-analytics
    source: event_store
    include_payload_refs: true
  operational_mvs:
    backend: streaming-materialized-view
    source: event_store
    barrier_interval_ms: 1000
snapshots:
  backend: postgres # defaults to projection_stores.default
frame_policy_defaults:
  loop:
    size: 50
    duration_ms: 30000
    memory_bytes: 67108864
    parallelism: 1
  fanout:
    size: 1
    duration_ms: 5000
    memory_bytes: 16777216
    parallelism: 8
The same image runs anywhere. Backend changes are config-only.
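The ${VAR} placeholders in the configuration must be resolved before the YAML is parsed. A strict interpolation sketch follows. The helper name is hypothetical, and so is the choice to fail on any unset variable instead of substituting an empty string; that choice is motivated by node_id being marked mandatory. A YAML parser would then run on the returned text.

```python
from __future__ import annotations

import os
import re
from typing import Mapping

# Matches ${NAME} where NAME is an upper-case environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Z_][A-Z0-9_]*)\}")


def interpolate(text: str, env: Mapping[str, str] | None = None) -> str:
    env = os.environ if env is None else env
    missing = sorted({name for name in _PLACEHOLDER.findall(text) if name not in env})
    if missing:
        # Fail before parsing instead of starting with an empty node_id or tenant_id.
        raise KeyError(f"unset configuration variables: {', '.join(missing)}")
    return _PLACEHOLDER.sub(lambda m: env[m.group(1)], text)
```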
12. Refactor plan (phased, additive)
Phase 0 — Instrumentation (1 week)
Implementation status: partially shipped in noetl/noetl#435.
- Add the metrics in §4 to Grafana / VictoriaMetrics dashboards (already deployed).
- Baseline a fresh PFT v2 run on GKE. Capture the metric values and pin to memory.
- Add noetl.stage and noetl.frame tables via canonical Postgres DDL. Done in Phase 0 as additive DDL.
- Add canonical event-envelope schema validation, including tenant/org scope, schema_name, schema_version, idempotency_key, payload digest, and deterministic checksum fields. The additive columns are present; validation hardening remains incremental.
- Add replay harness: rebuild execution/frame/loop projections from noetl.event + payload store and compare checksums against live projection rows. The first replay API is shipped; the full parity harness remains tracked by noetl/noetl#440.
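Deterministic checksum fields need a canonical byte encoding of the envelope. One common approach, sketched below, hashes sorted-key compact JSON with SHA-256. That produces the 64-character hex form reported for envelope_checksum in the validation notes. Two details are assumptions, not the NoETL envelope schema: the field names (payload_digest in particular) and the rule that the checksum covers every other field.

```python
from __future__ import annotations

import hashlib
import json

# Hypothetical field names standing in for the envelope columns listed above.
REQUIRED_FIELDS = (
    "tenant_id", "organization_id", "schema_name",
    "schema_version", "idempotency_key", "payload_digest",
)


def canonical_json(value: object) -> bytes:
    # Sorted keys + compact separators: the same logical envelope always
    # serializes to the same bytes, regardless of dict insertion order.
    return json.dumps(
        value, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")


def envelope_checksum(envelope: dict) -> str:
    # The checksum covers every field except itself.
    body = {k: v for k, v in envelope.items() if k != "envelope_checksum"}
    return hashlib.sha256(canonical_json(body)).hexdigest()


def validate_envelope(envelope: dict) -> list[str]:
    errors = [f"missing {name}" for name in REQUIRED_FIELDS if envelope.get(name) in (None, "")]
    if envelope.get("envelope_checksum") != envelope_checksum(envelope):
        errors.append("envelope_checksum mismatch")
    return errors
```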
Validation notes:
- Focused runtime/replay regression is green in
repos/noetl. - Local kind deployment is validated via
repos/ops/automation/infrastructure/kind.yamlandrepos/ops/automation/development/noetl.yamlusing Podman. - Latest local kind validation for
noetl/noetl#435: imagelocal/noetl:2026-05-16-22-49, health/replay/pod/log smokes passed,/metricsexposes the IPC counters, a temporary frame claim emittedstream_version=1, 64-characterenvelope_checksum, and non-nullcatalog_id, in-pod Arrow IPC TempStore round-trip passed, in-pod cursor-worker frame capture produced two Arrow-backed frames from three claimed rows, and the deployed server image importsReplayStateProjector,NATSProjectorWorker,NATSEventPublisher, and thepython -m noetl.projectorentrypoint. - Frame-loop local kind validation on 2026-05-17 (
noetl/noetl#435,noetl/e2e#22,noetl/ops#98) completed PFT v2 execution628959278765703700in 33m10s. The run populated 60noetl.stagerows and 5,570noetl.framerows, emitted 60stage.opened, 5,570frame.dispatched, and 5,570frame.committedevents, and emitted zeroframe.failedevents. The fixture used threepaginated-apireplicas withPFT_RATE_LIMIT=500; strict fixture 429 checks and worker/server error checks were clean in the final validation windows. - Stage terminal events are now implemented and validated: the executor emits
stage.closed, recordsclosed_event_id, and marks the stageCOMPLETEDwhen the loop epoch finishes. Trackernoetl/noetl#443is closed; future failed/abandoned/reaper-specific gaps should be tracked as narrower bugs. - Lineage validation on local kind image
localhost/local/noetl:2026-05-17-13-32completed PFT v2 execution629003028435042682in 33m36s withcompleted=trueandfailed=false. Final counters: 60stage.opened, 60stage.closed, 5,562frame.dispatched, 5,562frame.committed, zeroframe.failed, all 60 stagesCOMPLETED,frame.command_id/claimed_event_id/terminal_event_idpopulated on all 5,562 frames, and completed frames averaged 49.34 rows with max 50. - Frame-mode local kind validation on image
localhost/local/noetl:2026-05-17-18-07completed PFT v2 execution629145120213828019in 279.785s withcompleted=trueandfailed=false. Final counters: 5,498 frames, average 49.92 rows/frame, max 50, metrics present on every frame, all 10 facilities at 1000/1000 for the five patient domains, and MDS 224,443/224,443. - Frame-lineage local kind validation on image
localhost/local/noetl:2026-05-17-18-56completed PFT v2 execution629166136403165640in 269.156s withcompleted=trueandfailed=false. Final counters: 60 stages opened/closed, 5,500 frames, 5,440 frames withparent_frame_id(one root per stage), 5,500/5,500claimed_event_idandterminal_event_idlinks populated, 5,500frame.dispatched, 5,500frame.committed, zeroframe.failed, all 10 facilities at 1000/1000 for the five patient domains, and MDS 224,443/224,443. - Frame-lifecycle local kind validation on image
local/noetl:2026-05-17-19-10completed PFT v2 execution629173479832551841in 340.435s withcompleted=trueandfailed=false. Final counters: 60 stages opened/closed, 5,499 frames, 5,439 frames withparent_frame_id(one root per stage), 5,499/5,499claimed_event_idandterminal_event_idlinks populated, 5,499frame.dispatched, 5,499frame.started, 5,499frame.committed, zeroframe.failed, all 10 facilities at 1000/1000 for the five patient domains, and MDS 224,443/224,443. The validation command passed--set num_facilities=1 --set patients_per_facility=100, but catalog execution overrides were ignored and the run used defaults10/1000; this is tracked innoetl/cli#12. - Frame-stream lock-order validation on image
local/noetl:2026-05-17-19-51completed PFT v2 execution629193644754337792in 487.855s withcompleted=trueandfailed=false. Final counters: 60 stages opened/closed, 5,496 frames, 5,436 frames withparent_frame_id(one root per stage), 5,496/5,496claimed_event_idandterminal_event_idlinks populated, 5,496frame.dispatched, 5,496frame.started, 5,496frame.committed, zeroframe.failed, all 10 facilities at 1000/1000 for the five patient domains, MDS 224,443/224,443, and no frame endpoint deadlocks or 500s in the validation log window. Two prior validations on imageslocal/noetl:2026-05-17-19-26andlocal/noetl:2026-05-17-19-42were cancelled after exposing frame event deadlocks; the fix is consistent lock acquisition before frame row mutation. - Replay hardening now folds direct
event.stage_id,event.frame_id, andevent.command_idcolumns; records stage open/close event ids and frame claim/terminal event ids; and treatsframe.abandonedas a deterministic lease-recovery transition. The golden replay corpus checksum was updated to cover these extra lineage fields. - Frame replay parity now includes a deterministic frame projection checksum that can compare live
noetl.framerows with replayed frame state. The normalized parity surface includes frame/stage/parent/command ids, claim and terminal event links, status, row count, cursor metadata, emitted-event count, and Arrow IPCrows_refsummary fields (sha256,schema_digest,row_count,media_type,ref). Against local kind PFT execution629193644754337792, live and replayed frame projection checksums matched ata65a8ce90d228332d3ce50958c6f41f1f8d584c36ee45ecd1645d61cb0e90cefacross 5,496 frames. - Command replay parity now has the same normalized checksum surface for live command projection rows and replayed
commandsstate, including stage/frame ids, parent command id, issued/claimed/started/terminal event links, worker locator, locality, source locality, and placement metadata. - Business-object replay parity now has normalized checksum helpers for live business-object projection rows and replayed
business_objectsstate, including object ids, status/version, event lineage, deletion marker, latest payload summary, payload-ref count, and folded attributes. - Loop replay parity now has normalized checksum helpers for live loop progress rows and replayed
loopsstate, including totals, done/failed counters, completion state, and last folded event id. - Execution replay parity now has normalized checksum helpers for live execution projection rows and replayed top-level execution state, including tenant/org scope, status, last node/event, event count, latest payload summary, and upcaster registry digest.
- Stage replay parity now has normalized checksum helpers for live stage projection rows and replayed stages state, including stage status, kind, parent/loop linkage, open/close event ids, frame/row/emitted/failed counters, and last folded event id.
- Replay parity now has an aggregate checksum bundle helper exposed on replay responses as projection_checksums, with named checksums for execution, stages, frames, commands, business_objects, and loops. Replay-state projector metadata carries the same bundle so consumers do not need to deserialize the full state snapshot for parity dashboards. Storage adapters can produce the live side through live_projection_checksum_bundle(...), compare bundles through projection_checksum_parity_report(...), or use scripts/check_replay_parity_report.py for offline JSON bundle checks, giving release gates one stable comparison artifact while preserving surface-level diagnostics when any checksum diverges.
- Replay input reads now use the ReplayEventReader port. The reference PostgresReplayEventReader is only an adapter; replay folding, checksum bundles, and parity reporting remain storage-neutral and can be reused by future event-log backends. ReplayService.replay_state(..., event_reader=...) lets validation tools and future adapters inject a reader without mutating process-global state.
- Replay payload reads now use the ReplayPayloadResolver port. The reference TempStoreReplayPayloadResolver resolves existing TempStore/ResultStore references and returns bounded verification data (checksum, size, row count, value type, error) so replay validation can prove payload availability without returning large row bodies or coupling to a specific durable store. payload_resolution_summary is the release-report surface for payload availability: total refs, resolved/unresolved counts, unique refs, all-resolved status, and a deterministic checksum of the bounded report.
- Replay schema evolution now uses the same dependency-injection pattern as event reads and payload resolution: ReplayService.configure_upcaster_registry(...) sets the process default for a runtime, while ReplayService.replay_state(..., upcaster_registry=...) lets validation runs replay the same event stream with an explicit registry and assert the resulting upcaster_registry_digest.
- Offline replay payload validation now has a release-gate script: scripts/check_replay_payload_resolution_report.py --report replay.json. It consumes replay-state JSON, recomputes the bounded payload-resolution summary, and exits nonzero on unresolved payload refs or a summary checksum mismatch.
- Replay state report capture now has a small API fetcher: scripts/fetch_replay_state_report.py --base-url ... --execution-id ... --output replay.json [--resolve-payloads]. The output is stable JSON intended for the offline projection parity and payload-resolution gates, and the fetcher rejects non-object replay JSON before writing the artifact.
- Replay validation orchestration now has a runner: scripts/run_replay_validation.py --base-url ... --execution-id ... --output-dir ... [--live-checksums live.json | --live-rows live-rows.json] [--resolve-payloads] [--report-output validation.json]. It captures replay state, verifies the fetch artifact was produced, optionally builds live-checksums-<execution_id>.json from adapter-exported live projection rows, runs offline gates, and emits a reproducible validation manifest with config, UTC timestamps, per-step duration, command output, and parsed JSON stdout for local kind and GKE evidence.
- Replay validation manifests are themselves gateable: scripts/check_replay_validation_manifest.py --manifest validation.json [--check-artifacts] verifies manifest config shape, ISO timestamps, required step order, successful step return codes, non-negative durations, optional artifact paths, and mutually exclusive live parity inputs. Failed manifests can be inspected with --allow-failed, but release evidence should pass without it.
- Live parity artifacts are storage-neutral: scripts/build_live_projection_checksums.py --rows live-rows.json --output live-checksums.json consumes adapter-exported JSON row arrays for execution, stages, frames, commands, business_objects, and loops, validates row shape, and writes the canonical projection_checksums bundle plus row counts. Postgres is only one possible exporter; the checksum builder operates on rows, not database routines.
- The current deployed-cluster live-row exporter is scripts/export_live_projection_rows_postgres.py --execution-id ... --tenant-id ... --organization-id ... --output live-rows.json. It is intentionally an adapter, not the contract: it emits the same rows.{execution,stages,frames,commands,business_objects,loops} JSON shape any future storage backend can produce. It uses ordinary reads from noetl.execution, noetl.stage, noetl.frame, noetl.command, noetl.event, and the replay-state projection record; no database functions or routines are required for validation evidence.
- Live-row evidence is gateable before checksum comparison: scripts/check_live_projection_rows.py --rows live-rows.json validates schema_version, adapter identity, tenant/org/execution/projection scope, exported_at, complete canonical row surfaces, per-surface row_counts, and the canonical rows_checksum. scripts/build_live_projection_checksums.py also rejects nested row artifacts whose row_counts or rows_checksum drift from the actual rows, so a parity pass cannot be based on mutated live-row evidence. Validation manifests that include live-row evidence must contain live_rows_integrity before live_checksums; scripts/check_replay_validation_manifest.py enforces that order when artifacts.live_rows, config.live_rows, or config.export_live_rows_postgres=true is present.
- A complete local-kind or GKE parity run can now produce all validation artifacts from one command when the current reference storage adapter is available:
python scripts/run_replay_validation.py \
--base-url "$NOETL_BASE_URL" \
--execution-id "$EXECUTION_ID" \
--tenant-id "$TENANT_ID" \
--organization-id "$ORGANIZATION_ID" \
--export-live-rows-postgres \
--resolve-payloads \
--output-dir "$ARTIFACT_DIR" \
--report-output "$ARTIFACT_DIR/validation-$EXECUTION_ID.json" \
--artifact-index-output "$ARTIFACT_DIR/artifact-index-$EXECUTION_ID.json"
That runner writes replay-$EXECUTION_ID.json, live-rows-$EXECUTION_ID.json, live-checksums-$EXECUTION_ID.json, validation-$EXECUTION_ID.json, and artifact-index-$EXECUTION_ID.json. The manifest records both the source live-row export and the derived live checksum artifact so --check-artifacts proves the full evidence chain exists. When --artifact-index-output is used, the runner records the artifact_index step in the final manifest before packaging, so the artifact index hashes the same manifest operators inspect. The artifact index records SHA-256 and size for each evidence file, and --check enforces exactly one manifest, replay, and report role plus paired live_rows / live_checksums roles when live evidence is present. Paths inside the index are relative to the artifact-index directory when the files live in the same bundle, while artifacts outside the bundle remain absolute; the checker validates both the path_base marker and the top-level manifest pointer before checking entry digests. The manifest checker also validates the referenced artifact index under --check-artifacts, verifies that the index points back to the same manifest, and requires artifact_index to be the final manifest step. This means the whole bundle can be moved out of the cluster and rechecked from any working directory for completeness as well as drift:
python scripts/check_replay_validation_manifest.py \
--manifest "$ARTIFACT_DIR/validation-$EXECUTION_ID.json" \
--check-artifacts
python scripts/package_replay_validation_artifacts.py \
--check "$ARTIFACT_DIR/artifact-index-$EXECUTION_ID.json"
python scripts/check_replay_validation_bundle.py \
--manifest "$ARTIFACT_DIR/validation-$EXECUTION_ID.json"
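For non-Postgres stores or offline adapter development, export the same live-row JSON shape in a separate step and pass it to the runner with --live-rows. The commands below use the Postgres exporter for that step; another backend's exporter must write the same shape: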
python scripts/export_live_projection_rows_postgres.py \
--execution-id "$EXECUTION_ID" \
--tenant-id "$TENANT_ID" \
--organization-id "$ORGANIZATION_ID" \
--output "$ARTIFACT_DIR/live-rows-$EXECUTION_ID.json"
python scripts/run_replay_validation.py \
--base-url "$NOETL_BASE_URL" \
--execution-id "$EXECUTION_ID" \
--tenant-id "$TENANT_ID" \
--organization-id "$ORGANIZATION_ID" \
--live-rows "$ARTIFACT_DIR/live-rows-$EXECUTION_ID.json" \
--resolve-payloads \
--output-dir "$ARTIFACT_DIR" \
--report-output "$ARTIFACT_DIR/validation-$EXECUTION_ID.json"
python scripts/check_replay_validation_manifest.py \
--manifest "$ARTIFACT_DIR/validation-$EXECUTION_ID.json" \
--check-artifacts
- Replay validation now starts with state-report integrity: scripts/check_replay_state_report.py --report replay.json requires the canonical replay envelope, validates required string fields plus strict JSON integer fields, enforces checksum_algorithm=sha256, requires the complete canonical projection_checksums surface set, and recomputes the replay state's top-level checksum plus projection checksum bundles from the replay JSON. Mutated state, incomplete envelope data, malformed field types or snapshot metadata, missing/unknown checksum surfaces, or surface-level drift fail before parity or payload gates run.
- Replay parity validation now treats bundle shape and checksum shape as part of the contract: scripts/check_replay_parity_report.py requires the complete canonical surface set (execution, stages, frames, commands, business_objects, loops) for replayed and live bundles, rejects unknown surfaces, and requires lowercase SHA-256 hex values unless --allow-invalid-checksum-shape is used for a legacy ad-hoc report.
- Payload-resolution validation now treats row shape, resolved payload checksums, and summary shape as part of the replay contract: scripts/check_replay_payload_resolution_report.py requires every payload_resolution entry and its resolution value to be objects, requires lowercase SHA-256 hex values for every resolved payload checksum, and validates supplied payload_resolution_summary objects with exact fields, strict non-negative integer counters, boolean all_resolved, and lowercase SHA-256 checksum shape before accepting the summary.
- Replay state-report validation now also checks registry digest shape: upcaster_registry_digest and any snapshot registry digest must be lowercase SHA-256 hex values before replay/snapshot compatibility is accepted.
- Time-cutoff replay can now seed from projection_snapshot by first resolving as_of_time to the maximum visible event id at or before that timestamp, then applying the same version <= cutoff_event_id guard used by position-based replay. This keeps snapshots as accelerators only; events after the snapshot version are still folded through the requested cutoff.
- Replay snapshot compatibility now checks upcaster_registry_digest before seeding. If a candidate snapshot was produced by a different registry, replay ignores it and reloads from canonical events instead of mixing reducer/schema versions.
- Completed Phase 1 cleanup trackers were closed after merged validation: noetl/noetl#443 stage terminal projection, noetl/noetl#444 distributed workload override handling via merged CLI fix, noetl/noetl#446 frame recovery hardening, noetl/noetl#447 frame payload replay parity, and noetl/noetl#448 loop supervision event-id compatibility. Active hardening PRs are noetl/noetl#453 for the frame claim hot path and noetl/noetl#454 for execution status projection stability.
- Full repository pytest collection currently has legacy collection blockers unrelated to Phase 0; tracked by noetl/noetl#440.
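The time-cutoff and snapshot-compatibility rules above fit in one small fold. This in-memory sketch uses hypothetical Event and Snapshot types and assumes event ids increase monotonically. It treats a snapshot's version as the last event id the snapshot already includes.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Iterable


@dataclass(frozen=True)
class Event:
    event_id: int  # assumed monotonically increasing per stream
    created_at: datetime
    payload: dict = field(default_factory=dict)


@dataclass(frozen=True)
class Snapshot:
    version: int          # last event id already folded into `state`
    registry_digest: str  # upcaster registry digest that produced `state`
    state: dict = field(default_factory=dict)


def replay_as_of(
    events: list[Event],
    as_of_time: datetime,
    snapshots: Iterable[Snapshot],
    registry_digest: str,
    fold: Callable[[dict, Event], dict],
    initial: dict,
) -> tuple[dict, int | None]:
    # 1. Resolve the wall-clock cutoff to the max visible event id at or before it.
    visible = [e.event_id for e in events if e.created_at <= as_of_time]
    if not visible:
        return dict(initial), None
    cutoff = max(visible)
    # 2. Snapshots are accelerators only: use the newest one built by the same
    #    registry whose version does not reach past the cutoff.
    usable = [s for s in snapshots if s.registry_digest == registry_digest and s.version <= cutoff]
    seed = max(usable, key=lambda s: s.version, default=None)
    state = dict(seed.state) if seed else dict(initial)
    start = seed.version if seed else None
    # 3. Fold every event after the snapshot version through the cutoff, in id order.
    for event in sorted(events, key=lambda e: e.event_id):
        if (start is None or event.event_id > start) and event.event_id <= cutoff:
            state = fold(state, event)
    return state, cutoff
```

Replay skips any snapshot built by another upcaster registry, and any snapshot whose version is past the cutoff. Provided snapshots are faithful, the result equals a full fold from the first event; only the amount of work changes.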
Deliverable: dashboard URL + memory entry with baseline numbers, plus a replay parity report for the baseline run. No code change to hot paths.
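The parity part of this deliverable comes down to comparing two checksum bundles surface by surface. This sketch follows the rules described for scripts/check_replay_parity_report.py: a complete canonical surface set, no unknown surfaces, and lowercase SHA-256 hex. It is not that script; parity_report and its output shape are hypothetical.

```python
from __future__ import annotations

import re

SURFACES = ("execution", "stages", "frames", "commands", "business_objects", "loops")
_SHA256_HEX = re.compile(r"[0-9a-f]{64}")


def bundle_shape_errors(label: str, bundle: dict) -> list[str]:
    errors = []
    missing = [s for s in SURFACES if s not in bundle]
    unknown = sorted(set(bundle) - set(SURFACES))
    if missing:
        errors.append(f"{label}: missing surfaces {missing}")
    if unknown:
        errors.append(f"{label}: unknown surfaces {unknown}")
    for surface in SURFACES:
        value = bundle.get(surface)
        if surface in bundle and not (isinstance(value, str) and _SHA256_HEX.fullmatch(value)):
            errors.append(f"{label}.{surface}: not a lowercase SHA-256 hex digest")
    return errors


def parity_report(replayed: dict, live: dict) -> dict:
    # Shape errors and checksum divergence are reported separately so a failing
    # gate still says which surface drifted.
    errors = bundle_shape_errors("replayed", replayed) + bundle_shape_errors("live", live)
    diverged = [s for s in SURFACES if s in replayed and s in live and replayed[s] != live[s]]
    return {"ok": not errors and not diverged, "errors": errors, "diverged_surfaces": diverged}
```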
Phase 1 — Frame-shaped cursor loops (2 weeks)
Status: implementation shipped; live revalidation pending in local kind/GKE after the current workspace Podman issue is resolved.
Goal: collapse N single-row cursor claims into N/50 multi-row frame claims and allow selected steps to run the declared task pipeline once per frame.
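Collapsing row claims into frame claims is a windowing problem. In this sketch a frame closes when it reaches size rows or when the next row would exceed memory_bytes. Both fields come from frame_policy_defaults, but three details are assumptions: the closing rule, the use of raw byte length as row size, and the omission of duration_ms.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass(frozen=True)
class FramePolicy:
    size: int = 50                        # max rows per frame
    memory_bytes: int = 64 * 1024 * 1024  # max serialized bytes per frame


def frames(rows: Iterable[bytes], policy: FramePolicy) -> Iterator[list[bytes]]:
    """Group claimed rows into frames; each frame would be one claim and one commit."""
    window: list[bytes] = []
    used = 0
    for row in rows:
        # Close the current frame before a row that would break either bound.
        # An empty window always accepts the row, so an oversized row forms a
        # single-row frame instead of being dropped.
        if window and (len(window) >= policy.size or used + len(row) > policy.memory_bytes):
            yield window
            window, used = [], 0
        window.append(row)
        used += len(row)
    # Flush the final partial frame.
    if window:
        yield window
```

With the loop defaults and small rows, N claimed rows become ceil(N / 50) frames. That is the N/50 reduction in claims and commit events this phase targets.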
- Extend cursor_worker.py to enforce the frame_policy payload now emitted alongside the existing cursor spec. The first implementation is in noetl/noetl#435: bounded claim windows are processed in-process and captured as Arrow IPC frame refs.
- New POST /api/stages/{stage_id}/frames/claim endpoint; under the hood it calls the existing claim_next_loop_indices with LIMIT = frame_policy.size.
- Worker either iterates the claimed rows in-process (frame.process: row) or executes the task pipeline once with frame.rows (frame.process: frame). Both paths serialize the frame row window as Arrow IPC through TempStore. This pulls a narrow Tier 1.5 slice forward because serialization correctness is required before the frame API can be the sole path.
- Worker commits the frame with one event per frame instead of one per row.
- Migrate test_pft_flow_v2.yaml to opt in via loop.spec.frame: on each mode: cursor step. The high-volume PFT steps use process: frame to batch HTTP calls and Postgres writes from the declared playbook, rather than hiding the loop in Python.
- loop.spec.max_in_flight now accepts either a positive integer or a renderable expression such as {{ workload.pft_http_concurrency }}. The rendered value is re-validated as a positive integer before collection/cursor dispatch, and cursor recovery paths reuse the same resolver. This closes the catalog-registration workaround that previously forced PFT v2 to hard-code the resolved concurrency value.
Verification:
- Total
command.*count drops from ~150k to < 20k on PFT v2. - Wall time should drop versus the default row-processing baseline. The 2026-05-18 local kind validations improved the full PFT v2 run from ~31m21s to ~4m30s-~8m08s, with the latest clean lock-order validation favoring correctness over optimistic concurrent frame event writes.
- Replay parity stays green for frame state, loop progress, execution status, and frame-mode payload references. The parity gate compares live and replayed frame projection checksums, including Arrow IPC
rows_refdigest and schema metadata.
Phase 2 — Decentralized projection (2 weeks)
Status: implementation shipped behind opt-in deployment/config; live mirrored-event projector validation against PFT v2 remains pending.
Goal: extract projection from server, run as a StatefulSet, scale independently.
- New noetl-projector binary entrypoint reusing the existing projection worker code.
- Helm chart adds the StatefulSet, a NATS durable consumer per replica, and a projection-store-only DB user.
- Remove the in-process projection loop from the server. The server now appends canonical events and mirrors frame lifecycle, direct event ingestion, async batch ingestion, execution-route command/cancel events, and generated command events to the distribution stream.
- Add per-shard projection lag/checkpoint metrics from durable projection metadata. Initial gauges are in noetl/noetl#458; dashboard wiring and live mirrored-event validation remain. Shard-skip counters from noetl/noetl#536 distinguish normal cross-shard traffic from malformed/unshardable envelopes during live mirrored-event validation. Stale-write counters from noetl/noetl#537 distinguish harmless duplicate/out-of-order delivery from missing projector output. Invalid projector shard settings now fail at startup, and env/CLI loaders no longer mask bad values, so a misconfigured StatefulSet replica count or shard id cannot quietly become a no-op projector. Metrics labels include the resolved shard index/count and NATS stream/subject to make that topology auditable during local-kind and GKE validation. Decode-error counters make malformed mirrored payloads visible even when the subscriber rejects the message before projection begins, while projection-error counters isolate failures after a notification reaches handle_notification; the aggregate error counter covers both. Terminal action counters show ACK throughput, immediate and delayed NAK pressure, and TERM drops before retry attempts re-enter the projector. Last-action and last-batch gauges make the most recent projector decision inspectable without replaying logs, and /summary plus scripts/check_projector_metrics_summary.py expose the same action, batch, error, and lag state as stable validation payloads. run_projector_phase2_validation.py is the preferred phase runner; use render_projector_phase2_command.py to produce a repeatable command for the current projector replica URLs, live-row/live-checksum source, and parity requirement.
- noetl.outbox is now the canonical distribution bridge for server, frame, execution, broker, command-claim, backend-neutral event-store, and DSL executor events. The standalone python -m noetl.outbox publisher gives deferred rows a traffic-independent retry path, and projector subscribers decode both JSON envelopes and Arrow Feather payload bytes.
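A small classifier can illustrate the startup validation, shard-skip, and stale-write distinctions above. Everything specific here is an assumption: routing by a stable SHA-256 hash of execution_id modulo shard_count, judging staleness by a per-execution last-applied event_id, and the action labels. The real projector's routing key and counter names may differ.

```python
from __future__ import annotations

import hashlib
from collections import Counter


def _is_int(value: object) -> bool:
    return isinstance(value, int) and not isinstance(value, bool)


class ShardProjector:
    def __init__(self, shard_index: int, shard_count: int) -> None:
        # Fail at startup instead of quietly running as a no-op projector.
        if not _is_int(shard_count) or shard_count < 1:
            raise ValueError(f"shard_count must be an integer >= 1, got {shard_count!r}")
        if not _is_int(shard_index) or not 0 <= shard_index < shard_count:
            raise ValueError(f"shard_index must be in [0, {shard_count}), got {shard_index!r}")
        self.shard_index = shard_index
        self.shard_count = shard_count
        self.last_applied: dict[str, int] = {}
        self.counters: Counter[str] = Counter()

    def owner(self, execution_id: str) -> int:
        # Stable across processes and replicas.
        digest = hashlib.sha256(execution_id.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.shard_count

    def handle(self, envelope: dict) -> str:
        execution_id = envelope.get("execution_id")
        event_id = envelope.get("event_id")
        if not isinstance(execution_id, str) or not _is_int(event_id):
            action = "skip_unshardable"  # malformed envelope, not normal traffic
        elif self.owner(execution_id) != self.shard_index:
            action = "skip_cross_shard"  # another replica owns this execution
        elif event_id <= self.last_applied.get(execution_id, -1):
            action = "stale_write"  # duplicate or out-of-order redelivery
        else:
            self.last_applied[execution_id] = event_id
            action = "applied"
        self.counters[action] += 1
        return action
```

The sketch uses hashlib rather than Python's built-in hash(), which is salted per process for strings. With hash(), the same execution could route to different shards on different replicas.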
Verification:
- Postgres pool depth high-watermark drops by ~3× on PFT v2 (writer fan-out).
- Server CPU stops spiking during MDS bursts.
Phase 3 — Arrow IPC Tier 1.5 (3 weeks)
Status: Python implementation shipped for cursor/frame payloads; colocated projector/worker hit-ratio validation remains pending.
Goal: add zero-copy data plane for colocated workers and projectors.
- Add `pyarrow` and `arrow-rs` (`arrow`, `arrow-ipc`) to dependencies. `pyarrow` is required for Python cursor/frame serialization; a Rust storage crate is not present in this repository yet, so the Rust parity item remains future adapter work.
- Implement Tier 1.5 in `result_store.py` (Python). `TempStore.put_ipc_bytes` writes durable bytes first, then admits an optional same-node IPC hint.
- Extend `PayloadReference` with an optional `IpcHint`, including `node_id`, so foreign-node hints become deterministic cache misses.
- The producer worker writes RecordBatch bytes to durable storage plus best-effort shared memory; consumers use `TempStore.resolve()`/`get_ipc_bytes()` to try IPC first and fall back to durable bytes before decoding rows at the consumer boundary.
- The per-node IPC budget and lease-expiry sweep are implemented in `ArrowIpcSharedMemoryCache`; broader node-broker/multi-process coordination remains future work.
- Metrics: `noetl_storage_ipc_read_hit_ratio` and `noetl_storage_ipc_*` counters per server/worker process; dashboard rollups should group by worker pool, runtime, node, and tenant once those labels become available.
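The IPC-first read path with a durable fallback can be sketched as follows. This is an illustration, not `TempStore.resolve()`: the `TieredResolver` class, the `key` field on `IpcHint`, and the use of plain dicts as stand-ins for shared memory and object storage are assumptions, and it assumes that every read not served from IPC counts as a miss for the hit ratio.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class IpcHint:
    node_id: str
    key: str  # hypothetical: name of the shared-memory region on that node


@dataclass(frozen=True)
class PayloadReference:
    uri: str  # durable Tier 3 copy; always written before any IPC hint is admitted
    ipc_hint: Optional[IpcHint] = None


class TieredResolver:
    """Sketch of IPC-first reads with a durable fallback (hypothetical class)."""

    def __init__(self, node_id: str, ipc_cache: Mapping[str, bytes],
                 durable_store: Mapping[str, bytes]) -> None:
        self.node_id = node_id
        self.ipc_cache = ipc_cache          # stand-in for node-local shared memory
        self.durable_store = durable_store  # stand-in for S3/GCS/Postgres payloads
        self.hits = 0
        self.misses = 0

    def resolve(self, ref: PayloadReference) -> bytes:
        hint = ref.ipc_hint
        # A hint from another node is a deterministic miss: never try a remote shm read.
        if hint is not None and hint.node_id == self.node_id:
            data = self.ipc_cache.get(hint.key)
            if data is not None:
                self.hits += 1
                return data
        # Evicted, expired, foreign, or absent hint: fall back to the durable copy.
        self.misses += 1
        return self.durable_store[ref.uri]

    @property
    def hit_ratio(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```

Because the durable copy is written first, losing the shared-memory region only costs latency; it never changes the bytes a consumer decodes.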
Verification:
- `tier15_hit_ratio` > 60% when the projector is colocated with the worker.
- End-to-end PFT v2 wall time ≤ Phase 0 baseline ÷ 2.
Phase 4 — Cloud OS surfaces (3 weeks)
Status: foundational identity pieces shipped; a first opt-in backlog scaler is available in the Helm chart; multi-cluster surfaces remain open.
Goal: lift the runtime from "well-behaved on one cluster" to "addressable, schedulable, autoscalable across clusters."
- Implement a unified resource locator across all subsystems. The core parser/builder is now present and adopted for frame durable-reference validation plus compact result-ref normalization. Worker metrics, frame claims, command claims, frame-dispatch metadata, and worker-reported runtime events now share the same topology helper; remaining work is broader server-side envelope injection and scheduler enforcement of locality hints.
- StatefulSet identity for workers (not just projectors).
- KEDA scaler with a frame backlog signal. The initial Helm implementation is optional and reads `noetl_frame_backlog_total`; follow-up work should enrich the metric with tenant/stage-aware labels once those labels are present.
- Multi-cluster supercluster docs plus an `ops` playbook to provision two GKE regions feeding the same NATS supercluster. An initial opt-in playbook exists for rendering/deploying a gateway-enabled NATS member; GKE-specific load-balancer/TLS defaults remain pending.
- Topology-aware scheduling via a `locality` hint on claim. The claim payload and `frame.dispatched` event metadata now carry the hint, plus a replayable placement evaluation when source locality exists; selection enforcement remains pending.
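A replayable placement evaluation can be kept deterministic by making it a pure function of the recorded source and worker locality. The sketch below assumes hypothetical `node`/`zone`/`region` keys and a simple scoring scheme; it only evaluates a placement and does not enforce selection, which matches the pending status above.

```python
from typing import Mapping, Optional

# Hypothetical locality levels, ordered from most to least local.
LEVELS = ("node", "zone", "region")


def evaluate_placement(source: Optional[Mapping[str, str]],
                       worker: Mapping[str, str]) -> dict:
    """Return a record of how closely a worker matches a frame's source locality.

    The result depends only on its inputs, so recording both inputs in
    frame.dispatched metadata lets a replay recompute the same evaluation.
    """
    if not source:
        return {"match": "unknown", "score": 0}
    for rank, level in enumerate(LEVELS):
        value = source.get(level)
        if value and value == worker.get(level):
            return {"match": level, "score": len(LEVELS) - rank}
    return {"match": "remote", "score": 0}
```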
Phase 5 — Pluggable event store / payload store / projection store (rolling, separate PRs per adapter)
Goal: every backend class in §§ 8–9 has a working adapter, language-paired, behind a feature flag. Product choices stay deployment-specific; the compliance contract is architectural.
Order (mirroring the existing distributed plan):
- NATS JetStream event store adapter (refactor existing to fit the new port). Python + Rust.
- S3 payload store adapter (refactor existing to fit the new port). Python + Rust.
- Postgres projection store adapter (refactor existing). Python + Rust.
- Kafka event store adapter.
- GCS payload store adapter.
- Cloud key-value/document projection adapter.
- Columnar analytical projection adapter.
- Streaming materialized-view adapter.
- … then the remainder in cloud-priority order.
Every adapter ships with:
- Compliance test suite (language-agnostic spec, run against both implementations).
- Replay parity suite for canonical events, payload references, snapshots, and projection checksums.
- Docker-compose entry for local development.
- Cloud-provisioning ops playbook in `repos/ops/automation/`.
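A language-agnostic compliance suite is easiest to reason about as a fixed set of behavioral checks run against any adapter behind the port. The sketch below shows one Python harness under stated assumptions: the `EventStore` protocol, its `append`/`read` method names, and the idempotent-append rule are hypothetical and are not the NoETL event store port.

```python
from typing import Protocol


class EventStore(Protocol):
    """Hypothetical event store port; method names are illustrative only."""

    def append(self, stream: str, event_id: str, payload: dict) -> int: ...

    def read(self, stream: str, from_position: int = 0) -> list[tuple[int, dict]]: ...


class InMemoryEventStore:
    """Reference adapter used to validate the compliance checks themselves."""

    def __init__(self) -> None:
        self._streams: dict[str, list[tuple[str, dict]]] = {}
        self._positions: dict[tuple[str, str], int] = {}

    def append(self, stream: str, event_id: str, payload: dict) -> int:
        key = (stream, event_id)
        if key in self._positions:  # idempotent: a retried append returns the same position
            return self._positions[key]
        events = self._streams.setdefault(stream, [])
        events.append((event_id, payload))
        self._positions[key] = len(events) - 1
        return len(events) - 1

    def read(self, stream: str, from_position: int = 0) -> list[tuple[int, dict]]:
        events = self._streams.get(stream, [])
        return [(pos, payload) for pos, (_, payload) in enumerate(events)][from_position:]


def run_compliance(store: EventStore) -> list[str]:
    """Backend-agnostic checks against a fresh store; returns names of failed checks."""
    failures = []
    p0 = store.append("exec-1", "e0", {"n": 0})
    p1 = store.append("exec-1", "e1", {"n": 1})
    if (p0, p1) != (0, 1):
        failures.append("positions_monotonic")
    if store.append("exec-1", "e1", {"n": 1}) != p1:
        failures.append("idempotent_append")
    if [payload["n"] for _, payload in store.read("exec-1")] != [0, 1]:
        failures.append("read_in_order")
    if store.read("exec-1", from_position=1) != [(1, {"n": 1})]:
        failures.append("read_from_position")
    return failures
```

Returning failed check names, rather than stopping at the first failure, gives adapter authors the full gap list in one run.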
Phase 6 — Stage planner for fan-out / reduce (4 weeks)
Goal: extend the stage/frame model from loops to fan-out and reduce, completing the map-reduce shape.
- Stage `kind='fanout'`: explodes a single input into N partitions, each handed to a frame.
- Stage `kind='reduce'`: consumes M partitions and emits one output. A reduce frame waits on partition availability events instead of cursor rows.
- Replace the design in `distributed_fanout_mode_spec.md` with this materialized version.
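The fan-out/reduce shape above can be sketched in a few lines. The round-robin partitioning, the `ReduceFrame` class, and the `on_partition_available` callback are assumptions for illustration; the real planner may partition by key or size, and availability would arrive as events rather than direct calls.

```python
from typing import Callable, Iterable


def fanout(rows: list, partitions: int) -> list[list]:
    """kind='fanout' sketch: explode one input into N partitions (round-robin)."""
    if partitions < 1:
        raise ValueError("partitions must be >= 1")
    buckets: list[list] = [[] for _ in range(partitions)]
    for i, row in enumerate(rows):
        buckets[i % partitions].append(row)
    return buckets


class ReduceFrame:
    """kind='reduce' sketch: fires on partition-availability events, not cursor rows."""

    def __init__(self, expected: int, combine: Callable[[Iterable], object]) -> None:
        self.expected = expected
        self.combine = combine
        self.available: dict[int, list] = {}
        self.emitted = False

    def on_partition_available(self, partition_id: int, rows: list):
        if self.emitted:
            return None  # output already produced; late duplicates are ignored
        self.available.setdefault(partition_id, rows)  # duplicate events are idempotent
        if len(self.available) < self.expected:
            return None  # still waiting on other partitions
        self.emitted = True
        # Combine in partition order so the output does not depend on arrival order.
        ordered = [row for pid in sorted(self.available) for row in self.available[pid]]
        return self.combine(ordered)
```

Combining in partition-id order keeps the reduce output deterministic, which is what the "reduce-frame output checksums match single-partition baseline" gate needs.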
13. Risks and open questions
- Tier 1.5 GC under crash. A producer worker that crashes after writing shm but before committing the frame leaves shm regions that no one will unlink. Mitigation: a per-node `shm_unlink` sweep on worker start (scans `/dev/shm/noetl-*` and unlinks regions older than `tier_15_grace_seconds`).
- NATS supercluster cost. Cross-region replication of every event stream is not free. Mitigation: opt-in per execution; default is single-region.
- Frame size tuning. A frame too big means more work lost on crash; too small means coordination dominates. Mitigation: start with the §5.2 default, expose a per-step override, and measure with the §4 dashboard. Operators should choose `max_rows` from lost-work tolerance: a 50-row frame means a crash can replay up to 50 claimed cursor rows as one unit, so high-cost or externally side-effecting steps should use smaller frames until idempotency is proven. Increase `lease_seconds` to at least the p99 frame runtime plus one heartbeat interval, and keep `max_attempts` low enough that repeated whole-frame failures page an operator instead of silently cycling.
- Reduce-side back-pressure. A reduce stage that is slower than its upstream fanout fills the projection store inbox. Mitigation: reuse the existing `max_inflight` concept at the stage level, not the worker level.
- Multi-process per pod. Some MCPs prefer multiple processes; Tier 1.5 then needs a node-local broker. Out of scope for v1; revisit if a real MCP demands it.
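The shm sweep and lease guidance above can be sketched as two small helpers. The function names, the `noetl-` prefix parameter, and the 300-second default grace window are assumptions; the sweep relies on the Linux behavior that POSIX shared-memory objects appear as files under `/dev/shm`, so unlinking the file has the same effect as `shm_unlink`.

```python
import time
from pathlib import Path
from typing import Optional


def sweep_stale_shm(shm_dir: str = "/dev/shm", prefix: str = "noetl-",
                    grace_seconds: float = 300.0,
                    now: Optional[float] = None) -> list[str]:
    """Unlink orphaned regions older than the grace window; return removed names.

    The 300 s default is an assumed stand-in for tier_15_grace_seconds.
    """
    now = time.time() if now is None else now
    removed = []
    for path in Path(shm_dir).glob(prefix + "*"):
        try:
            if now - path.stat().st_mtime > grace_seconds:
                path.unlink()
                removed.append(path.name)
        except FileNotFoundError:
            continue  # another worker on the node unlinked it first
    return sorted(removed)


def min_lease_seconds(p99_frame_runtime_s: float, heartbeat_interval_s: float) -> float:
    """Lower bound for lease_seconds: p99 frame runtime plus one heartbeat interval."""
    return p99_frame_runtime_s + heartbeat_interval_s
```

Tolerating `FileNotFoundError` matters because several workers on one node may start at once and race over the same orphaned regions.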
14. Source code anchors (current state)
For implementers. Existing code that this spec extends, not replaces:
| Concern | File | Key symbol |
|---|---|---|
| Loop expansion (parallel mode) | noetl/core/dsl/engine/executor/commands.py | _create_command_for_step |
| Cursor dispatch | noetl/core/dsl/engine/executor/transitions.py | _issue_cursor_loop_commands |
| Cursor worker | noetl/worker/cursor_worker.py | execute_cursor_worker |
| Worker tool dispatch | noetl/worker/nats_worker.py | _execute_tool |
| TempStore tiers | noetl/core/storage/result_store.py | ResultStore.put / resolve |
| TempStore tier enum | noetl/core/storage/models.py | StoreTier |
| NATS client | noetl/core/messaging/nats_client.py | NatsClient.connect |
| NATS K/V cache | noetl/core/cache/nats_kv.py | NatsKv |
| Claim API | noetl/server/api/core/commands.py | claim_command |
| Command reaper | noetl/server/command_reaper.py | CommandReaper |
| Schema DDL | noetl/database/ddl/postgres/schema_ddl.sql | noetl.event, noetl.command |
PFT v2 driver: repos/e2e/fixtures/playbooks/pft_flow_test/test_pft_flow_v2.yaml — six cursor steps, ~120k HTTP calls per execution, the canonical benchmark for every phase above.
15. Out of scope (deferred to future specs)
- Replacing the DSL with a different language. The DSL is fine; this spec rewires the execution layer underneath it.
- Replacing `noetl.event` with a different source of truth. The event log stays Postgres-backed by default; the event store port lets other backends mirror it.
- Replacing NoETL's existing MCP server architecture. Workers and MCPs are orthogonal.
- A new container orchestrator. We assume Kubernetes; the locator scheme is friendly to other orchestrators but they are not a v1 target.
16. Decision log (what changed vs the original event-store-design-prompt)
The original event-store-design-prompt.md (now archived) framed the problem as "add an event store abstraction layer." That framing is necessary but not sufficient. This revision changes the framing:
- Event sourcing becomes the system kernel. The goal is not just to swap storage backends; it is to reproduce tenant/org system state at a requested time from canonical events plus immutable payloads.
- Shared memory becomes a product advantage, not just a cache. The runtime uses Arrow IPC, local disk, distributed K/V, materialized views, and distributed indexes as a rebuildable shared-state fabric around the event log.
- Worker-side loop interpretation remains the dominant immediate cost reduction. The PFT v2 baseline shows that even a perfect event store cannot save us from server-side per-row coordination overhead.
- A specific GC + admission story for Tier 1.5 (budget, grace, unlink sweep). The original hand-waved this; we have to make it concrete or the shm region count will grow without bound.
- A staged additive rollout with frame-shaped cursor loops in Phase 1, before Arrow IPC and before pluggable backends. This lets us ship a measurable performance win in two weeks rather than waiting for a multi-quarter abstraction overhaul.
Other elements (three-layer model, backend adapters, content-addressed payloads, configuration schema) carry over from the original with edits to match the existing TempStore and projection-worker code that has shipped since the original was drafted. Product-specific analytical or streaming engines are intentionally not named as dependencies; NoETL adopts the underlying algorithms and contracts.
17. Implementation Roadmap
This section complements §12 (the phased refactor plan) with a phase dependency graph, per-phase acceptance checklist, replay parity gate, and operational readiness criteria. Section 12 defines what each phase builds; this section defines when each phase is ready to ship and what to do if it needs to be rolled back.
17.1 Phase dependency graph
Phases 1 and 2 can run in parallel once the Phase 0 baseline is captured. Phase 3 requires both because it depends on frame-shaped Arrow output (Phase 1) and colocated projectors (Phase 2).
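The dependency edges stated above can be checked mechanically with the standard library's `graphlib`. This sketch encodes only the Phase 0 to 3 edges given in the text; Phases 4 to 6 are omitted because their dependencies are not stated here, and `rollout_waves` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

# Only the edges stated in the text: Phases 1 and 2 depend on Phase 0; Phase 3 on both.
PHASE_DEPS: dict[int, set[int]] = {1: {0}, 2: {0}, 3: {1, 2}}


def rollout_waves(deps: dict[int, set[int]]) -> list[list[int]]:
    """Group phases into waves; phases in the same wave can proceed in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the plan is circular
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(rollout_waves(PHASE_DEPS))  # [[0], [1, 2], [3]]
```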
17.2 Pre-flight checklist (before Phase 0 begins)
These conditions must be true before work begins on Phase 0:
- `noetl.stage` and `noetl.frame` DDL reviewed and approved by the database owner.
- Grafana / VictoriaMetrics dashboards provisioned with the §4 metric tile set (empty panels are acceptable at this stage).
- A reproducible trigger for a full PFT v2 run exists as an ops runbook entry and has been tested by a team member who did not write it.
- `NOETL_TENANT_ID`, `NOETL_ORGANIZATION_ID`, `NOETL_CLUSTER_ID` env vars injectable via Helm values in the target cluster.
- Replay harness skeleton (even a no-op stub) runs to completion without error against the current `noetl.event` table.
- Schema registry mount path (`/etc/noetl/schemas`) is provisionable from a ConfigMap.
- On-call alert routing confirmed for `frame_reaper_republish_rate` and `pg_pool_depth_highwatermark`.
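The identity-variable item above can be verified from inside a pod started with the target Helm values. A minimal sketch, assuming that an unset or whitespace-only value counts as missing; `missing_identity_vars` is a hypothetical helper, not part of NoETL.

```python
import os
from typing import Mapping, Optional

REQUIRED_IDENTITY_VARS = ("NOETL_TENANT_ID", "NOETL_ORGANIZATION_ID", "NOETL_CLUSTER_ID")


def missing_identity_vars(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return required identity variables that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_IDENTITY_VARS if not env.get(name, "").strip()]
```

A non-empty result fails the pre-flight check; calling it with no argument inspects the current process environment.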
17.3 Per-phase acceptance criteria
| Phase | Ship gate | Replay parity required | Rollback procedure |
|---|---|---|---|
| 0 — Instrumentation | All §4 metrics non-null on dashboard from a complete PFT v2 run; baseline values pinned to memory | Replay harness exits 0; checksum report attached to memory entry | Drop noetl.stage / noetl.frame via inverse Alembic migration; zero runtime impact |
| 1 — Frame loops | Total command.* count < 20k on PFT v2; wall time not regressed vs Phase 0 baseline | Frame state, loop progress, and execution status checksums match live projection | Set frame_policy: null on PFT v2 steps; disable frame claim endpoint; legacy command path unchanged |
| 2 — Decentralized projection | Postgres pool depth < 30 sustained during MDS burst; per-shard projection lag metric visible on dashboard | Projection checksums match single-writer baseline | Re-enable in-process server projection loop; scale noetl-projector StatefulSet to 0 |
| 3 — Arrow IPC Tier 1.5 | tier15_hit_ratio > 60% on colocated consumer; PFT v2 wall time ≤ Phase 0 baseline ÷ 2 | Payload digest and projection checksums unchanged vs Phase 2 | Set tier_15_node_budget_mb: 0; admission control skips Tier 1.5 with no data-path change |
| 4 — Cloud OS surfaces | Unified resource locator present on all event envelopes; KEDA scaler responds to frame backlog signal within 30 s | Locator fields do not alter event content digest; replay checksum unchanged | Disable KEDA scaler; revert locator injection to no-op middleware; locator fields are additive |
| 5 — Pluggable adapters | Each adapter passes compliance + replay parity suite; Docker-compose local dev environment validated end-to-end | Python and Rust parity corpus checksums identical for each adapter under test | Revert backend config values to previous setting; no code removal required |
| 6 — Fan-out / reduce | Stage kind='fanout' and kind='reduce' pass all PFT v2 fan-out phases; distributed_fanout_mode_spec.md superseded | Reduce-frame output checksums match single-partition baseline | Revert fan-out DSL steps to mode: parallel; stage/frame tables remain intact |
17.4 Replay parity release gate
Every phase after Phase 0 must pass all of the following before any merge to main:
Run the fast deterministic local gate first:

```shell
scripts/check_replay_release_gate.sh
```

- [ ] Event envelope validation passes for 100% of events emitted in the phase's PFT v2 run.
- [ ] Every snapshot records: event position, projection code version, upcaster versions, payload digest set, tenant encryption context, and deterministic fold checksum.
- [ ] Replay harness runs start-to-finish against the phase's full event log and payload refs.
- [ ] Execution state checksum (frame rows committed, loop progress) matches live projection.
- [ ] At least one configured business projection type checksum matches live projection.
- [ ] Python and Rust replay paths produce identical fold checksums on the golden corpus.
- [ ] No event references an `IpcHint` path as its only payload copy (grep for events missing a Tier 3 `payload_ref.uri`).
- [ ] Idempotency key present on 100% of externally retried transitions in the phase's events.
Replay parity failures block the phase merge. They are not deferred to the next phase.
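Two of the gate items above, the IpcHint-only payload check and the idempotency-key check, plus a deterministic fold checksum, can be sketched over decoded event dicts. The field names (`event_id`, `payload_ref.ipc_hint`, `retried`, `idempotency_key`) and the choice of SHA-256 over sorted-key compact JSON are assumptions; cross-language parity would require the Rust side to use the identical canonical encoding.

```python
import hashlib
import json


def fold_checksum(state: dict) -> str:
    """Deterministic checksum over canonical JSON (sorted keys, compact separators)."""
    canonical = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def gate_violations(events: list[dict]) -> list[str]:
    """Report events that break the IpcHint and idempotency-key gate items."""
    problems = []
    for event in events:
        eid = event.get("event_id", "?")
        ref = event.get("payload_ref")
        # An IPC hint is only an accelerator; a durable Tier 3 URI must always exist.
        if ref and ref.get("ipc_hint") and not ref.get("uri"):
            problems.append(f"{eid}: ipc_hint without Tier 3 payload_ref.uri")
        if event.get("retried") and not event.get("idempotency_key"):
            problems.append(f"{eid}: retried transition without idempotency key")
    return problems
```

A replay run then passes the checksum items when `fold_checksum(live_state) == fold_checksum(replayed_state)` and the other items when `gate_violations(events)` is empty.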
17.5 Operational readiness (per phase shipped to production)
Before enabling any phase for a production tenant:
- Runbook exists in `repos/ops/automation/` covering: how to trigger a replay, how to compare checksums, how to roll back the phase, and how to scale the projector StatefulSet.
- On-call alerts live for `frame_reaper_republish_rate > 0.1/s` and `tier15_budget_exhaustion_total > 0`.
- Frame budget defaults reviewed against the tenant's actual execution profile (not only PFT v2).
- Tenant encryption context set and verified for all payload refs in the phase's event schema version.
- Retention and TTL policy for Tier 3 objects confirmed with legal / compliance for the tenant.
- Cross-tenant projection isolation verified: no query returns rows from a different `(tenant_id, organization_id)` pair without explicit operator permission.
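The cross-tenant isolation item can be exercised by running each projection query under every scope and collecting foreign rows. A minimal sketch, assuming rows are mappings carrying `tenant_id` and `organization_id` and that the query is a callable taking the caller's scope; `isolation_leaks` is a hypothetical helper.

```python
from typing import Callable, Iterable, Mapping

Scope = tuple[str, str]  # (tenant_id, organization_id)


def isolation_leaks(query: Callable[[Scope], Iterable[Mapping]],
                    scopes: Iterable[Scope]) -> dict:
    """Run a projection query under each scope and report rows from any other scope."""
    leaks = {}
    for scope in scopes:
        foreign = [
            row for row in query(scope)
            if (row.get("tenant_id"), row.get("organization_id")) != scope
        ]
        if foreign:
            leaks[scope] = foreign
    return leaks
```

An empty result for every projection type is the pass condition; any entry names the scope that saw foreign rows and the rows themselves.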