AgentInbox Event Bus Backend Design

This document defines the next-step event bus architecture for AgentInbox.

The goal is not to turn AgentInbox into a general-purpose broker. The goal is to separate three concerns that are currently too tightly coupled:

Why This Refactor Exists

The current implementation keeps one source-level checkpoint for each hosted source.

That is enough to prove shared source hosting, but it is not enough to model independent consumers on top of the same source.

Today:

The new design changes that:

This is closer to Kafka, JetStream, and other mature event log systems.

Goals

Non-Goals

Vocabulary

Use two layers of terms on purpose.

Product Vocabulary

Backend Vocabulary

Mapping

Target Architecture

The refactor should separate the runtime into three pipelines.

1. Source Ingress

Responsibilities:

Source ingress does not fan out directly to inboxes.

2. Subscription Consumption

Responsibilities:

Each subscription consumes independently.

3. Inbox Delivery

Responsibilities:

Current To Target Data Model

Current model:

Target model:

Notes:

Backend Interface

The backend interface should be small and opinionated.

It should describe event-log behavior, not provider-specific behavior.

export interface EventBusBackend {
  ensureStream(input: EnsureStreamInput): Promise<StreamRecord>;
  append(input: AppendEventsInput): Promise<AppendEventsResult>;

  ensureConsumer(input: EnsureConsumerInput): Promise<ConsumerRecord>;
  getConsumer(input: GetConsumerInput): Promise<ConsumerRecord | null>;
  read(input: ReadEventsInput): Promise<ReadEventsResult>;
  commit(input: CommitOffsetInput): Promise<CommitOffsetResult>;
  reset(input: ResetConsumerInput): Promise<ConsumerRecord>;

  getStreamStats(streamId: string): Promise<StreamStats>;
  getConsumerLag(input: GetConsumerLagInput): Promise<ConsumerLag>;
}

Suggested supporting types:

export interface StreamRecord {
  streamId: string;
  sourceId: string;
  streamKey: string;
  backend: string;
  createdAt: string;
}

export interface StreamEventRecord {
  offset: number;
  streamEventId: string;
  streamId: string;
  sourceId: string;
  sourceNativeId: string;
  eventVariant: string;
  occurredAt: string;
  metadata: Record<string, unknown>;
  rawPayload: Record<string, unknown>;
  deliveryHandle: Record<string, unknown> | null;
}

export interface ConsumerRecord {
  consumerId: string;
  streamId: string;
  subscriptionId: string;
  consumerKey: string;
  nextOffset: number;
  startPolicy: "latest" | "earliest" | "at_offset" | "at_time";
  createdAt: string;
  updatedAt: string;
}

export interface ReadEventsResult {
  consumer: ConsumerRecord;
  events: StreamEventRecord[];
  highWatermarkOffset: number;
}

Backend Rules

SQLite Backend

The first implementation should be embedded and local.

SQLite is enough because the current problem is durable local event retention, not high-throughput distributed messaging.

Tables

streams

One record per hosted source stream.

create table if not exists streams (
  stream_id text primary key,
  source_id text not null unique,
  stream_key text not null unique,
  backend text not null,
  created_at text not null
);

stream_events

Append-only normalized source journal.

create table if not exists stream_events (
  offset integer primary key autoincrement,
  stream_event_id text not null unique,
  stream_id text not null,
  source_id text not null,
  source_native_id text not null,
  event_variant text not null,
  occurred_at text not null,
  metadata_json text not null,
  raw_payload_json text not null,
  delivery_handle_json text,
  created_at text not null,
  unique(stream_id, source_native_id, event_variant)
);

create index if not exists idx_stream_events_stream_offset
  on stream_events(stream_id, offset);

create index if not exists idx_stream_events_stream_occurred_at
  on stream_events(stream_id, occurred_at);

Notes:

consumers

One record per subscription consumer.

create table if not exists consumers (
  consumer_id text primary key,
  stream_id text not null,
  subscription_id text not null unique,
  consumer_key text not null unique,
  start_policy text not null,
  start_offset integer,
  start_time text,
  next_offset integer not null,
  created_at text not null,
  updated_at text not null
);

create index if not exists idx_consumers_stream
  on consumers(stream_id);

Notes:

consumer_commits

Optional audit table for debugging and observability.

create table if not exists consumer_commits (
  commit_id text primary key,
  consumer_id text not null,
  stream_id text not null,
  committed_offset integer not null,
  committed_at text not null
);

create index if not exists idx_consumer_commits_consumer
  on consumer_commits(consumer_id, committed_offset);

This table is not required for correctness. It is useful for debugging, replay audits, and operator visibility.

Existing Tables That Stay

The following current tables still make sense:

The key change is:

Subscription Start Policy

New subscriptions need an explicit start policy.

Recommended options:

Recommended default:

This avoids accidental inbox floods while keeping replay explicit.

Event Consumption Logic

Source Ingress Logic

  1. source adapter receives or polls source-native events
  2. source adapter normalizes them into StreamEvent payloads
  3. source adapter calls backend.append()
  4. source adapter updates source ingress checkpoint

Important:

For GitHub, source ingress checkpoint still includes things such as:

But this checkpoint now means:

It no longer means:

Subscription Consumption Logic

  1. ensure backend consumer exists for the subscription
  2. call backend.read(streamId, consumerId, limit)
  3. for each returned StreamEvent:
    • evaluate subscription match rules
    • if matched, write one InboxItem
    • if matched, create one Activation
  4. after the batch is processed, call backend.commit() with the last consumed offset

Important:

Multiple Subscriptions On One Source

This is the main reason to add the backend.

Behavior:

This is the Kafka-like property the current design is missing.

No Subscription Case

When a source exists but there are no subscriptions:

This is better than the current temporary rule that skips source progress when there are zero subscriptions.

With the backend refactor, that workaround should be removed.

Inbox Ack Logic

Inbox ack remains separate:

This separation is deliberate.

It avoids coupling stream progress to runtime behavior.

Delivery Semantics

DeliveryHandle should continue to be captured on the StreamEvent and copied into InboxItem.

Reasons:

Backend Swappability

The abstraction should preserve these invariants across backends:

How this maps later:

AgentInbox should keep the same higher-level API even if the backend changes.

Migration Plan

Phase 1

Phase 2

Phase 3

Phase 4

Recommended First Refactor Slice

The smallest valuable refactor is:

  1. add stream_events
  2. add consumers
  3. move source append into stream_events
  4. introduce one subscription consumer loop for GitHub
  5. keep existing inbox API unchanged

This gets the data model right without forcing a full external-broker decision.