Repositories, Factories, and the Application Layer
What This Concept Is
Aggregates (concept 10) are the heart of a bounded context's domain model. Three collaborators surround them:
- Repository. A collection-like abstraction for one aggregate type. Loads aggregates by identity and saves them. Hides the persistence mechanism from the domain. Typical API: load(id) -> Aggregate, save(aggregate), and only identity-based or domain-meaningful queries (find_pending_for_customer(customer_id), not find_where_status_eq_and_country_in_and_...).
- Factory. Creates a valid aggregate from primitive inputs or from external data. Lives either as a static method on the root (Shipment.book(...)) or as a separate class when construction is complex. Enforces invariants at birth.
- Application layer (application services / command handlers). Orchestrates one use case: load aggregates via repositories, call commands on them, collect emitted events, persist, publish. Contains no business logic -- all business logic lives in the domain model. The application layer is thin.
Zoomed-out picture of the layers in one bounded context:
┌──────────────────────────────────────────────────────────────────┐
│ Interface layer (HTTP, gRPC, CLI, message consumer) │
│ - parses requests, dispatches to application layer │
├──────────────────────────────────────────────────────────────────┤
│ Application layer (use case / command handler) │
│ - starts a transaction (unit of work) │
│ - loads aggregate via repository │
│ - calls aggregate command │
│ - saves aggregate │
│ - collects pending domain events, writes to outbox │
│ - commits; publisher relays │
├──────────────────────────────────────────────────────────────────┤
│ Domain layer (aggregates, VOs, domain events, domain services) │
│ - all invariants, all behavior │
│ - no framework / DB / HTTP concepts │
├──────────────────────────────────────────────────────────────────┤
│ Infrastructure layer (repositories impl, outbox, bus clients) │
│ - implements repository interfaces defined in the domain layer │
│ - SQL, ORM, Kafka client, S3, Stripe SDK, etc. │
└──────────────────────────────────────────────────────────────────┘
A domain service is a small seventh kind of object that shows up when a piece of domain logic naturally spans more than one aggregate or belongs to no single one (e.g., CarrierAssignmentPolicy that uses a Shipment and a set of CarrierContracts). It lives in the domain layer and is stateless.
Why It Matters Here
Without these collaborators, aggregate code gets contaminated: SQL in the aggregate, request parsing in the aggregate, business rules in the handler. The layers are how you keep the domain model clean.
A clean layered setup also pays off for testing:
- unit tests for aggregates: pure, no mocks needed
- unit tests for domain services: often pure
- tests for application layer: use a fake in-memory repository
- integration tests: exercise the real repository + outbox
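The first and third bullets can be sketched as follows. This is a minimal illustration, not the case study's real model: the trimmed-down Shipment, its cancel() command, and InMemoryShipmentRepository are hypothetical stand-ins chosen so the example runs on its own.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Shipment:
    """Hypothetical, heavily simplified aggregate used only for this sketch."""
    shipment_id: str
    status: str = "BOOKED"
    _events: list = field(default_factory=list)

    def cancel(self) -> None:
        # The invariant lives on the aggregate, so it can be tested without I/O.
        if self.status == "DELIVERED":
            raise ValueError("A delivered shipment cannot be cancelled.")
        self.status = "CANCELLED"
        self._events.append(("ShipmentCancelled", self.shipment_id))

    def pull_events(self) -> list:
        # Hand over pending events and clear the internal buffer.
        events, self._events = self._events, []
        return events


class InMemoryShipmentRepository:
    """Test double: a dict stands in for the database, a list for the outbox."""

    def __init__(self) -> None:
        self._items: dict[str, Shipment] = {}
        self.outbox: list = []

    def load(self, shipment_id: str) -> Optional[Shipment]:
        return self._items.get(shipment_id)

    def save(self, shipment: Shipment) -> None:
        self._items[shipment.shipment_id] = shipment
        self.outbox.extend(shipment.pull_events())


def test_aggregate_is_pure() -> None:
    # No mocks, no database: just construct, call, assert.
    shipment = Shipment("AWB-1")
    shipment.cancel()
    assert shipment.status == "CANCELLED"


def test_fake_repository_round_trip() -> None:
    repo = InMemoryShipmentRepository()
    shipment = Shipment("AWB-2")
    shipment.cancel()
    repo.save(shipment)
    assert repo.load("AWB-2").status == "CANCELLED"
    assert repo.outbox == [("ShipmentCancelled", "AWB-2")]
```

Because the fake exposes the same load/save surface as the real repository, application-layer tests can swap it in without touching handler code.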
Concrete Example
Case: Parcel Shipping -- booking a shipment
Domain layer: repository interface
# domain/ports.py (lives in the domain layer; pure interface)
from abc import ABC, abstractmethod
from typing import Optional

from domain.shipment import Shipment  # hypothetical module path for the aggregate


class ShipmentRepository(ABC):
    @abstractmethod
    def load(self, shipment_id: str) -> Optional[Shipment]: ...

    @abstractmethod
    def save(self, shipment: Shipment) -> None: ...

    @abstractmethod
    def next_awb(self) -> str: ...  # ID generation stays out of the domain
The domain declares what it needs. Implementations live in infrastructure/.
Infrastructure layer: SQL implementation
# infrastructure/sql_shipment_repository.py
class SqlShipmentRepository(ShipmentRepository):
    def __init__(self, session, outbox: Outbox):
        self._session = session
        self._outbox = outbox

    def load(self, shipment_id):
        row = self._session.query(ShipmentRow).get(shipment_id)
        return _hydrate(row) if row else None

    def save(self, shipment: Shipment):
        row = _dehydrate(shipment)
        self._session.merge(row)
        for event in shipment.pull_events():
            self._outbox.append(event)  # same DB session
Note: the outbox writes in the same DB session as the aggregate save -- that is what makes the outbox transactional.
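To make the "same session" point concrete, here is a minimal sketch using the standard-library sqlite3 module instead of an ORM. The table names, the save_with_outbox helper, and the fail flag are assumptions made for illustration; the case study's actual schema is not shown in the text.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE shipments (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
)
conn.commit()


def save_with_outbox(conn, shipment_id, status, events, fail=False):
    """Write the aggregate row and its events in ONE transaction."""
    # `with conn` commits if the body succeeds and rolls back if it raises.
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO shipments (id, status) VALUES (?, ?)",
            (shipment_id, status),
        )
        for event in events:
            conn.execute(
                "INSERT INTO outbox (payload) VALUES (?)", (json.dumps(event),)
            )
        if fail:
            # Simulates a crash after the writes but before the commit.
            raise RuntimeError("simulated crash before commit")


def count(conn, table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

If the second call below fails, neither the shipment row nor its event survives, so a relay reading the outbox never publishes an event for state that was not saved:

    save_with_outbox(conn, "AWB-1", "BOOKED", [{"type": "ShipmentBooked"}])
    save_with_outbox(conn, "AWB-2", "BOOKED", [{"type": "ShipmentBooked"}], fail=True)  # raises; nothing is written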
Domain layer: factory
# domain/shipment_factory.py
class ShipmentFactory:
    def __init__(self, repo: ShipmentRepository, weight_limits: WeightLimitsPolicy):
        self._repo = repo
        self._weight_limits = weight_limits

    def book(self, cmd: BookShipmentCommand) -> Shipment:
        return Shipment.book(
            customer_id=cmd.customer_id,
            service_class=cmd.service_class,
            origin=cmd.origin,
            destination=cmd.destination,
            parcels=[Parcel(**p) for p in cmd.parcels],
            rate_snapshot_id=cmd.rate_snapshot_id,
            weight_limit=self._weight_limits.for_class(cmd.service_class),
        )
Factories become valuable when the construction has to pull in policy objects or external data before the aggregate's invariants can be checked.
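The factory above delegates to Shipment.book(...), whose body is not shown. One plausible shape for it, assuming just two invariants (at least one parcel, total weight within the limit) and a hypothetical BookingRejected exception, is sketched below; the real aggregate carries more fields and rules.

```python
from dataclasses import dataclass


class BookingRejected(Exception):
    """Hypothetical error raised when a shipment would be invalid at birth."""


@dataclass(frozen=True)
class Parcel:
    weight_kg: float


class Shipment:
    def __init__(self, customer_id: str, parcels: list[Parcel]) -> None:
        # Callers are expected to go through Shipment.book(...), not __init__.
        self.customer_id = customer_id
        self.parcels = parcels
        self.status = "BOOKED"

    @classmethod
    def book(cls, customer_id: str, parcels: list[Parcel], weight_limit: float) -> "Shipment":
        # Invariants are checked before any instance exists,
        # so an invalid Shipment can never be observed.
        if not parcels:
            raise BookingRejected("A shipment needs at least one parcel.")
        total = sum(p.weight_kg for p in parcels)
        if total > weight_limit:
            raise BookingRejected(
                f"Total weight {total} kg exceeds the limit of {weight_limit} kg."
            )
        return cls(customer_id, parcels)
```

The weight limit arrives as a plain argument: the factory class resolves it from the policy, and the aggregate only needs the number.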
Application layer: command handler (the thin glue)
# application/book_shipment.py
from dataclasses import dataclass


@dataclass
class BookShipmentCommand:
    customer_id: str
    service_class: ServiceClass
    origin: Address
    destination: Address
    parcels: list[dict]
    rate_snapshot_id: str


class BookShipmentHandler:
    def __init__(self,
                 uow: UnitOfWork,
                 factory: ShipmentFactory,
                 repo: ShipmentRepository,
                 pricing: PricingGateway):
        self._uow = uow
        self._factory = factory
        self._repo = repo
        self._pricing = pricing

    def handle(self, cmd: BookShipmentCommand) -> str:
        # Optional orchestration before the aggregate is born:
        if not self._pricing.is_rate_valid(cmd.rate_snapshot_id):
            raise InvalidRate("Rate snapshot expired or unknown.")
        with self._uow:
            shipment = self._factory.book(cmd)  # aggregate birth + invariants
            self._repo.save(shipment)           # persists + writes outbox events
        # the unit of work commits on exit; the outbox relay then publishes
        # integration events asynchronously
        return shipment.shipment_id
That's it. The handler has 5 lines of orchestration. All domain logic -- what "booking" means, which invariants hold -- lives in the aggregate and its factory.
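The handler relies on a UnitOfWork that can be used in a with statement, but the text does not show one. A minimal sketch, assuming the wrapped session exposes commit() and rollback(), could look like this; RecordingSession is a hypothetical stand-in used only to observe the calls.

```python
class UnitOfWork:
    """Sketch of a unit of work: commit on clean exit, roll back on error."""

    def __init__(self, session) -> None:
        self._session = session

    def __enter__(self) -> "UnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self._session.commit()
        else:
            self._session.rollback()
        return False  # never swallow the exception


class RecordingSession:
    """Hypothetical stand-in for a DB session; records what was called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def commit(self) -> None:
        self.calls.append("commit")

    def rollback(self) -> None:
        self.calls.append("rollback")


def do_work(uow: UnitOfWork) -> str:
    # A return inside the with-block still triggers __exit__, hence the commit.
    with uow:
        return "done"
```

Since __exit__ runs on every path out of the block, the handler never has to call commit or rollback itself.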
Interface layer: HTTP adapter (thin)
# interface/http_shipments.py
@app.post("/shipments")
def create_shipment(request: Request):
    body = request.json()
    cmd = BookShipmentCommand(
        customer_id=body["customer_id"],
        service_class=ServiceClass(body["service_class"]),
        origin=Address(**body["origin"]),
        destination=Address(**body["destination"]),
        parcels=body["parcels"],
        rate_snapshot_id=body["rate_snapshot_id"],
    )
    shipment_id = book_shipment_handler.handle(cmd)
    return {"shipment_id": shipment_id}, 201
If you add a CLI or a Kafka consumer variant, you add another thin adapter that builds the same command and calls the same handler. No business logic duplication.
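A rough sketch of that idea: two adapters share one translation function and one handler. The command is trimmed to two fields for brevity, and RecordingHandler, build_command, and message_consumer_adapter are hypothetical names; a real consumer would be driven by a queue client that is omitted here.

```python
import json
from dataclasses import dataclass


@dataclass
class BookShipmentCommand:
    """Trimmed to two fields for this sketch; the full command has more."""
    customer_id: str
    rate_snapshot_id: str


class RecordingHandler:
    """Stand-in for BookShipmentHandler that records what it receives."""

    def __init__(self) -> None:
        self.received: list[BookShipmentCommand] = []

    def handle(self, cmd: BookShipmentCommand) -> str:
        self.received.append(cmd)
        return f"AWB-{len(self.received)}"


def build_command(payload: dict) -> BookShipmentCommand:
    # Shared translation from a decoded payload to the command object.
    return BookShipmentCommand(
        customer_id=payload["customer_id"],
        rate_snapshot_id=payload["rate_snapshot_id"],
    )


def http_adapter(body: dict, handler: RecordingHandler) -> tuple[dict, int]:
    # Transport concern: shape the response and status code.
    shipment_id = handler.handle(build_command(body))
    return {"shipment_id": shipment_id}, 201


def message_consumer_adapter(raw: bytes, handler: RecordingHandler) -> str:
    # Transport concern: decode the message bytes; everything else is shared.
    return handler.handle(build_command(json.loads(raw)))
```

Each adapter owns only its transport details (status codes, message decoding); the handler sees identical commands regardless of where they came from.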
Where a domain service appears
CarrierAssignmentPolicy is a domain service -- it reads a Shipment plus a CarrierContracts collection and returns the chosen CarrierId:
class CarrierAssignmentPolicy:
    def choose_carrier(self, shipment: Shipment, contracts: list[CarrierContract]) -> CarrierId:
        # pure domain rule -- no I/O
        candidates = [c for c in contracts if c.supports(shipment.service_class, shipment.origin.iso_country)]
        if not candidates:
            raise NoEligibleCarrier()
        return min(candidates, key=lambda c: c.cost_score(shipment)).carrier_id
The application layer loads the contracts via their repository and passes them to the policy. The policy stays pure.
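How a handler might wire this together is sketched below. Everything beyond the policy itself is an assumption made so the example runs standalone: the flattened Shipment (origin_country instead of origin.iso_country), the assign_carrier command, the active_contracts() query, and the flat cost_score are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


class NoEligibleCarrier(Exception):
    pass


@dataclass
class Shipment:
    shipment_id: str
    service_class: str
    origin_country: str
    carrier_id: Optional[str] = None

    def assign_carrier(self, carrier_id: str) -> None:
        self.carrier_id = carrier_id


@dataclass(frozen=True)
class CarrierContract:
    carrier_id: str
    service_classes: frozenset
    countries: frozenset
    base_cost: float

    def supports(self, service_class: str, country: str) -> bool:
        return service_class in self.service_classes and country in self.countries

    def cost_score(self, shipment: Shipment) -> float:
        return self.base_cost  # placeholder scoring rule


class CarrierAssignmentPolicy:
    def choose_carrier(self, shipment: Shipment, contracts: list[CarrierContract]) -> str:
        candidates = [
            c for c in contracts
            if c.supports(shipment.service_class, shipment.origin_country)
        ]
        if not candidates:
            raise NoEligibleCarrier()
        return min(candidates, key=lambda c: c.cost_score(shipment)).carrier_id


class AssignCarrierHandler:
    def __init__(self, shipments, contracts, policy: CarrierAssignmentPolicy) -> None:
        self._shipments = shipments
        self._contracts = contracts
        self._policy = policy

    def handle(self, shipment_id: str) -> str:
        shipment = self._shipments.load(shipment_id)        # I/O
        contracts = self._contracts.active_contracts()      # I/O, read-only
        carrier_id = self._policy.choose_carrier(shipment, contracts)  # pure rule
        shipment.assign_carrier(carrier_id)                 # aggregate command
        self._shipments.save(shipment)                      # I/O
        return carrier_id
```

Only the Shipment is modified and saved; the contracts are read but never written, so the transaction still changes a single aggregate.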
What repositories should NOT do
Avoid:
- ShipmentRepository.mark_delivered(id) -- that is business logic, belongs on the aggregate
- ShipmentRepository.find_all_for_dashboard(filters) -- that is a read-model query; use CQRS (concept 13) instead of stretching the repository
- caching inside the repository that smuggles stale aggregates -- if caching is needed, make it explicit and bounded
- returning ORM entities directly -- return domain objects
The repository is a collection illusion for one aggregate type.
What the application layer should NOT do
Avoid:
- checking business rules (if shipment.weight_total > limit: raise)
- directly manipulating aggregate fields
- hiding policy decisions in handler code
- modifying multiple aggregates in one transaction (this violates concept 10's rule; reading other aggregates to inform a decision is fine)
The handler is 5-15 lines. If it grows, the logic probably wants to live in the domain layer instead.
Common Confusion / Misconception
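"Repository = DAO." The DAO pattern is CRUD-shaped, per table. A repository is collection-shaped, per aggregate type. They differ in what they abstract: a DAO abstracts access to the rows of a table, while a repository abstracts a collection of whole aggregates, however many tables sit behind each one.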
"Why not let aggregates call the repository themselves?" Because the aggregate would then depend on persistence. It also becomes harder to test and to compose multi-aggregate use cases.
"Factories are overkill -- I'll just use constructors." Fine for simple cases. A factory earns its keep when construction requires external policy data or when the aggregate has multiple valid creation paths.
"The application layer is where I put helpful utility logic." No. Utility logic is a separate module. The application layer only orchestrates use cases.
"Hexagonal architecture and clean architecture say the same thing." They say overlapping things. DDD works cleanly on top of either: domain in the middle, application around it, infrastructure on the outside. Pick names, stay consistent.
"I should make the repository return Optional-wrapped futures with validation errors and…" Start simple. Return the aggregate or None. Raise a ConcurrencyError on version mismatch. Don't reinvent Haskell in the repository.
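The "raise a ConcurrencyError on version mismatch" advice can be sketched with an in-memory repository and an optimistic version counter. The version field and the bump-on-save behaviour are assumptions for illustration; a SQL implementation would typically express the same check as a conditional UPDATE on the version column.

```python
import copy
from dataclasses import dataclass
from typing import Optional


class ConcurrencyError(Exception):
    """Raised when the stored version differs from the one that was loaded."""


@dataclass
class Shipment:
    shipment_id: str
    status: str = "BOOKED"
    version: int = 0  # hypothetical optimistic-locking counter


class InMemoryShipmentRepository:
    def __init__(self) -> None:
        self._rows: dict[str, Shipment] = {}

    def load(self, shipment_id: str) -> Optional[Shipment]:
        row = self._rows.get(shipment_id)
        # Hand out a copy so callers cannot mutate stored state by accident.
        return copy.deepcopy(row) if row is not None else None

    def save(self, shipment: Shipment) -> None:
        stored = self._rows.get(shipment.shipment_id)
        if stored is not None and stored.version != shipment.version:
            raise ConcurrencyError(
                f"{shipment.shipment_id}: expected version {shipment.version}, "
                f"found {stored.version}"
            )
        fresh = copy.deepcopy(shipment)
        fresh.version += 1  # every successful save bumps the version
        self._rows[shipment.shipment_id] = fresh
        shipment.version = fresh.version
```

Two callers that load the same shipment both hold the same version; whichever saves second hits the mismatch and must reload and retry, while the return types stay as plain as "aggregate or None".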
How To Use It
For each use case inside a bounded context:
- Define the command (plain data) in the application layer.
- Write a handler with this shape:
  - begin unit of work
  - resolve dependencies (policies, external gateways)
  - load aggregates via repositories
  - call aggregate commands
  - save aggregates (outbox follows)
  - commit
- Put all business rules in aggregates or domain services.
- Repository interfaces in the domain layer; implementations in infrastructure.
- Factories when construction is non-trivial.
- Test aggregates directly. Test handlers with a fake repository.
- Keep every handler under ~15 lines. If it grows, extract a domain service.
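The handler shape from the checklist, applied to a use case that loads an existing aggregate rather than creating one, might look like the sketch below. CancelShipmentCommand, the cancel() command, ShipmentNotFound, and both fakes are hypothetical; they exist only so the example is self-contained and testable.

```python
from dataclasses import dataclass


class ShipmentNotFound(Exception):
    pass


@dataclass
class Shipment:
    shipment_id: str
    status: str = "BOOKED"

    def cancel(self, reason: str) -> None:
        # Business rule stays on the aggregate.
        if self.status == "DELIVERED":
            raise ValueError("A delivered shipment cannot be cancelled.")
        self.status = "CANCELLED"


@dataclass
class CancelShipmentCommand:
    shipment_id: str
    reason: str


class FakeUnitOfWork:
    def __init__(self) -> None:
        self.committed = False

    def __enter__(self) -> "FakeUnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.committed = exc_type is None
        return False


class FakeShipmentRepository:
    def __init__(self, *shipments: Shipment) -> None:
        self._items = {s.shipment_id: s for s in shipments}

    def load(self, shipment_id: str):
        return self._items.get(shipment_id)

    def save(self, shipment: Shipment) -> None:
        self._items[shipment.shipment_id] = shipment


class CancelShipmentHandler:
    def __init__(self, uow, repo) -> None:
        self._uow = uow
        self._repo = repo

    def handle(self, cmd: CancelShipmentCommand) -> None:
        with self._uow:                                   # begin unit of work
            shipment = self._repo.load(cmd.shipment_id)   # load aggregate
            if shipment is None:
                raise ShipmentNotFound(cmd.shipment_id)
            shipment.cancel(cmd.reason)                   # call aggregate command
            self._repo.save(shipment)                     # save (outbox follows)
        # leaving the with-block commits
```

The handler decides nothing about whether cancellation is allowed; it only sequences load, command, and save, which is why it stays well under the 15-line budget.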
Check Yourself
- Why does the repository interface live in the domain layer and the implementation in infrastructure?
- Name two things that should never appear in the application layer.
- When is a domain service the right tool, and when is it just a hiding place for logic that should be on an aggregate?
Mini Drill or Application
For the e-commerce checkout aggregate you designed in concept 10:
- Write the CheckoutRepository interface.
- Write a SubmitCheckoutHandler command handler.
- Write a FraudCheckPolicy as a domain service that the handler consults before calling checkout.submit().
- Identify one piece of code that could easily slip into the wrong layer and state where it actually belongs.
- Sketch a minimal in-memory repository used for tests and show the handler test using it.