a micro event bus for embedded go daemons

when you’re writing a control daemon for embedded hardware, coupling sneaks up on you fast.

it starts innocently: the NFC reader detects a tag, so it calls into the session manager. the session manager changes state, so it calls the LED manager to update the status light. the LED manager needs to know about errors, so it imports the error handler. before long you have a dependency graph that looks like a bowl of spaghetti, and adding a new component means touching half the codebase.

i ran into this while building thalassys-mini — a Go daemon running on a Raspberry Pi 4 that controls a marine water sampling device. the hardware has an NFC reader for operator authentication, a set of status LEDs, a sampling pump, and a watchdog. all of these need to react to each other’s state changes, but none of them should know about each other directly.

the solution was a small event bus. not a library, not a framework — about 40 lines of Go.

the problem with direct calls

consider the naive approach. the NFC reader finishes validating a tag and calls sessionManager.Start(tagID). the session manager calls ledManager.SetState(StateActive). the pump controller listens to the session manager via an interface it imports. each component has a concrete dependency on at least two others.

this works fine until you need to add a new component — say, a telemetry reporter that logs every state transition. now you’re back in every existing component adding a call to telemetry.Record(...). you’re also making testing harder: to test the NFC reader in isolation you need to mock the session manager, which means an interface, which means boilerplate.

the core issue is that publishers shouldn’t need to know who’s listening.

the bus

go
package bus

import "context"

type Event struct {
    Topic   string
    Payload any
}

type Handler func(Event)

type Bus struct {
    ch   chan Event
    subs map[string][]Handler
}

func New() *Bus {
    return &Bus{
        ch:   make(chan Event, 64),
        subs: make(map[string][]Handler),
    }
}

func (b *Bus) Subscribe(topic string, h Handler) {
    b.subs[topic] = append(b.subs[topic], h)
}

func (b *Bus) Publish(topic string, payload any) {
    b.ch <- Event{Topic: topic, Payload: payload}
}

func (b *Bus) Run(ctx context.Context) {
    for {
        select {
        case e := <-b.ch:
            for _, h := range b.subs[e.Topic] {
                h(e)
            }
        case <-ctx.Done():
            return
        }
    }
}

that’s the whole thing. a few decisions worth explaining.

buffered channel

the channel has capacity 64, so as long as there's room in the buffer, Publish doesn't block: the caller sends the event and moves on immediately. on embedded hardware this matters: the NFC interrupt handler shouldn't stall waiting for the LED manager to finish its I2C write.

if the buffer does fill up, Publish blocks. on a device that has accumulated 64 unprocessed events, something has already gone wrong upstream, so i treat a full buffer as a bug to fix rather than something to handle gracefully.
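if blocking is unacceptable even in that failure mode, a non-blocking variant is a three-line change. a sketch (TryPublish is my name, not part of the real package):

```go
// TryPublish is a hypothetical non-blocking variant of Publish: instead
// of blocking on a full buffer, it reports whether the event was
// enqueued and lets the caller decide what to do with a dropped event.
func (b *Bus) TryPublish(topic string, payload any) bool {
	select {
	case b.ch <- Event{Topic: topic, Payload: payload}:
		return true
	default:
		return false // buffer full; drop, log, or retry at the call site
	}
}
```

the trade-off: a dropped event fails silently unless every caller checks the return value, which is one reason to prefer the blocking version plus a generous buffer.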

sequential dispatch

handlers are called one after another in the Run loop, not in parallel. this is intentional. parallel dispatch would require every handler to be concurrency-safe, and for a daemon this small the coordination overhead isn't worth it. if a handler does something genuinely slow, say a blocking I2C write, it spawns its own goroutine internally.
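that last pattern, sketched with made-up names (PumpController and startMotor aren't from the real codebase), written unqualified as if it sat next to the bus types:

```go
// a slow subscriber offloads its blocking work so the dispatch loop can
// move straight on to the next handler. the started channel exists only
// to make this sketch observable; real code would just do the I/O.
type PumpController struct {
	started chan struct{}
}

func (p *PumpController) OnSessionStart(e Event) {
	go func() {
		p.startMotor() // stand-in for a blocking GPIO/I2C transaction
	}()
}

func (p *PumpController) startMotor() {
	close(p.started)
}
```

the handler itself returns in microseconds; only the goroutine it spawns pays the hardware cost.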

context-based lifecycle

Run takes a context.Context and exits cleanly when it’s cancelled. this integrates naturally with the rest of the daemon, which uses a top-level context derived from os.Signal for graceful shutdown:

go
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

b := bus.New()
go b.Run(ctx)

wiring it up

in main, everything subscribes before the dispatch loop starts (and before anything publishes). Subscribe isn't protected by a mutex, so all wiring has to happen before go b.Run(ctx):

go
b := bus.New()

// LED manager reacts to session and error events
b.Subscribe("session.start", leds.OnSessionStart)
b.Subscribe("session.end",   leds.OnSessionEnd)
b.Subscribe("error",         leds.OnError)

// watchdog reacts to errors
b.Subscribe("error", watchdog.OnError)

// telemetry subscribes to everything it cares about
b.Subscribe("session.start", telemetry.OnSessionStart)
b.Subscribe("session.end",   telemetry.OnSessionEnd)
b.Subscribe("sample.done",   telemetry.OnSampleDone)

go b.Run(ctx)

the NFC reader holds a reference to the bus and publishes when it validates a tag. it has no idea that the LED manager, watchdog, and telemetry reporter are all listening:

go
type NFCReader struct {
    bus *bus.Bus
    // ... hardware handles
}

func (n *NFCReader) onTagRead(tagID string) {
    if err := n.validate(tagID); err != nil {
        n.bus.Publish("error", err)
        return
    }
    n.bus.Publish("session.start", tagID)
}

adding the telemetry reporter required zero changes to the NFC reader, the session manager, or the LED manager. subscribe and done.

typed payloads

using any for the payload means handlers need a type assertion. i keep a central events package that defines concrete payload types and topic name constants:

go
package events

import "time"

const (
    TopicSessionStart = "session.start"
    TopicSessionEnd   = "session.end"
    TopicSampleDone   = "sample.done"
    TopicError        = "error"
)

type SessionStartPayload struct {
    TagID     string
    Timestamp time.Time
}

type SampleDonePayload struct {
    SampleID string
    Volume   float64
    Duration time.Duration
}

handlers then assert the type and handle mismatches explicitly:

go
func (l *LEDManager) OnSessionStart(e bus.Event) {
    p, ok := e.Payload.(events.SessionStartPayload)
    if !ok {
        return
    }
    l.setState(StateActive, p.TagID)
}

it’s a bit more verbose than a generic typed bus would be, but it keeps the bus itself dependency-free. i considered using generics for the bus but the dispatch loop becomes awkward when you need a single channel that carries multiple payload types — chan Event[any] doesn’t buy you much over chan Event.
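one middle ground, sketched here but not in the real package: a generic free function that hides the assertion without changing the Bus type or the dispatch loop (SubscribeTyped is my name):

```go
// SubscribeTyped is a hypothetical generic wrapper: handlers get a
// concrete payload type, and events whose payload isn't a T are
// silently dropped. the Bus type and Run loop stay exactly as they are.
func SubscribeTyped[T any](b *Bus, topic string, h func(T)) {
	b.Subscribe(topic, func(e Event) {
		if p, ok := e.Payload.(T); ok {
			h(p)
		}
	})
}
```

silent dropping cuts both ways, though: the explicit ok check in each handler at least gives you an obvious place to log a mismatched payload.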

testing

the bus makes testing straightforward. to test that the LED manager responds correctly to a session start event, you don’t need a real NFC reader — you just publish directly:

go
func TestLEDOnSessionStart(t *testing.T) {
    b := bus.New()
    leds := NewLEDManager(b, fakei2c.New())
    b.Subscribe(events.TopicSessionStart, leds.OnSessionStart)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go b.Run(ctx)

    b.Publish(events.TopicSessionStart, events.SessionStartPayload{
        TagID:     "test-tag",
        Timestamp: time.Now(),
    })

    time.Sleep(10 * time.Millisecond) // let the bus drain
    assert.Equal(t, StateActive, leds.State())
}

the time.Sleep is inelegant. a cleaner approach is to add a Drain method that blocks until the channel is empty, or to make handlers signal completion via a channel in tests. for now the sleep is fine — the bus drains in microseconds on any real hardware.
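the completion-signal version relies on a property the bus already has: handlers for a topic run sequentially, in subscription order, so a test-only handler subscribed after the real one fires only once the real one has returned. a sketch of the helper (waitDispatched is my name, written unqualified as if it lived next to the bus types; like everything else, it must be called before Run starts):

```go
// waitDispatched subscribes a test-only handler on topic and returns a
// channel that closes once that handler runs. subscribe it after the
// handler under test: handlers run in subscription order, so the close
// happens only after the real handler has returned.
func waitDispatched(b *Bus, topic string) <-chan struct{} {
	done := make(chan struct{})
	b.Subscribe(topic, func(Event) { close(done) })
	return done
}
```

in the test above, the sleep then becomes a select on the returned channel, with a timeout case calling t.Fatal.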

limitations

this design has two meaningful constraints worth knowing upfront.

no delivery guarantees. if the daemon crashes between Publish and dispatch, the event is lost. for thalassys-mini this is acceptable — the hardware can recover state from its own sensors on restart. if you need guaranteed delivery, you need a persistent queue, which is a different problem entirely.

single process only. the bus is in-memory. if you ever need to fan events out to a second process or a remote service, you’ll need something else — MQTT, NATS, whatever fits. on an embedded device with one daemon this constraint is fine; it’s just worth being explicit about.


the full source for thalassys-mini isn’t public yet, but the bus package itself is self-contained. if you’re building something similar — a Pi-based controller, a hardware daemon, anything with multiple components that need loose coupling — 40 lines is all you need to start.