
Postgres Agent Orchestrator: Coordinating AI Agents with pgmq, LISTEN/NOTIFY, and ltree

Source: github.com/KellerKev/Postgres-Agent-Orchestrator


The Problem

Every AI agent framework ships with the same architectural diagram: a message broker for task queues, a vector database for memory, a pub/sub system for coordination, and a graph database for lineage. That’s four external systems before you’ve written a single line of agent logic.

Most teams already run PostgreSQL. What if the database they already have could do all four jobs?

The Idea

Postgres Agent Orchestrator is a ~200-line Python demo that coordinates two AI agents using only PostgreSQL native features:

  • pgmq replaces Redis/SQS for task queuing
  • JSONB columns replace vector databases for agent memory
  • LISTEN/NOTIFY replaces Kafka/pub-sub for event-driven coordination
  • ltree replaces graph databases for hierarchical lineage tracking

No external message brokers. No additional databases. One PostgreSQL instance serves as the message queue, memory store, event system, and audit trail.

Architecture

┌──────────────────────────────────────────────────────────┐
│                    PostgreSQL 18                         │
│                                                          │
│  ┌──────────┐   ┌──────────────┐   ┌────────────────┐   │
│  │   pgmq   │   │ agent_memory │   │ LISTEN/NOTIFY  │   │
│  │ (queue)  │   │   (JSONB +   │   │  (event-driven │   │
│  │          │   │    ltree)    │   │   wakeup)      │   │
│  └────┬─────┘   └──────┬───────┘   └───────┬────────┘   │
│       │                │                    │            │
└───────┼────────────────┼────────────────────┼────────────┘
        │                │                    │
   ┌────▼────┐      ┌────▼────┐          ┌────▼─────┐
   │ Fetcher │─────▶│  INSERT │─trigger─▶│Summarizer│
   │  Agent  │      │ (done)  │          │  Agent   │
   └─────────┘      └─────────┘          └──────────┘

Two agents run concurrently in a single Python process:

  1. Fetcher Agent — polls pgmq for tasks, fetches Hacker News headlines via API, asks the LLM to summarize them, writes results to agent_memory
  2. Summarizer Agent — listens for PostgreSQL notifications, wakes when the fetcher finishes, rewrites the technical summary as an executive briefing with parent-child lineage tracking
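In sketch form, the single-process setup is just two coroutines sharing one event loop. The bodies below are placeholders for the real agents (which hold their own database connections); only the concurrency shape is shown:

```python
import asyncio

async def run_fetcher() -> str:
    # placeholder: the real agent polls pgmq, calls the LLM,
    # and writes results to agent_memory
    return "fetcher done"

async def run_summarizer() -> str:
    # placeholder: the real agent LISTENs on 'agent_wakeup'
    # and rewrites summaries as executive briefings
    return "summarizer done"

async def main() -> list:
    # both agents run concurrently in a single Python process
    return await asyncio.gather(run_fetcher(), run_summarizer())

results = asyncio.run(main())
```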

PostgreSQL as the Framework

Task Queuing with pgmq

The fetcher agent polls a tasks queue using pgmq, a lightweight message queue built as a PostgreSQL extension. Messages have visibility timeouts — if an agent crashes mid-processing, the message reappears automatically:

# Read one message with 30s visibility timeout
row = await conn.fetchrow("SELECT * FROM pgmq.read('tasks', 30, 1)")

# Process the task...

# Acknowledge on success
await conn.execute("SELECT pgmq.delete('tasks', $1::bigint)", msg_id)

No Redis. No SQS. No consumer groups to manage. The queue is a table with transactional guarantees.
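The read/acknowledge pair typically sits inside a polling loop. A minimal sketch of one iteration, with the asyncpg connection methods passed in as callables so the control flow is visible (`poll_once` and its parameters are illustrative, not part of the repo):

```python
import asyncio
import json

# SQL used by the loop: pgmq.read pops a message with a visibility
# timeout; pgmq.delete acknowledges it after successful processing
READ_SQL = "SELECT * FROM pgmq.read('tasks', 30, 1)"
DELETE_SQL = "SELECT pgmq.delete('tasks', $1::bigint)"

async def poll_once(fetchrow, execute, handle) -> bool:
    """One polling iteration. `fetchrow`/`execute` stand in for the
    asyncpg connection methods; `handle` is the agent's task logic."""
    row = await fetchrow(READ_SQL)
    if row is None:
        return False  # queue empty; the caller sleeps and retries
    await handle(json.loads(row["message"]))
    # delete only after success: on a crash the visibility timeout
    # expires and the message reappears for another worker
    await execute(DELETE_SQL, row["msg_id"])
    return True
```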

Event-Driven Coordination with LISTEN/NOTIFY

The summarizer doesn’t poll. It sleeps until PostgreSQL wakes it up. A trigger on the agent_memory table fires pg_notify whenever an agent completes work:

CREATE OR REPLACE FUNCTION notify_agent_wakeup()
RETURNS trigger AS $$
BEGIN
  IF NEW.status = 'done' THEN
    PERFORM pg_notify('agent_wakeup', row_to_json(NEW)::text);
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER agent_memory_notify
AFTER INSERT OR UPDATE ON agent_memory
FOR EACH ROW EXECUTE FUNCTION notify_agent_wakeup();

On the Python side, the summarizer registers a listener and processes events as they arrive:

async def run_summarizer() -> None:
    conn = await get_connection()
    await conn.add_listener("agent_wakeup", on_agent_wakeup)
    # Keep alive — all work happens in the callback
    while True:
        await asyncio.sleep(1)

No Kafka. No Redis pub/sub. No webhook endpoints. The database is the event bus.
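The callback itself can stay small. asyncpg invokes listeners with `(connection, pid, channel, payload)`, and here the payload is the row JSON produced by `row_to_json` in the trigger. A sketch of what the callback might do (`parse_wakeup` is a hypothetical helper; the field names follow the `agent_memory` schema):

```python
import json

def parse_wakeup(payload: str):
    """Decode the pg_notify payload (the full row as JSON) and
    return it only when the row represents completed work."""
    row = json.loads(payload)
    return row if row.get("status") == "done" else None

def on_agent_wakeup(conn, pid, channel, payload) -> None:
    # asyncpg listener signature: (connection, pid, channel, payload)
    row = parse_wakeup(payload)
    if row is not None:
        # the summarizer's real work would be kicked off here
        print(f"woken by {row.get('agent_id')} for task {row.get('task_id')}")
```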

Agent Memory with JSONB

Every agent interaction is stored in a single table with structured and unstructured data side by side:

CREATE TABLE agent_memory (
  id          SERIAL PRIMARY KEY,
  agent_id    TEXT NOT NULL,
  task_id     TEXT NOT NULL,
  parent_id   TEXT,
  lineage     ltree,
  input       JSONB,
  output      JSONB,
  status      TEXT DEFAULT 'pending',
  created_at  TIMESTAMPTZ DEFAULT now()
);

The input and output columns hold arbitrary JSON — different agents store different shapes. The fetcher stores headlines and summaries; the summarizer stores executive briefings. No schema migrations needed when you add a new agent type.
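With asyncpg, JSONB parameters are typically passed as serialized text and cast in SQL. A sketch of what a memory write might look like (`memory_params` and the exact column choices are illustrative, based on the schema above):

```python
import json

# illustrative INSERT against the agent_memory schema
INSERT_SQL = """
INSERT INTO agent_memory (agent_id, task_id, lineage, input, output, status)
VALUES ($1, $2, $3::ltree, $4::jsonb, $5::jsonb, 'done')
"""

def memory_params(agent_id: str, task_id: str, lineage: str,
                  inp: dict, out: dict) -> tuple:
    """Serialize the flexible JSON fields; each agent can store a
    different shape without any schema change."""
    return (agent_id, task_id, lineage, json.dumps(inp), json.dumps(out))

# usage with asyncpg (sketch):
# await conn.execute(INSERT_SQL, *memory_params(
#     "fetcher", "1", "fetcher",
#     {"topic": "postgresql internals"},
#     {"summary": "..."}))
```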

Lineage Tracking with ltree

The ltree extension gives PostgreSQL native support for hierarchical data. Each agent writes its position in the processing chain:

  • Fetcher writes lineage = 'fetcher'
  • Summarizer writes lineage = 'fetcher.summarizer' and records the fetcher’s row as parent_id

This enables powerful ancestry queries:

-- Find all work descended from the fetcher
SELECT * FROM agent_memory WHERE lineage <@ 'fetcher';

-- Find the full chain for a specific task
SELECT agent_id, lineage, output, created_at
FROM agent_memory
WHERE task_id = '1'
ORDER BY lineage;

For debugging and audit trails, you can reconstruct the complete processing history of any task with a single query.
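One practical detail when building paths in application code: ltree labels are restricted to letters, digits, and underscores, so agent ids should be sanitized before they become path segments. A small hypothetical helper:

```python
import re

def child_lineage(parent: str, agent_id: str) -> str:
    """Extend an ltree path by one label. ltree labels allow only
    letters, digits, and underscores, so anything else is replaced."""
    label = re.sub(r"[^A-Za-z0-9_]", "_", agent_id)
    return f"{parent}.{label}" if parent else label
```

For example, `child_lineage("fetcher", "summarizer")` yields the `fetcher.summarizer` path the summarizer writes.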

The Agent Flow

Here’s what happens when the system processes a task:

  1. Three tasks are pre-seeded in the pgmq queue: artificial intelligence, PostgreSQL internals, data sovereignty in Europe
  2. The fetcher picks up a task, hits the Hacker News Algolia API for relevant headlines
  3. The fetcher calls Ollama (local LLM) to generate a 2-sentence summary from those headlines
  4. The fetcher writes the result to agent_memory with status = 'done'
  5. The INSERT trigger fires pg_notify('agent_wakeup', ...) with the full row as JSON
  6. The summarizer wakes up, reads the technical summary, and asks the LLM to rewrite it as a one-sentence executive briefing
  7. The summarizer writes its result with lineage = 'fetcher.summarizer' and a parent_id pointing back to the fetcher’s row

The entire coordination happens through the database. No HTTP calls between agents, no shared memory, no coordination service.

Why This Matters

The pattern demonstrated here isn’t just a toy. Consider what PostgreSQL gives you for free:

  • Transactional safety — if an agent crashes, uncommitted work rolls back. No orphaned messages, no duplicate processing.
  • Visibility timeouts — pgmq messages reappear after a configurable timeout if not acknowledged. Built-in retry with no application code.
  • Indexable lineage — GiST indexes on ltree columns make ancestor/descendant queries fast even at scale.
  • Schema flexibility — JSONB columns accept any structure. Add a new agent type without migrations.
  • Observability for free — agent_memory is a regular table. Query it with SQL. No need for a separate monitoring system.

For the vast majority of agent workloads — especially those with a handful of agents processing tasks at human-interaction speeds — your database is the framework. Additional infrastructure only becomes necessary when you need sub-millisecond latency or thousands of concurrent producers.

Getting Started

Prerequisites:

  • Pixi for environment management
  • Ollama with a model pulled (e.g., ollama pull qwen3:8b)

git clone https://github.com/KellerKev/Postgres-Agent-Orchestrator.git
cd Postgres-Agent-Orchestrator

# Initialize PostgreSQL, install pgmq, create schema, seed tasks
pixi run setup

# Run both agents
pixi run run

# Query the results
pixi run query

The query command shows the full agent memory table — you’ll see both fetcher summaries and summarizer briefings with their ltree lineage.

What This Isn’t

This is a demonstration, not a production agent framework. There’s no authentication, no multi-tenant isolation, no horizontal scaling. But the patterns it demonstrates — pgmq for queuing, LISTEN/NOTIFY for coordination, ltree for lineage, JSONB for flexible memory — are the same patterns you’d use in a production system. The difference is configuration and access control, not architecture.

If you’re reaching for Redis, Kafka, and a vector database before you’ve tried what PostgreSQL can do natively, you might be solving problems you don’t have yet.

Kevin Keller
Author
Personal blog about AI, Observability & Data Sovereignty. Snowflake-related articles explore the art of the possible and are not official Snowflake solutions or endorsed by Snowflake unless explicitly stated. Opinions are my own. Content is meant as educational inspiration, not production guidance.