AgentInbox Event Bus Backend Design
This document defines the next-step event bus architecture for AgentInbox.
The goal is not to turn AgentInbox into a general-purpose broker.
The goal is to separate three concerns that are currently too tightly coupled:
- source ingress
- event retention and consumer offsets
- inbox delivery for local agents
Why This Refactor Exists
The current implementation keeps one source-level checkpoint for each hosted source.
That is enough to prove shared source hosting, but it is not enough to model independent consumers on top of the same source.
Today:
- one source has one consumption position
- multiple subscriptions on the same source share that position
- later subscriptions can miss earlier source events
The new design changes that:
- source ingress position remains source-level
- subscription consumption position becomes subscription-level
- one source can safely feed multiple independent subscriptions
This is closer to Kafka, JetStream, and other mature event log systems.
Goals
- keep `AgentInbox` as the shared ingress and delivery layer
- define a backend abstraction that can support:
  - embedded SQLite today
  - JetStream later
  - Kafka later
  - a future Rust implementation later
- keep product vocabulary close to `AgentInbox`
- keep backend semantics close to mature stream systems
- support one source feeding multiple subscriptions with independent cursors
- support replay and backfill without re-polling the external source
Non-Goals
- do not expose Kafka partitions or broker administration directly in the main `AgentInbox` API
- do not replace `Inbox` with generic queue terminology everywhere
- do not make the backend abstraction responsible for source-specific delivery
- do not push runtime task semantics into the backend
Vocabulary
Use two layers of terms on purpose.
Product Vocabulary
- `Source` - external ingress source, such as a GitHub repo
- `Subscription` - agent-specific filtered consumer on top of a source
- `Inbox` - agent-facing inbox destination
- `InboxItem` - item delivered to an inbox after subscription matching
- `DeliveryHandle` - outbound reply/update target
Backend Vocabulary
- `Stream` - append-only event log for one source
- `StreamEvent` - one normalized source event written once into the stream
- `Consumer` - backend consumer identity for one subscription
- `Offset` - consumer progress in the stream
Mapping
- one `Source` maps to one backend `Stream`
- one `Subscription` maps to one backend `Consumer`
- one `StreamEvent` can produce zero, one, or many `InboxItem`s
- `InboxItem` ack is not the same thing as backend offset commit
Target Architecture
The refactor should separate the runtime into three pipelines.
1. Source Ingress
Responsibilities:
- host the external subscription or poll loop
- normalize source-native payloads into `StreamEvent`
- append them to the backend
- advance only the source ingress checkpoint
Source ingress does not fan out directly to inboxes.
2. Subscription Consumption
Responsibilities:
- read `StreamEvent`s from the backend using a subscription-specific consumer
- evaluate subscription match rules
- materialize matching `InboxItem`s
- create `Activation`s
- commit that subscription's offset
Each subscription consumes independently.
3. Inbox Delivery
Responsibilities:
- expose inbox read and ack APIs to runtimes
- keep `InboxItem` state independent from stream offsets
- preserve `DeliveryHandle` for outbound replies
Current To Target Data Model
Current model:
- `Source`
- `Interest`
- `InboxItem`
Target model:
- `Source`
- `Subscription`
- `StreamEvent`
- `Inbox`
- `InboxItem`
Notes:
- `Interest` should become `Subscription`
- `InboxItem` should stop doubling as the source journal
- `StreamEvent` becomes the single durable source-side event record
Backend Interface
The backend interface should be small and opinionated.
It should describe event-log behavior, not provider-specific behavior.
export interface EventBusBackend {
ensureStream(input: EnsureStreamInput): Promise<StreamRecord>;
append(input: AppendEventsInput): Promise<AppendEventsResult>;
ensureConsumer(input: EnsureConsumerInput): Promise<ConsumerRecord>;
getConsumer(input: GetConsumerInput): Promise<ConsumerRecord | null>;
read(input: ReadEventsInput): Promise<ReadEventsResult>;
commit(input: CommitOffsetInput): Promise<CommitOffsetResult>;
reset(input: ResetConsumerInput): Promise<ConsumerRecord>;
getStreamStats(streamId: string): Promise<StreamStats>;
getConsumerLag(input: GetConsumerLagInput): Promise<ConsumerLag>;
}
Suggested supporting types:
export interface StreamRecord {
streamId: string;
sourceId: string;
streamKey: string;
backend: string;
createdAt: string;
}
export interface StreamEventRecord {
offset: number;
streamEventId: string;
streamId: string;
sourceId: string;
sourceNativeId: string;
eventVariant: string;
occurredAt: string;
metadata: Record<string, unknown>;
rawPayload: Record<string, unknown>;
deliveryHandle: Record<string, unknown> | null;
}
export interface ConsumerRecord {
consumerId: string;
streamId: string;
subscriptionId: string;
consumerKey: string;
nextOffset: number;
startPolicy: "latest" | "earliest" | "at_offset" | "at_time";
createdAt: string;
updatedAt: string;
}
export interface ReadEventsResult {
consumer: ConsumerRecord;
events: StreamEventRecord[];
highWatermarkOffset: number;
}
Backend Rules
- `append()` must be idempotent on source identity
- `read()` must not change offsets
- `commit()` must advance one consumer only
- offsets belong to consumers, not streams
- stream offsets are monotonic and append-only
SQLite Backend
The first implementation should be embedded and local.
SQLite is enough because the current problem is durable local event retention, not high-throughput distributed messaging.
Tables
streams
One record per hosted source stream.
create table if not exists streams (
stream_id text primary key,
source_id text not null unique,
stream_key text not null unique,
backend text not null,
created_at text not null
);
stream_events
Append-only normalized source journal.
create table if not exists stream_events (
offset integer primary key autoincrement,
stream_event_id text not null unique,
stream_id text not null,
source_id text not null,
source_native_id text not null,
event_variant text not null,
occurred_at text not null,
metadata_json text not null,
raw_payload_json text not null,
delivery_handle_json text,
created_at text not null,
unique(stream_id, source_native_id, event_variant)
);
create index if not exists idx_stream_events_stream_offset
on stream_events(stream_id, offset);
create index if not exists idx_stream_events_stream_occurred_at
on stream_events(stream_id, occurred_at);
Notes:
- `offset` is the backend ordering key
- `unique(stream_id, source_native_id, event_variant)` keeps source ingress idempotent
- `created_at` is insertion time
- `occurred_at` is source event time
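Given the unique constraint above, the ingress-side idempotency rule can live directly in the append statement. A sketch (SQLite upsert syntax; parameter order mirrors the column list):

```sql
-- Sketch: append one normalized event; re-delivery of the same
-- source-native event becomes a no-op instead of a duplicate row.
insert into stream_events (
  stream_event_id, stream_id, source_id, source_native_id,
  event_variant, occurred_at, metadata_json, raw_payload_json,
  delivery_handle_json, created_at
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(stream_id, source_native_id, event_variant) do nothing;
```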
consumers
One record per subscription consumer.
create table if not exists consumers (
consumer_id text primary key,
stream_id text not null,
subscription_id text not null unique,
consumer_key text not null unique,
start_policy text not null,
start_offset integer,
start_time text,
next_offset integer not null,
created_at text not null,
updated_at text not null
);
create index if not exists idx_consumers_stream
on consumers(stream_id);
Notes:
- `next_offset` is the next unread offset for this consumer
- offset state lives here, not on `sources`
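With offsets stored on `consumers`, the backend's `read()` and `commit()` reduce to two small statements over these tables; a sketch assuming the schema above:

```sql
-- read(): fetch the next batch for one consumer without touching its offset
select *
from stream_events
where stream_id = :stream_id
  and offset >= :next_offset
order by offset
limit :batch_size;

-- commit(): advance exactly one consumer past the last processed offset
update consumers
set next_offset = :last_offset + 1,
    updated_at = :now
where consumer_id = :consumer_id;
```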
consumer_commits
Optional audit table for debugging and observability.
create table if not exists consumer_commits (
commit_id text primary key,
consumer_id text not null,
stream_id text not null,
committed_offset integer not null,
committed_at text not null
);
create index if not exists idx_consumer_commits_consumer
on consumer_commits(consumer_id, committed_offset);
This table is not required for correctness. It is useful for debugging, replay audits, and operator visibility.
Existing Tables That Stay
The following current tables still make sense:
- `sources`
- `interests`, to be renamed to `subscriptions`
- `inbox_items`
- `activations`
- `deliveries`
The key change is:
- `inbox_items` stops being the only durable event store
- `stream_events` becomes the source journal
Subscription Start Policy
New subscriptions need an explicit start policy.
Recommended options:
- `latest` - begin after the current high watermark
- `earliest` - consume from offset 1
- `at_offset` - consume from an explicit offset
- `at_time` - consume from the first event at or after a timestamp
Recommended defaults:
- runtime-created subscriptions default to `latest`
- operator/debug subscriptions may choose `earliest`
This avoids accidental inbox floods while keeping replay explicit.
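Resolving a start policy to an initial `nextOffset` is a small pure function. A sketch under two assumptions not stated above: offsets start at 1, and for `at_time` the caller has already resolved the timestamp to an offset (a real backend would do that lookup itself):

```typescript
type StartPolicy = "latest" | "earliest" | "at_offset" | "at_time";

interface StartOptions {
  policy: StartPolicy;
  startOffset?: number; // required for "at_offset"
  resolvedTimeOffset?: number; // required for "at_time" (pre-resolved here)
}

// Compute the first offset a new consumer should read.
// highWatermark is the offset of the last appended event (0 if empty).
function initialNextOffset(opts: StartOptions, highWatermark: number): number {
  switch (opts.policy) {
    case "latest":
      return highWatermark + 1; // skip existing backlog
    case "earliest":
      return 1; // full replay from the first event
    case "at_offset":
      if (opts.startOffset === undefined) throw new Error("startOffset required");
      return opts.startOffset;
    case "at_time":
      if (opts.resolvedTimeOffset === undefined) throw new Error("resolvedTimeOffset required");
      return opts.resolvedTimeOffset;
    default:
      throw new Error("unknown policy");
  }
}
```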
Event Consumption Logic
Source Ingress Logic
- source adapter receives or polls source-native events
- source adapter normalizes them into `StreamEvent` payloads
- source adapter calls `backend.append()`
- source adapter updates source ingress checkpoint
Important:
- source ingress checkpoint tracks external source progress
- it does not track any subscription's read progress
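As an illustration of the normalize step above, a hypothetical GitHub issue-comment payload mapped into `StreamEvent` fields. The raw shape and field names are assumptions for illustration, not the real adapter's types:

```typescript
// Hypothetical raw shape; the real GitHub adapter payload differs.
interface RawIssueComment {
  commentId: number;
  issueNumber: number;
  body: string;
  createdAt: string;
}

interface NewStreamEvent {
  sourceNativeId: string;
  eventVariant: string;
  occurredAt: string;
  metadata: Record<string, unknown>;
  rawPayload: Record<string, unknown>;
}

// Normalize one source-native event into the append payload.
// (sourceNativeId, eventVariant) is the idempotency identity the
// backend's unique constraint relies on.
function normalizeIssueComment(raw: RawIssueComment): NewStreamEvent {
  return {
    sourceNativeId: String(raw.commentId),
    eventVariant: "issue_comment",
    occurredAt: raw.createdAt,
    metadata: { issueNumber: raw.issueNumber },
    rawPayload: { ...raw },
  };
}
```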
For GitHub, source ingress checkpoint still includes things such as:
- `uxcJobId`
- `afterSeq`
- `lastEventAt`
- `lastError`
But this checkpoint now means:
- how far the GitHub adapter has read from GitHub/UXC
It no longer means:
- how far subscriptions have consumed the stream
Subscription Consumption Logic
- ensure backend consumer exists for the subscription
- call `backend.read(streamId, consumerId, limit)`
- for each returned `StreamEvent`:
  - evaluate subscription match rules
  - if matched, write one `InboxItem`
  - if matched, create one `Activation`
- after the batch is processed, call `backend.commit()` with the last consumed offset
Important:
- commit happens after processing the batch
- commit does not depend on inbox ack
- inbox ack is runtime-side handling state, not stream consumption state
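The per-batch step can stay pure: given the events that were read and the subscription's match rule, produce the inbox items plus the offset `commit()` should receive. A sketch with simplified shapes (names are illustrative):

```typescript
interface Ev { offset: number; eventVariant: string; payload: Record<string, unknown>; }
interface Item { streamEventOffset: number; payload: Record<string, unknown>; }

// Process one read batch for one subscription.
// Returns materialized items and the offset to commit (null for an empty batch).
function processBatch(
  events: Ev[],
  matches: (e: Ev) => boolean,
): { items: Item[]; commitOffset: number | null } {
  const items: Item[] = [];
  for (const e of events) {
    if (matches(e)) {
      items.push({ streamEventOffset: e.offset, payload: e.payload });
    }
  }
  // Commit past the whole batch, including non-matching events,
  // so the consumer never re-reads events it has already evaluated.
  const commitOffset = events.length > 0 ? events[events.length - 1].offset : null;
  return { items, commitOffset };
}
```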
Multiple Subscriptions On One Source
This is the main reason to add the backend.
Behavior:
- one source writes one stream
- subscription A has consumer A and its own offset
- subscription B has consumer B and its own offset
- both can read the same `StreamEvent`
- both can materialize different `InboxItem`s
- A can advance without affecting B
This is the Kafka-like property the current design is missing.
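A minimal in-memory toy (not the real backend) makes this property concrete: one stream, two consumers, separate offsets, and a `read()` that never moves an offset:

```typescript
// Toy in-memory stream illustrating consumer-owned offsets only.
class ToyStream {
  private events: string[] = [];
  private nextOffsets = new Map<string, number>(); // consumerId -> next offset (1-based)

  append(event: string): number {
    this.events.push(event);
    return this.events.length; // offset of the appended event
  }

  ensureConsumer(consumerId: string, startOffset = 1): void {
    if (!this.nextOffsets.has(consumerId)) this.nextOffsets.set(consumerId, startOffset);
  }

  // read() returns events but never advances the offset.
  read(consumerId: string, limit: number): { offset: number; event: string }[] {
    const next = this.nextOffsets.get(consumerId) ?? 1;
    const out: { offset: number; event: string }[] = [];
    for (let o = next; o <= this.events.length && out.length < limit; o++) {
      out.push({ offset: o, event: this.events[o - 1] });
    }
    return out;
  }

  // commit() advances exactly one consumer.
  commit(consumerId: string, lastOffset: number): void {
    this.nextOffsets.set(consumerId, lastOffset + 1);
  }
}
```

Consumer A committing its batch leaves consumer B's backlog untouched, which is exactly the independence the current one-checkpoint-per-source model cannot express.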
No Subscription Case
When a source exists but there are no subscriptions:
- source ingress should still be allowed to append to the stream
- no consumer offset is advanced because no consumer exists
- backlog remains available for future subscriptions using `earliest` or `at_time`
This is better than the current temporary rule that skips source progress when there are zero subscriptions.
With the backend refactor, that workaround should be removed.
Inbox Ack Logic
Inbox ack remains separate:
- subscription commit says: this subscription has processed the stream batch
- inbox ack says: the runtime has handled this inbox item
This separation is deliberate.
It avoids coupling stream progress to runtime behavior.
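A sketch of the shape this separation takes in code (field names are assumptions): ack mutates only runtime-side item state and takes no offset or consumer argument at all.

```typescript
interface InboxItem {
  inboxItemId: string;
  status: "pending" | "acked"; // runtime handling state, not stream state
}

// Ack flips handling state only; no backend offset is involved,
// so the signature cannot even express "move the consumer".
function ackInboxItem(item: InboxItem): InboxItem {
  return { ...item, status: "acked" };
}
```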
Delivery Semantics
DeliveryHandle should continue to be captured on the StreamEvent and copied
into InboxItem.
Reasons:
- replayed inbox items still need the same outbound target context
- runtime should not reconstruct provider-specific reply targets
Backend Swappability
The abstraction should preserve these invariants across backends:
- one source maps to one stream
- one subscription maps to one consumer
- offsets belong to consumers
- stream events are append-only
- commit is explicit
How this maps later:
- SQLite backend
  - `stream_events.offset` is the source of truth
- JetStream backend
  - stream sequence maps to offset
- Kafka backend
  - topic + partition offset maps to offset
AgentInbox should keep the same higher-level API even if the backend changes.
Migration Plan
Phase 1
- add backend interface
- add SQLite backend implementation
- keep current source adapters
Phase 2
- add `stream_events` and `consumers`
- keep existing `inbox_items`
- change source adapters to append into backend first
Phase 3
- rename `Interest` to `Subscription`
- add subscription start policy
- materialize inbox items from backend consumer reads
Phase 4
- remove direct source-to-inbox assumptions from adapters
- make future JetStream/Kafka backend implementations possible
Recommended First Refactor Slice
The smallest valuable refactor is:
- add `stream_events`
- add `consumers`
- move source append into `stream_events`
- introduce one subscription consumer loop for GitHub
- keep existing inbox API unchanged
This gets the data model right without forcing a full external-broker decision.