A story about making sketches on the way to larger work. Hobbyist control planes, avoiding premature Kafka, PostgreSQL’s LISTEN/NOTIFY, and Go’s sync.Cond.

Last week a fellow Lobste.r submitted a piece on PostgreSQL’s LISTEN/NOTIFY. So, here is a sketch for LISTEN/NOTIFY that I happened to have written a few weeks earlier, while working on a larger project. It allows multiple goroutines to consume NOTIFY events from a single database connection, because that felt like a decent thing to do.

The code, first:

github.com/hblanks/sketches/2020-01-22-pgnotifier

An explanation of how and why I did it, plus the inevitable metacommentary, follows.

The why and the how

The author is building a control plane. Not a “10K customers operating on 1K servers” sort of control plane. More like 100 hobbyists operating on 2 to 10.1

And what really is a control plane? For our purposes, it’s simply:

  1. An API that customers use when they want to change things,
  2. A means of persisting these changes (new VMs, new routing rules, new DNS records, whatever the control plane is about), and
  3. A means of pushing these changes to the right servers (or the “agents” running on those servers), where the changes can be applied.

Large-scale control planes involve many systems, including load balancers, distributed databases, and distributed message queues or pub/sub services. But a small-scale one would ideally involve as few systems as possible. So, here’s a small-scale design:

Figure 1: a garden variety control plane

Customers talk to the public API, that API persists changes to a database, and somehow agents (talking to their own agent API) get these changes and apply them.

In this system, the database is the queue. Not because that’s a great idea. But because it’s expedient, and it’s reasonable for this particular kind of flow, where it’s OK to only process items in order, and to stop entirely if we fail to process any one item.

How might you do this? A typical way is for the customer API to write changes both to the domain-specific tables (insert a new row into the DNS record table, etc.) and to an event log table in the same transaction. Then at any point your agents can connect to their own API and get all events that have been written since their last access.


CREATE TABLE log (
    id SERIAL PRIMARY KEY,
    date_created TIMESTAMP WITH TIME ZONE
        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    event_data JSONB
); -- for a larger system, also consider PARTITION BY RANGE (id)

INSERT INTO log (event_data) VALUES
  ('{"type": "sprinkler-valve", "id": 3, "action": "open"}');

postgres=# SELECT * FROM log;
-[ RECORD 1 ]+--------------------------------------------
id           | 1
date_created | 2020-02-11 06:11:04.002166+00
event_data   | {"id": 3, "type": "sprinkler-valve", "action": "open"}
Figure 2: garden variety log table

And indeed, if your control plane tolerates latency, this might be a great place to stop: just have your agents wake up every few minutes, ask for all events later than the ID they saw last, and then apply those events.
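
For concreteness, that polling read fits in a few lines of Go. (This is a sketch only: the function and type names are made up for illustration, and the db handle is assumed to come from database/sql.)

import (
    "context"
    "database/sql"
)

// Event is a placeholder for one decoded row of the log table.
type Event struct {
    ID   int64
    Data string // raw JSONB payload
}

// pollEvents returns every log entry after lastID, in order.
func pollEvents(ctx context.Context, db *sql.DB, lastID int64) ([]Event, error) {
    rows, err := db.QueryContext(ctx,
        `SELECT id, event_data FROM log WHERE id > $1 ORDER BY id`, lastID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var e Event
        if err := rows.Scan(&e.ID, &e.Data); err != nil {
            return nil, err
        }
        events = append(events, e)
    }
    return events, rows.Err()
}

Each agent only has to remember the last id it applied and hand it back on the next wake-up.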

But, old habits die hard. The author feels like even a hobbyist control plane should apply changes within a few hundred milliseconds. That the world would call him a real ham and egger if he built anything with more than 5 seconds’ latency, and that even if that meant running a Kafka cluster again (and being woken up by it regularly in the middle of the night…it was a chronically underprovisioned one, that), well, that would just be the price he’d have to pay.

So, in order to still avoid Kafka, and no doubt also to satisfy that magpie desire of finally using yet another PostgreSQL feature, we come to LISTEN/NOTIFY. The author imagines the system2:

  1. Customers submit changes to the public API.
  2. In a single transaction, the public API updates all domain-specific tables, inserts one or more new events into the event log table, and executes a NOTIFY event, '${EVENT_ID}' command with the id of the newly inserted event (a sketch of this write path follows the list).
  3. Meanwhile, agents have connected to a streaming event resource in the agent API. The handlers for this resource first catch each agent up on all events after the latest one it has read, and then they wait.
  4. The agent API, which has a single database connection dedicated for LISTEN event, receives the NOTIFY event of the transaction above, and it announces the new ID to all the streaming request handlers (goroutines, really) above.
  5. Without stampeding the database, all the streaming event request handlers get the relevant new events and send them to their respective agents.
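
A hedged sketch of step (2), the public API’s write path, might look like the following. (The function name is invented, the domain-specific writes are elided, and pg_notify() stands in for NOTIFY because NOTIFY itself doesn’t accept bind parameters.)

import (
    "context"
    "database/sql"
)

// appendEvent is illustrative, not the project's code: it writes one
// event inside a transaction and notifies the "event" channel with the
// new row's id. PostgreSQL delivers the notification only at commit.
func appendEvent(ctx context.Context, db *sql.DB, eventData string) (int64, error) {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return 0, err
    }
    defer tx.Rollback() // no-op once Commit succeeds

    // ... domain-specific INSERTs and UPDATEs go here, in the same transaction ...

    var id int64
    if err := tx.QueryRowContext(ctx,
        `INSERT INTO log (event_data) VALUES ($1) RETURNING id`,
        eventData).Scan(&id); err != nil {
        return 0, err
    }

    if _, err := tx.ExecContext(ctx,
        `SELECT pg_notify('event', $1::text)`, id); err != nil {
        return 0, err
    }
    return id, tx.Commit()
}

Because NOTIFY is transactional, agents never hear about an id that failed to commit.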

The sketch

It can take a few minutes to imagine, but it can take hours to build – and you may even build the wrong thing!

Thus the author decides, even though he has other parts of the system already in place, to start with a sketch. If it works, the code will greatly inform what comes next. And if it doesn’t, it will at least have taken less time than writing the whole system.

The sketch will have only four things:

  1. An empty database.
  2. A subroutine for sending NOTIFY events.
  3. A subroutine for receiving these events via LISTEN.
  4. Multiple subroutines that make use of these received events.

And as one would hope with a sketch, all but the last of these four things have easy, ready examples on hand. It won’t take more than 20 minutes to do items (1) to (3), after which the real learning and work can start in (4).

The core of it – what’s in the sketch, which is only the white box in the figure below – turns out to be:

Figure 3: Early Agent API, and the sketch therein
  1. A ring buffer for storing recent events, including a sync.Cond:

    type RingBuffer struct {
        rw     *sync.RWMutex
        cond   *sync.Cond
        buffer []*Item
        size   int64 // Length of buffer
        start  int64 // Start offset
        end    int64 // End offset
    }
    
  2. A goroutine, WriteItems(), for receiving *pq.Notification values and writing them to the ring buffer:

    case v := <-l.Notify:
        i, err := strconv.ParseInt(v.Extra, 10, 64)
        if err != nil {
            return nil, err
        }
        // Updates ringbuffer, unlocks, then
        // calls rb.cond.Broadcast()
        rb.Write(&ringbuffer.Item{i})
    
  3. A function, GetUpdates(), that waits for new items written to the ring buffer, then returns them:

    // Reads from ringbuffer,
    // usually after calling 
    // rb.cond.Wait()
    start, items, err = rb.Read(ctx, start)
    

The sketch has bugs. It fails to illustrate important cases found later. And it’s no replacement for tests. But it teaches something about what needs to be built next. And that something, at least sometimes, is useful.
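
For readers who would rather not click through, here is a compressed rendering of the sync.Cond pattern the sketch uses. It follows the field and function names in the snippets above, but the bodies are reconstructions under stated assumptions (a fixed-size, overwrite-on-wrap buffer; readers identified only by an offset), not a copy of the linked code.

import (
    "context"
    "sync"
)

type Item struct {
    ID int64
}

type RingBuffer struct {
    rw     *sync.RWMutex
    cond   *sync.Cond
    buffer []*Item
    size   int64 // Length of buffer
    start  int64 // Start offset
    end    int64 // End offset
}

func NewRingBuffer(size int64) *RingBuffer {
    rw := &sync.RWMutex{}
    return &RingBuffer{
        rw:     rw,
        cond:   sync.NewCond(rw.RLocker()), // readers wait on the read lock
        buffer: make([]*Item, size),
        size:   size,
    }
}

// Write appends an item under the write lock, then wakes every waiting reader.
func (rb *RingBuffer) Write(item *Item) {
    rb.rw.Lock()
    rb.buffer[rb.end%rb.size] = item
    rb.end++
    if rb.end-rb.start > rb.size {
        rb.start = rb.end - rb.size // the oldest item was just overwritten
    }
    rb.rw.Unlock()
    rb.cond.Broadcast()
}

// Read blocks until there are items past `from`, then returns them and
// the offset to resume from next time.
func (rb *RingBuffer) Read(ctx context.Context, from int64) (int64, []*Item, error) {
    rb.rw.RLock()
    defer rb.rw.RUnlock()
    for from >= rb.end {
        // A cancelled ctx is only noticed after the next Broadcast; a fuller
        // version would Broadcast from a goroutine watching ctx.Done().
        if err := ctx.Err(); err != nil {
            return from, nil, err
        }
        rb.cond.Wait() // releases the read lock while waiting
    }
    if from < rb.start {
        from = rb.start // this reader fell behind; skip overwritten items
    }
    items := make([]*Item, 0, rb.end-from)
    for i := from; i < rb.end; i++ {
        items = append(items, rb.buffer[i%rb.size])
    }
    return rb.end, items, nil
}

The appeal of pairing the Cond with the RWMutex’s read locker is that any number of streaming handlers can sit in Wait() at once, and a single Broadcast() from the LISTEN goroutine wakes them all, with no per-handler channel bookkeeping.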

Ends in metacommentary

This post, hacked out over a few evenings at the Market Street house in Oakland, had two points:

  1. to offer a few potentially useful examples (how to use LISTEN/NOTIFY from Go, and how to dispatch information to a set of coming-and-going goroutines without creating and managing a lot of channels – for both of these, please see the code), and
  2. to remark or reiterate that often, the best way to try something out is not within your codebase, but without it.

About the latter: it’s common to see a painter draw or paint multiple studies before starting on a larger work, just as it’s common enough for engineers to sketch out ideas “on the back of a napkin” before testing them in a more rigorous fashion.

You as a programmer may find yourself doing the same, sometimes in a single file, sometimes in more than a few. These sketches may take more time than you want to spend, but you get faster at them the more you do. And there’s rarely another way to learn what you need to learn.

So, don’t be afraid to make a sketch, even though it usually means throwing it away. “You will, anyhow,” as the old man says.3


  1. Why is the author building this? Well, that’s just this story’s MacGuffin, isn’t it?
  2. Astute readers will note that, if this system really is just for 2 to 10 agents, it would work just as well for every request handler in the agent API to both (1) run its own LISTEN (using its own database connection) and (2) to SELECT new rows from the log table (using another, and probably pooled, database connection). Because it’s all going to come out of the page cache anyway, and what are a few extra connections and queries at such a small scale? So, even the sketch is overengineered.

  3. “Chemical engineers learned long ago that a process that works in the laboratory cannot be implemented in a factory in only one step. An intermediate step called the pilot plant is necessary to give experience in scaling quantities up and in operating in nonprotective environments. For example, a laboratory process for desalting water will be tested in a pilot plant of 10,000 gallon/day capacity before being used for a 2,000,000 gallon/day community water system.

    “Programming system builders have also been exposed to this lesson, but it seems to have not yet been learned… . In most projects, the first system built is barely usable. It may be too slow, too big, awkward to use, or all three. There is no alternative but to start again, smarting but smarter, and build a redesigned version in which these problems are solved… .

    “Hence plan to throw one away; you will, anyhow.”

    (Brooks, Frederick P. The Mythical Man-Month: Essays on Software Engineering, Anniversary Edition. Boston: Addison-Wesley, 1995. P. 116.)