Command Queues, Logging, and Replayable Systems
What This Concept Is
The same Command object that lets you undo an action also lets you:
- queue it, to run later or asynchronously (job/task queues)
- log it, for audit or debugging
- serialize it, to persist or send across a network
- replay it, from a log, to reproduce state (event-sourced systems)
The common thread: a Command is a first-class record of intent. Anything you can do with records, you can now do with actions.
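As a minimal sketch of "a Command is a record", the snippet below round-trips a command through JSON. It is an illustration under assumptions, not a prescribed wire format: the helper names to_json and from_json are hypothetical, and the payload is assumed to contain only JSON-compatible values.

```python
import json
from dataclasses import dataclass, asdict


@dataclass(frozen=True)
class Command:
    name: str
    payload: dict


def to_json(cmd: Command) -> str:
    # Hypothetical helper: turn the command into a JSON string.
    return json.dumps(asdict(cmd), sort_keys=True)


def from_json(text: str) -> Command:
    # Hypothetical helper: rebuild the command from its JSON form.
    data = json.loads(text)
    return Command(name=data["name"], payload=data["payload"])


original = Command("credit", {"acct": "A", "amt": 100})
wire = to_json(original)      # can be written to disk or sent over a network
restored = from_json(wire)

print(wire)                   # {"name": "credit", "payload": {"acct": "A", "amt": 100}}
print(restored == original)   # True
```

Because the command carries only data, the same string can be queued, logged, or replayed later without access to the object that created it.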
Why It Matters Here
This is the bridge from "neat pattern in a UI" to "production systems built on it":
- background job queues (Sidekiq, Celery, RabbitMQ consumers) are command queues
- CQRS and event sourcing treat commands and events as the durable representation of the system
- audit logs in banking and healthcare are often serialized commands with their results
You do not need to adopt CQRS to benefit. Even an in-process queue with four workers is the same pattern, smaller.
Concrete Example
A tiny job queue in Python with logging and replay.
from dataclasses import dataclass, field
from queue import Queue
from threading import Thread
import time


@dataclass
class Command:
    name: str
    payload: dict

    def execute(self, world: dict) -> None:
        # Look up the handler by name at execution time.
        handlers[self.name](world, self.payload)


# domain handlers; commands stay thin
handlers = {
    "credit": lambda w, p: w.__setitem__(p["acct"], w.get(p["acct"], 0) + p["amt"]),
    "debit": lambda w, p: w.__setitem__(p["acct"], w.get(p["acct"], 0) - p["amt"]),
}


@dataclass
class CommandBus:
    q: Queue = field(default_factory=Queue)
    log: list = field(default_factory=list)
    world: dict = field(default_factory=dict)

    def submit(self, cmd: Command):
        self.q.put(cmd)

    def worker(self):
        while True:
            cmd = self.q.get()
            if cmd is None:  # sentinel: stop the worker
                return
            # Record the command before running it, so the log captures intent.
            self.log.append((time.time(), cmd.name, cmd.payload))
            try:
                cmd.execute(self.world)
            except Exception as e:
                print(f"failed {cmd.name}: {e}")


bus = CommandBus()
t = Thread(target=bus.worker, daemon=True)
t.start()
bus.submit(Command("credit", {"acct": "A", "amt": 100}))
bus.submit(Command("credit", {"acct": "B", "amt": 30}))
bus.submit(Command("debit", {"acct": "A", "amt": 40}))
bus.q.put(None)  # ask the worker to stop after the queued commands
t.join()
print(bus.world)  # {'A': 60, 'B': 30}

# Replay: rebuild state from the log alone.
replayed = {}
for _, name, payload in bus.log:
    Command(name, payload).execute(replayed)
print(replayed)  # same dict
The log is the history of intent. Replay reconstructs state. That is the seed of event sourcing.
Common Confusion / Misconception
- "Logging every command is cheap." It is not: logs consume disk, add storage cost, and create privacy exposure. Do not log payloads that contain secrets without filtering; treat the command log like any other PII-bearing log.
- Commands that embed non-serializable references. A Command that holds a live service object cannot be persisted. For queued commands, payloads should be pure data; the worker resolves services at execution time.
- Treating queue delivery as if it were in-memory. A queue that must survive a restart has to persist pending commands, and its workers must be idempotent, because the same command may run twice.
- Replaying mutable external calls. If the command calls a payment gateway, replay charges again. Split "decide + record" (safe to replay) from "apply external effect" (must not replay).
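The last point, splitting "decide + record" from "apply external effect", can be sketched as follows. This is a minimal illustration under assumptions: FakeGateway is a hypothetical stand-in for a payment provider, and the log is simplified to a list of payload dicts.

```python
from dataclasses import dataclass, field


@dataclass
class FakeGateway:
    """Hypothetical stand-in for a payment provider; it only counts charges."""
    charges: list = field(default_factory=list)

    def charge(self, acct: str, amt: int) -> None:
        self.charges.append((acct, amt))


def decide_and_record(balances: dict, payload: dict) -> None:
    # Pure state change: safe to run again during replay.
    acct = payload["acct"]
    balances[acct] = balances.get(acct, 0) - payload["amt"]


def run(log: list, gateway=None) -> dict:
    """Rebuild balances from the log; touch the gateway only in live mode."""
    balances: dict = {}
    for payload in log:
        decide_and_record(balances, payload)
        if gateway is not None:  # live mode: apply the external effect
            gateway.charge(payload["acct"], payload["amt"])
    return balances


log = [{"acct": "A", "amt": 40}, {"acct": "A", "amt": 10}]
gateway = FakeGateway()

live = run(log, gateway)   # live mode: the gateway is charged
replayed = run(log)        # replay mode: no gateway, no new charges

print(live == replayed)        # True
print(len(gateway.charges))    # 2
```

Replay produces the same balances as the live run, but the charge count stays at two because the replay path never reaches the gateway.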
How To Use It
- Decide which commands are queued or logged. Usually these are the commands that change state you care about.
- Make command payloads pure data (plain dicts or value classes). Services are looked up by the worker.
- Choose transport: in-process queue, Redis list, RabbitMQ, Kafka, etc. Start small.
- Log on accept, log on complete, log on fail. Separate files or tables for each.
- Design for at-least-once delivery. Require handlers to be idempotent (use a command ID).
- If replaying for debugging or recovery, clearly separate "dry run" (state rebuild) from "live" (external effects) modes.
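The idempotency step above can be sketched with a command ID and a set of already-applied IDs. This is one possible approach, not the only one: the command_id field and the Ledger class are hypothetical names, and a production system would need to persist the applied IDs together with the state change.

```python
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Command:
    name: str
    payload: dict
    # Hypothetical field: a unique ID assigned once, when the command is created.
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))


class Ledger:
    def __init__(self) -> None:
        self.balances: dict = {}
        self.applied_ids: set = set()

    def handle(self, cmd: Command) -> bool:
        """Apply a credit once; return False if this ID was already applied."""
        if cmd.command_id in self.applied_ids:
            return False  # duplicate delivery: ignore it
        acct = cmd.payload["acct"]
        self.balances[acct] = self.balances.get(acct, 0) + cmd.payload["amt"]
        self.applied_ids.add(cmd.command_id)
        return True


ledger = Ledger()
cmd = Command("credit", {"acct": "A", "amt": 100})

first = ledger.handle(cmd)
second = ledger.handle(cmd)  # simulated redelivery of the same command

print(first, second, ledger.balances)  # True False {'A': 100}
```

With at-least-once delivery the second call is expected to happen occasionally; the ID check is what keeps the balance at 100 instead of 200.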
Check Yourself
- Why must queued commands generally carry plain data, not object references?
- What does "idempotent handler" mean and why is it required for queues with at-least-once delivery?
- What two modes of replay exist, and why must you distinguish them?
- What is the relationship between Command and an event log?
Mini Drill or Application
Extend the order-processing system from earlier exercises:
- Convert placeOrder, cancelOrder, and fulfillOrder to Command classes.
- Route them through an in-process queue with a single worker.
- Append each accepted command to an append-only file as JSON.
- Write a replay() utility that rebuilds the order book from the log.
- Prove that a crash between "accept" and "execute" leaves no state corruption.
Stop when a restart plus replay reproduces the state exactly.
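As a possible starting point for the append-only file, the sketch below writes one JSON object per line and reads the lines back in order. It covers only the storage step, not the queue or replay() itself. The helper names append_command and read_commands, the file name commands.jsonl, and the order_id payload field are all assumptions made for illustration.

```python
import json
import os
import tempfile


def append_command(path: str, name: str, payload: dict) -> None:
    # Open in append mode so existing entries are never rewritten.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps({"name": name, "payload": payload}) + "\n")
        f.flush()
        os.fsync(f.fileno())  # ask the OS to push the entry to disk


def read_commands(path: str):
    # Yield logged commands in the order they were accepted.
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "commands.jsonl")
    append_command(path, "placeOrder", {"order_id": 1})
    append_command(path, "cancelOrder", {"order_id": 1})
    recovered = list(read_commands(path))

print([entry["name"] for entry in recovered])  # ['placeOrder', 'cancelOrder']
```

Writing the entry before the command executes means a crash between "accept" and "execute" leaves the command in the file, where a later replay can pick it up.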