Part 4: Coding the steel thread (all the way to production)
From architecture to production-ready in one sprint — proving the shape of the pipeline before we wire up the message broker.

In this fourth article on data pipelines we move from architecture to code. Our goal is a working steel thread — a thin slice of the pipeline that touches every major component end-to-end. By the time we’re finished, raw data will flow in from an external source, pass through an anti-corruption layer, land in our internal cache, get transformed and annotated in stages and emerge as something a consumer can read.
We won’t be building a complete application. I’ll assume you can stand up a project, wire up a database and run a web server. What we will focus on is technique — the small architectural moves that make the difference between a pipeline that grows gracefully and one that gets thrown away in six months.
The previous three articles set the stage. We talked about why data pipelines are interesting, the foundational architecture needed to support our requirements and the refinements that make it scalable, resilient and ready to evolve. If you haven’t read them, this article will still make sense — but you’ll get more out of it if you have.
Two (small) sprints, one steel thread
Our architecture has a lot of moving parts: an external data source, an anti-corruption layer (or “ACL,” a term borrowed from Domain-Driven Design that I’ll explain in a moment), an internal cache, a chain of analytical stages, an annotation pattern, an event backbone and a visualization layer. Standing all of that up at once is too much for one sprint. We’ll split the work into two sprints, each producing a runnable system.
In sprint one we deliver the framework — the shape of the pipeline — with every piece wired up except real event streaming. Stages call each other directly through a small in-process API that mimics the eventual event bus. Source data flows through the ACL, lands in the cache, gets annotated through cascading stages and is read out through a simple consumer.
In sprint two, which is the subject of the next article, we evolve that thin pipeline into a true event-driven system. The synchronous dispatcher gets swapped for a message broker. Stages become independent processes. We add the ability to rewind history.
Why split it? By focusing on proving the rest of the architecture first, we’ll tackle a lot of foundational questions. If we tried to do everything at once, the messaging work would dominate our attention and we’d never get a chance to validate the pipeline shape itself — ACL, cache, staged transformations, annotation, readout. Validating that shape first is how we make sure the pipeline actually solves our business problem. We’re holding back messaging — but messaging is a well-understood space. This way, we get rid of more unknowns up front, and later we’ll use the running pipeline to inform what the event contracts should look like.
This is the steel thread approach in miniature. We’re tackling architectural risk early, but we’re also being honest about the order in which we tackle it.
What sprint one actually delivers
Here’s the thing about a steel thread: it’s tempting to over-build. You see the future architecture in your head and you want to get there now. Resist that urge. The point of sprint one isn’t to anticipate every future need — it’s to prove the shape is right.
Concretely, by the end of sprint one we’ll have:
An anti-corruption layer that pulls source records from an external SQL database and translates them into our domain types.
An internal cache that holds a copy of the cleaned source data plus any annotations we generate.
A staged transformation pipeline — a sequence of analytical steps, each one reading the previous data and adding new annotations on top.
An annotation pattern that lets us add new derived data alongside the source without touching the source itself.
A pipeline-shaped API that looks like an event bus from the outside but is implemented as in-process function calls underneath.
A read API that surfaces the latest annotations on demand.
A minimal output — for example, a simple web page or even a JSON dump — that proves the round-trip works.
What we’re not doing yet:
No real message broker. No Kafka, no RabbitMQ, no Phoenix PubSub fan-out across nodes.
No asynchronous stages. Each stage runs in the same process, in order.
No rewind-and-reprocess. We’ll design so we can move in that direction, but we won’t have all the pieces in place yet.
No production infrastructure beyond what we need to run the steel thread end-to-end.
That’s a deliberate choice. Every item on the “not yet” list maps to a piece of work we’ll pick up later. By naming them now, we’re reminding ourselves where the future seams are — and we’re writing today’s code in a way that respects them.
Here’s what sprint one looks like as a flow:
Notice the purple component: that’s the in-process dispatcher — the seam where the eventual event bus will live. Stages publish through it today, and they’ll publish through it tomorrow. Only the implementation underneath changes.
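To make that seam concrete, here is a minimal sketch in Elixir (the language used for the examples in this series). Every name in it is hypothetical: the `PipelineSketch.Dispatcher` behaviour, the `InProcessDispatcher` module and the topic names are assumptions for illustration, not the code this series ships. The idea is only that stages depend on a small publish/subscribe contract, and the sprint-one implementation satisfies it with ordinary function calls.

```elixir
defmodule PipelineSketch.Dispatcher do
  @moduledoc "Hypothetical contract that stages code against."
  @callback subscribe(topic :: atom(), handler :: (term() -> any())) :: :ok
  @callback publish(topic :: atom(), payload :: term()) :: :ok
end

defmodule PipelineSketch.InProcessDispatcher do
  @moduledoc "Sprint-one stand-in: synchronous, single process, no broker."
  @behaviour PipelineSketch.Dispatcher
  use Agent

  # The Agent only stores the subscription table (topic => handlers).
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  @impl true
  def subscribe(topic, handler) when is_function(handler, 1) do
    Agent.update(__MODULE__, fn subs ->
      Map.update(subs, topic, [handler], &(&1 ++ [handler]))
    end)
  end

  @impl true
  def publish(topic, payload) do
    # Handlers run in the caller's process, in subscription order,
    # so a handler may itself publish without deadlocking the Agent.
    handlers = Agent.get(__MODULE__, &Map.get(&1, topic, []))
    Enum.each(handlers, fn handler -> handler.(payload) end)
    :ok
  end
end

alias PipelineSketch.InProcessDispatcher, as: Bus

{:ok, _pid} = Bus.start_link()

# A "stage" reacts to ingested activities and publishes its own event.
Bus.subscribe(:activities_ingested, fn batch ->
  Bus.publish(:batch_counted, length(batch))
end)

Bus.subscribe(:batch_counted, fn count -> send(self(), {:batch_counted, count}) end)

Bus.publish(:activities_ingested, [%{id: 1}, %{id: 2}])
```

Because callers only ever touch `subscribe/2` and `publish/2`, a broker-backed module implementing the same behaviour could replace this one without changing the stages.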
The anti-corruption layer
I introduced the term “anti-corruption layer” in Part 2. It comes from Eric Evans’ Domain-Driven Design and it does exactly what the name suggests: it protects our domain from being corrupted by the shape, types and assumptions of an external system. The ACL is where messy outside data becomes clean inside data.
In our pipeline the ACL has four jobs:
Translate external types into domain types. If the source schema stores duration as an Integer of seconds and our domain prefers Time or Duration, the translation happens here.
Validate what comes in. Required fields, value ranges, referential integrity: checks at this boundary catch problems where they’re cheapest to fix.
Decouple our internal cache schema from the source. If the upstream team adds a column, drops a column or renames a field, the change stops at the ACL.
Alert loudly about problems with the inbound data. If the data doesn’t fit our internal shape, we need to know about it.
ACLs are typically shown as small annotations on context boundaries, like this:
We see two internal domains and an external daily calendar domain. Activities and sprints belong to our application. To keep both domains “pure,” an anti-corruption layer translates the external world into familiar, internal language: for example, it reads an Integer time duration and translates it into a Time. The Activities and Sprints contexts both use the same language, so they don’t need an ACL between them.
I’m using Elixir and the Ash Framework for these examples. Don’t worry if you’re not an Elixir programmer — Ash is a declarative resource framework, and the snippets read more or less like an annotated description of the resource. If you’re working in another stack, the shape of the solution is the same; only the syntax changes.
Here’s a stripped-down resource that represents an Activity in our internal cache. It’s the clean, domain-flavored version of whatever the source database hands us:
defmodule WastePipeline.Cache.Activity do
  use Ash.Resource,
    domain: WastePipeline.Cache,
    data_layer: AshPostgres.DataLayer

  attributes do
    uuid_primary_key :id
    attribute :external_id, :string, allow_nil?: false
    attribute :date, :date, allow_nil?: false
    attribute :duration, :time, allow_nil?: false
    attribute :type, :atom, constraints: [one_of: [:value_add, :non_value_add]]
  end

  relationships do
    belongs_to :stereotype, WastePipeline.Cache.Stereotype, allow_nil?: false
    belongs_to :user, WastePipeline.Cache.User, allow_nil?: false
  end
end

Now here’s the ACL itself: a small module whose only job is to take a source record (whatever the upstream SQL database hands us) and produce a domain record we can hand to the cache:
defmodule WastePipeline.Intake.ActivityAcl do
  alias WastePipeline.Cache

  # Translate one external row into a domain-shaped map. Anything weird about
  # the upstream representation gets dealt with right here.
  def to_domain(%{"duration_seconds" => secs, "value_type" => v} = row) do
    %{
      external_id: row["id"],
      date: Date.from_iso8601!(row["activity_date"]),
      # Standard-library conversion; note that values of 24 hours or more wrap around.
      duration: Time.from_seconds_after_midnight(secs),
      type: translate_value_type(v),
      stereotype_id: row["stereotype_id"],
      user_id: row["user_id"]
    }
  end

  defp translate_value_type("VA"), do: :value_add
  defp translate_value_type("NVA"), do: :non_value_add
  defp translate_value_type(other), do: raise("Unknown value type: #{inspect(other)}")

  # Public entry point. Take a batch of source rows, translate every row,
  # and hand the result to the cache for insertion.
  def ingest_batch(rows) do
    rows
    |> Enum.map(&to_domain/1)
    |> Enum.each(&Cache.create_activity!/1)
  end
end

Two things are worth noting. First, the translation logic is explicit: when the source uses "VA" and "NVA", our translator is the one and only place those strings get mapped into atoms. If the source ever changes (say, the strings become "value-add" and "non-value-add"), exactly one function changes. Second, we fail loudly on unknown inputs. An unrecognized value_type raises rather than silently dropping the row. That’s a discipline worth keeping: the ACL is the place where surprising data gets noticed.
Right now, if something goes wrong the Elixir process crashes loudly. In early development, that’s what we want — to see those exceptions and treat them as the blockers they are. In production, we’ll have a supervisor that shunts the failed message into an error-handling queue and then carries on. We can diagnose, fix and reprocess the message asynchronously.
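As a rough illustration of the "carry on and reprocess later" idea, here is a minimal sketch of a translator that returns tagged tuples instead of raising, so a batch can be split into accepted records and rejected rows. This is an assumption about one possible shape, not the supervisor-and-queue mechanism described above: the `AclSketch` module, the error tuples and the list of required fields are all hypothetical, and the row keys simply mirror the earlier example.

```elixir
defmodule AclSketch do
  # Hypothetical list of fields the source row must carry.
  @required ~w(id activity_date duration_seconds value_type)

  # Translate one row, returning {:ok, record} or {:error, reason}.
  def to_domain(row) when is_map(row) do
    with :ok <- check_required(row),
         {:ok, date} <- parse_date(row["activity_date"]),
         {:ok, duration} <- parse_duration(row["duration_seconds"]),
         {:ok, type} <- translate_value_type(row["value_type"]) do
      {:ok, %{external_id: row["id"], date: date, duration: duration, type: type}}
    end
  end

  # Translate a whole batch; bad rows are kept with their reason, not dropped.
  def translate_batch(rows) do
    {oks, errors} =
      rows
      |> Enum.map(fn row -> {row, to_domain(row)} end)
      |> Enum.split_with(fn {_row, result} -> match?({:ok, _}, result) end)

    %{
      accepted: Enum.map(oks, fn {_row, {:ok, record}} -> record end),
      rejected: Enum.map(errors, fn {row, {:error, reason}} -> {row, reason} end)
    }
  end

  defp check_required(row) do
    case Enum.reject(@required, &Map.has_key?(row, &1)) do
      [] -> :ok
      missing -> {:error, {:missing_fields, missing}}
    end
  end

  defp parse_date(value) when is_binary(value) do
    case Date.from_iso8601(value) do
      {:ok, date} -> {:ok, date}
      {:error, _} -> {:error, {:invalid_date, value}}
    end
  end

  defp parse_date(value), do: {:error, {:invalid_date, value}}

  # Range check: a Time can only represent durations shorter than one day.
  defp parse_duration(secs) when is_integer(secs) and secs >= 0 and secs < 86_400,
    do: {:ok, Time.from_seconds_after_midnight(secs)}

  defp parse_duration(other), do: {:error, {:invalid_duration, other}}

  defp translate_value_type("VA"), do: {:ok, :value_add}
  defp translate_value_type("NVA"), do: {:ok, :non_value_add}
  defp translate_value_type(other), do: {:error, {:unknown_value_type, other}}
end

rows = [
  %{"id" => "a-1", "activity_date" => "2024-03-01", "duration_seconds" => 3600, "value_type" => "VA"},
  %{"id" => "a-2", "activity_date" => "2024-03-01", "duration_seconds" => 60, "value_type" => "??"},
  %{"id" => "a-3"}
]

result = AclSketch.translate_batch(rows)
IO.inspect(result.rejected, label: "rejected")
```

The rejected list keeps each original row next to its reason, which is what an error-handling queue would need in order to diagnose and replay it. The loud-failure behaviour is still available: a caller can raise whenever `rejected` is non-empty.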
The ACL is where we earn our data integrity. Once the data is past this layer, every downstream stage gets to trust it. That trust is what lets us write the rest of the pipeline as a series of clean, focused transformations.
DataFrames as the carrier
With clean data in the cache, the next question is: in what shape do we move it through the pipeline? My answer is a DataFrame.
A DataFrame is a two-dimensional table, like a spreadsheet — rows of records, named columns of values. The big advantages are speed and expressiveness. Operations like filtering, grouping, summarizing, joining and adding columns are first-class, and they’re implemented in highly optimized native code.
In Elixir I use Explorer, which is built on the Rust Polars library (if you’re working in Python, you can use pandas or Polars directly). The mental model is the same regardless: a DataFrame is the carrier that moves through your pipeline, and each stage adds to it without altering what came before.
Here’s an example of a DataFrame stage. It takes in activity data, filters out paid time off, groups by user and summarizes non-value-add, value-add and total seconds for each user:
defmodule WastePipeline.Stages.NvaSummary do
  require Explorer.DataFrame, as: DF

  def run(df) do
    df
    |> DF.filter(stereotype_code != "pto")
    |> DF.group_by(["user_id"])
    |> DF.summarise(
      nva_seconds_sum: sum(nva_seconds),
      va_seconds_sum: sum(va_seconds),
      total_sum: sum(total_seconds)
    )
  end
end

Read that pipe-style code top to bottom. We start with df (the DataFrame). We filter out paid time off. We group by user. We summarize into three new columns. The original DataFrame isn’t mutated: DF.filter, DF.group_by and DF.summarise all return new DataFrames. The transformations are pure, the data is immutable and the next stage gets a clean input.
This is what I mean by “DataFrames as the carrier.” Each stage takes a DataFrame in, produces a DataFrame out. Annotations are new data, not modifications. If we get the analysis wrong, we throw out the annotations and rerun the stage; the source data never changes.
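To see that contract in action, here is a small sketch that chains two stages over made-up sample data. Treat it as an illustration under stated assumptions: the `AnnotationSketch` module, the `nva_ratio` column and the sample rows are hypothetical, the column names follow the stage above, and the script assumes Explorer can be fetched from Hex via `Mix.install`.

```elixir
# Assumption: network access is available so Mix can fetch Explorer.
Mix.install([{:explorer, "~> 0.10"}])

defmodule AnnotationSketch do
  require Explorer.DataFrame, as: DF

  # Stage 1: per-user totals, excluding paid time off.
  def summarise(activities) do
    activities
    |> DF.filter(stereotype_code != "pto")
    |> DF.group_by(["user_id"])
    |> DF.summarise(
      nva_seconds_sum: sum(nva_seconds),
      total_sum: sum(total_seconds)
    )
  end

  # Stage 2: annotate the summary with a derived column.
  # The input summary is left as it was; a new DataFrame comes back.
  def annotate(summary) do
    DF.mutate(summary, nva_ratio: nva_seconds_sum / total_sum)
  end
end

alias Explorer.DataFrame, as: DF

# Invented sample data, only for illustration.
activities =
  DF.new(
    user_id: ["u1", "u1", "u2", "u2"],
    stereotype_code: ["dev", "pto", "dev", "meeting"],
    nva_seconds: [600, 0, 300, 1_800],
    total_seconds: [3_600, 28_800, 3_600, 1_800]
  )

annotated =
  activities
  |> AnnotationSketch.summarise()
  |> AnnotationSketch.annotate()

# The source frame still has its original rows and columns.
IO.inspect(DF.names(activities), label: "source columns")
IO.inspect(DF.names(annotated), label: "annotated columns")
```

If the ratio turns out to be the wrong measure, only `annotate/1` changes and the stage is rerun; `activities` is never touched.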
A pipeline-shaped API that hides the messaging
This is where we earn our keep over the long run.
We want our architecture to eventually be event-driven. Stages publish events when they finish; other stages subscribe and react. But in sprint one we don’t have a message broker yet, and we don’t want to build one. So how do we write today’s code so it doesn’t need to be rewritten tomorrow?



