
Command Queues, Logging, and Replayable Systems

What This Concept Is

The same Command object that lets you undo an action also lets you:

  • queue it, to run later or asynchronously (job/task queues)
  • log it, for audit or debugging
  • serialize it, to persist or send across a network
  • replay it, from a log, to reproduce state (event-sourced systems)

The common thread: a Command is a first-class record of intent. Anything you can do with records, you can now do with actions.
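To make "first-class record of intent" concrete, here is a minimal sketch of a command that is pure data and survives a JSON round-trip. The `id` field and the `to_json`/`from_json` helpers are illustrative assumptions, not a standard API.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field

@dataclass
class Command:
    # Hypothetical shape: a handler name, a plain-data payload, and a unique ID.
    name: str
    payload: dict
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_json(self) -> str:
        # Only plain data goes in, so the result can be queued, logged, or sent.
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(text: str) -> "Command":
        return Command(**json.loads(text))

cmd = Command("credit", {"acct": "A", "amt": 100})
wire = cmd.to_json()                # could be appended to a log or published to a broker
restored = Command.from_json(wire)  # a worker elsewhere rebuilds the same intent
print(restored == cmd)  # True
```

Because the record is plain data, queueing, logging, and replay all reduce to storing and reading back this string.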

Why It Matters Here

This is the bridge from "neat pattern in a UI" to "production systems built on it":

  • background job queues (Sidekiq, Celery, RabbitMQ consumers) are command queues
  • CQRS separates commands (writes) from queries (reads), and event sourcing keeps the log of events those commands produce as the durable record of the system's state
  • audit logs in banking and healthcare are often serialized commands stored together with their results

You do not need to adopt CQRS to benefit. Even an in-process queue with four workers is the same pattern, smaller.
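As a minimal sketch of that smaller version, here is one in-process queue drained by four worker threads. The `(account, amount)` command tuple and the `apply`/`worker` names are illustrative assumptions.

```python
import threading
from queue import Queue

# Hypothetical command shape: (account, amount) tuples; None is a stop sentinel.
balances: dict[str, int] = {}
balances_lock = threading.Lock()
jobs: Queue = Queue()

def apply(cmd: tuple[str, int]) -> None:
    acct, amt = cmd
    with balances_lock:  # four workers share state, so each update is serialized
        balances[acct] = balances.get(acct, 0) + amt

def worker() -> None:
    while True:
        cmd = jobs.get()
        if cmd is None:  # sentinel: this worker should stop
            jobs.task_done()
            return
        try:
            apply(cmd)
        finally:
            jobs.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for _ in range(100):
    jobs.put(("A", 1))
for _ in threads:
    jobs.put(None)  # one sentinel per worker, queued after all real commands
for t in threads:
    t.join()
print(balances)  # {'A': 100}
```

The lock is the main difference from a single-worker queue: without it, concurrent read-modify-write updates could lose increments.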

Concrete Example

A tiny job queue in Python with logging and replay.

```python
from dataclasses import dataclass, field
from queue import Queue
from threading import Thread
import json, time

@dataclass
class Command:
    name: str
    payload: dict

    def execute(self, world: dict) -> None:
        # look up the handler by name at execution time
        handlers[self.name](world, self.payload)

# domain handlers; commands stay thin
handlers = {
    "credit": lambda w, p: w.__setitem__(p["acct"], w.get(p["acct"], 0) + p["amt"]),
    "debit": lambda w, p: w.__setitem__(p["acct"], w.get(p["acct"], 0) - p["amt"]),
}

@dataclass
class CommandBus:
    q: Queue = field(default_factory=Queue)
    log: list = field(default_factory=list)
    world: dict = field(default_factory=dict)

    def submit(self, cmd: Command):
        self.q.put(cmd)

    def worker(self):
        while True:
            cmd = self.q.get()
            if cmd is None:  # sentinel: stop the worker
                return
            # log on accept, before executing
            self.log.append((time.time(), cmd.name, cmd.payload))
            try:
                cmd.execute(self.world)
            except Exception as e:
                print(f"failed {cmd.name}: {e}")

bus = CommandBus()
t = Thread(target=bus.worker, daemon=True)
t.start()
bus.submit(Command("credit", {"acct": "A", "amt": 100}))
bus.submit(Command("credit", {"acct": "B", "amt": 30}))
bus.submit(Command("debit", {"acct": "A", "amt": 40}))
bus.q.put(None)  # queued after the three commands, so they all run first
t.join()
print(bus.world)  # {'A': 60, 'B': 30}

# Replay: rebuild state from the log alone. The JSON round-trip shows
# the log is plain data that could be written to disk and read back.
saved = json.dumps(bus.log)
replayed = {}
for _, name, payload in json.loads(saved):
    Command(name, payload).execute(replayed)
print(replayed)  # {'A': 60, 'B': 30}, same as bus.world
```

The log is the history of intent. Replay reconstructs state. That is the seed of event sourcing.
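One consequence worth seeing: because each log entry carries a timestamp, you can replay only a prefix of the log to reconstruct state as of an earlier moment. This is a minimal sketch assuming entries shaped like the example's `(timestamp, name, payload)` tuples; the timestamps below are made-up sample values.

```python
def apply(state: dict, name: str, payload: dict) -> None:
    # Same credit/debit semantics as the example's handlers.
    sign = {"credit": 1, "debit": -1}[name]
    state[payload["acct"]] = state.get(payload["acct"], 0) + sign * payload["amt"]

def replay(log, until: float = float("inf")) -> dict:
    """Rebuild state from log entries with timestamp <= until."""
    state: dict = {}
    for ts, name, payload in log:
        if ts > until:
            break  # assumes an append-only log in timestamp order
        apply(state, name, payload)
    return state

# Sample log with illustrative timestamps.
log = [
    (1.0, "credit", {"acct": "A", "amt": 100}),
    (2.0, "credit", {"acct": "B", "amt": 30}),
    (3.0, "debit", {"acct": "A", "amt": 40}),
]
print(replay(log, until=2.0))  # {'A': 100, 'B': 30}: state before the debit
print(replay(log))             # {'A': 60, 'B': 30}: current state
```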

Common Confusion / Misconception

  • "Logging every command is cheap." It is not: logs consume disk space and storage budget, and they raise privacy concerns. Do not log payloads that contain secrets without filtering them first; treat the command log like any other log that may carry PII.
  • Commands that embed non-serializable references. A Command that holds a live service object cannot be persisted. For queued commands, payloads should be pure data; the worker resolves services at execution time.
  • Assuming queued commands live only in memory. A queue that must survive a restart has to persist pending commands, and because a command may then be delivered more than once, workers must be idempotent.
  • Replaying commands with external side effects. If a command calls a payment gateway, replaying it charges the customer again. Split "decide + record" (safe to replay) from "apply external effect" (must not be replayed).
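The first two points can be enforced at the logging boundary. This minimal sketch masks secret fields before a payload is logged and relies on `json.dumps` raising `TypeError` when a payload holds a live object instead of plain data. `SECRET_KEYS`, `redact`, and `log_entry` are hypothetical names chosen for illustration.

```python
import json

# Hypothetical set of payload keys that must never reach the command log.
SECRET_KEYS = {"password", "card_number", "api_token"}

def redact(value):
    """Return a copy of value with secret keys masked, recursing into dicts and lists."""
    if isinstance(value, dict):
        return {k: "***" if k in SECRET_KEYS else redact(v) for k, v in value.items()}
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value

def log_entry(name: str, payload: dict) -> str:
    # json.dumps raises TypeError for a live object (a DB connection, a
    # service client), which catches non-data payloads early.
    return json.dumps({"name": name, "payload": redact(payload)})

entry = log_entry("charge", {"acct": "A", "amt": 40, "card_number": "4111111111111111"})
print(entry)
# {"name": "charge", "payload": {"acct": "A", "amt": 40, "card_number": "***"}}
```

A deny-list like this is only as good as its key list; an allow-list of loggable fields is the stricter alternative.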

How To Use It

  1. Decide which commands are queued or logged. Usually these are the commands that change state you care about.
  2. Make command payloads pure data (plain dicts or value classes). Services are looked up by the worker.
  3. Choose transport: in-process queue, Redis list, RabbitMQ, Kafka, etc. Start small.
  4. Log on accept, on completion, and on failure, using a separate file or table for each.
  5. Design for at-least-once delivery: require handlers to be idempotent, for example by giving each command a unique ID and skipping IDs that were already processed.
  6. If replaying for debugging or recovery, clearly separate "dry run" (state rebuild) from "live" (external effects) modes.
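Steps 4 to 6 fit together in a small processor. This is a minimal sketch assuming each command carries a unique `id` and that "send an email" is the only external effect (simulated by appending to a list); the `Processor` class and its fields are illustrative names, not a library API, and the single handler always credits the account.

```python
import uuid
from dataclasses import dataclass, field

@dataclass
class Command:
    name: str
    payload: dict
    id: str = field(default_factory=lambda: str(uuid.uuid4()))  # step 5: command ID

@dataclass
class Processor:
    state: dict = field(default_factory=dict)
    seen: set = field(default_factory=set)            # IDs already applied
    accepted: list = field(default_factory=list)      # step 4: one log per stage
    completed: list = field(default_factory=list)
    failed: list = field(default_factory=list)
    emails_sent: list = field(default_factory=list)   # stands in for a real email service

    def handle(self, cmd: Command, live: bool = True) -> None:
        if cmd.id in self.seen:
            return  # a redelivered command is a no-op
        self.accepted.append(cmd)
        try:
            # Decide + record: a pure state change, safe to replay.
            acct = cmd.payload["acct"]
            self.state[acct] = self.state.get(acct, 0) + cmd.payload["amt"]
            self.seen.add(cmd.id)
            # Step 6: the external effect runs only in live mode, never in a dry run.
            # Simplification: if the effect fails, the recorded state change is kept.
            if live:
                self.emails_sent.append(acct)
            self.completed.append(cmd.id)
        except Exception as e:
            self.failed.append((cmd.id, repr(e)))

cmd = Command("credit", {"acct": "A", "amt": 100})
proc = Processor()
proc.handle(cmd)
proc.handle(cmd)  # at-least-once delivery: the same command arrives twice
print(proc.state, proc.emails_sent)  # {'A': 100} ['A']

# Dry-run replay of the accept log rebuilds state without sending email again.
rebuilt = Processor()
for c in proc.accepted:
    rebuilt.handle(c, live=False)
print(rebuilt.state, rebuilt.emails_sent)  # {'A': 100} []
```

In a durable system the `seen` set would live in the same store as the state, so that "applied" and "marked as seen" commit together.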

Check Yourself

  1. Why must queued commands generally carry plain data, not object references?
  2. What does "idempotent handler" mean and why is it required for queues with at-least-once delivery?
  3. What two modes of replay exist, and why must you distinguish them?
  4. What is the relationship between Command and an event log?

Mini Drill or Application

Extend the order-processing system from earlier exercises:

  1. Convert placeOrder, cancelOrder, and fulfillOrder to Command classes.
  2. Route them through an in-process queue with a single worker.
  3. Append each accepted command to an append-only file as JSON.
  4. Write a replay() utility that rebuilds the order book from the log.
  5. Prove that a crash between "accept" and "execute" leaves no state corruption.

Stop when a restart plus replay reproduces the state exactly.
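For drill steps 3 and 4, the file mechanics are the same whatever the order book looks like. This minimal sketch assumes one JSON object per line (the JSON Lines convention); `append_command`, `read_commands`, and the `order_id` payload are hypothetical names for illustration. It fsyncs each accepted command and ignores a torn final line left by a crash mid-write.

```python
import json
import os
import tempfile

def append_command(path: str, name: str, payload: dict) -> None:
    line = json.dumps({"name": name, "payload": payload})
    with open(path, "a", encoding="utf-8") as f:
        f.write(line + "\n")
        f.flush()
        os.fsync(f.fileno())  # make the accepted command durable before acknowledging it

def read_commands(path: str) -> list[dict]:
    commands: list[dict] = []
    if not os.path.exists(path):
        return commands
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.endswith("\n"):
                break  # torn final write from a crash: ignore the partial record
            commands.append(json.loads(line))
    return commands

path = os.path.join(tempfile.mkdtemp(), "commands.jsonl")
append_command(path, "placeOrder", {"order_id": 1})
append_command(path, "cancelOrder", {"order_id": 1})
with open(path, "a", encoding="utf-8") as f:
    f.write('{"name": "fulfillOr')  # simulate a crash mid-write
print([c["name"] for c in read_commands(path)])  # ['placeOrder', 'cancelOrder']
```

A real log would also truncate the torn tail on startup before appending again; otherwise the next record would be glued onto the partial line.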

Read This Only If Stuck