Have you ever been there? You build a slick data processing job for a big batch of historical data. It works perfectly. Then, your boss comes along and says, "This is great! Now, can we get these results in real-time?"
Suddenly, you're looking at a complete rewrite. You have to swap out your batch-focused tools for a streaming stack built on something like Kafka and Flink, and all your carefully crafted logic needs to be re-implemented. It’s like building a car, then being asked to turn it into a speedboat. They both get you from A to B, but the engineering is completely different.
This is one of the biggest headaches in data engineering. But what if it didn't have to be this way?
That's the promise of Apache Beam. It offers a unified model, a single way of thinking about data, whether it's a massive file sitting in storage (batch) or a firehose of events flying in (streaming).
Today, we’re going to roll up our sleeves and build a simple pipeline that proves this point. We'll write one piece of core logic and then, with a simple flick of a switch, run it on both a fixed set of data and a simulated real-time stream. No complex setup, no external systems: just you, me, and Beam's DirectRunner to see the magic happen right on our machines.
So, What Are We Actually Building?
Let's imagine we're tracking user transactions. We want to count how many transactions each user makes and sum up the amounts within one-minute windows.
The tricky part? Data in the real world is messy. Sometimes events arrive late or out of order. Our pipeline needs to be smart enough to handle that gracefully.
To do this, we'll use a few key Beam concepts:
- Event-Time Windowing: We'll group data based on when the event actually happened, not when our pipeline saw it. Think of it like sorting mail by the postmark date, not the date it landed on your doormat.
- Triggers & Allowed Lateness: This tells our pipeline when to give us an answer and gives it permission to update that answer if a late piece of data strolls in.
And to make this easy to test locally, we’ll use Beam’s built-in TestStream, a fantastic tool for pretending we have a live stream of data without all the setup.
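To make the postmark analogy concrete, here is a tiny plain-Python sketch (no Beam involved) that buckets the same events twice: once by when they happened and once by when they showed up. The event list, the arrival times, and the bucket helper are made up for illustration only.

```python
from collections import defaultdict

WINDOW_SECS = 60  # assumed window size for this illustration

# Hypothetical (event_time, arrival_time) pairs in seconds.
# The last event happened at second 50 but only arrived at second 70.
events = [(5, 6), (20, 21), (62, 63), (50, 70)]

def bucket(timestamps, size=WINDOW_SECS):
    """Group timestamps by the start of the fixed window they fall into."""
    buckets = defaultdict(list)
    for ts in timestamps:
        buckets[ts - ts % size].append(ts)
    return dict(buckets)

by_event_time = bucket(e for e, _ in events)
by_arrival_time = bucket(a for _, a in events)

print(by_event_time)    # {0: [5, 20, 50], 60: [62]}
print(by_arrival_time)  # {0: [6, 21], 60: [63, 70]}
```

Grouping by arrival time quietly moves the straggler into the second minute. Grouping by event time puts it back where it belongs, which is the behavior we want from the pipeline.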
First, Let's Get Our Ingredients Ready
Before we can cook, we need to prep. We'll start with some basic setup and create the data we’ll be playing with.
First, the boring (but necessary) imports and configuration.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone
# Let's set some ground rules for our pipeline
MODE = "stream" # We can flip this to "batch" later
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
We’re telling our pipeline we want to group events into 60-second windows and that we’re willing to wait an extra 120 seconds for any late-arriving data for a given window.
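If the arithmetic behind those two numbers feels abstract, this rough sketch may help. It is a simplified model, not Beam's implementation: it assumes fixed windows are aligned to multiples of the window size counted from the Unix epoch, and that a window stops accepting data once the watermark passes its end plus the allowed lateness. The helper names are invented for this example.

```python
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120

def window_bounds(event_time, size=WINDOW_SIZE_SECS):
    """Return (start, end) of the fixed window [start, end) containing event_time."""
    start = event_time - event_time % size
    return start, start + size

def accepts_data_until(event_time, lateness=ALLOWED_LATENESS_SECS):
    """Watermark value after which this simplified model drops data for the window."""
    _, end = window_bounds(event_time)
    return end + lateness

# An event stamped at second 125 lands in the window [120, 180).
print(window_bounds(125))       # (120, 180)
# Late data for that window is still welcome until the watermark reaches 300.
print(accepts_data_until(125))  # 300
```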
Now, for the data itself. We'll create a simple function to make events and then define a list of them for our batch processing test.
def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {
        "user_id": user_id,
        "event_type": event_type,
        "amount": float(amount),
        "event_time": int(event_time_epoch_s),
    }
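# Get a starting timestamp, rounded down to the minute.
# FixedWindows are aligned to the Unix epoch, not to our first event,
# so this makes t0 the start of a one-minute window.
base = datetime.now(timezone.utc).replace(second=0, microsecond=0)
t0 = int(base.timestamp())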
# Our sample batch data
BATCH_EVENTS = [
    make_event("u1", "purchase", 20, t0 + 5),   # Window 1
    make_event("u1", "purchase", 15, t0 + 20),  # Window 1
    make_event("u2", "purchase", 8, t0 + 35),   # Window 1
    make_event("u1", "refund", -5, t0 + 62),    # Window 2
    make_event("u2", "purchase", 12, t0 + 70),  # Window 2
    make_event("u3", "purchase", 9, t0 + 75),   # Window 2
    make_event("u2", "purchase", 3, t0 + 50),   # Window 1 (Out of order!)
]
Notice that last event for u2? Its timestamp (t0 + 50) means it belongs in the first one-minute window, but we’ve placed it at the end of our list. This is a classic "out-of-order" event, and we want to see if Beam handles it correctly.
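Before handing this data to Beam, it can help to work out by hand what the answer should be. The sketch below does that in plain Python, using offsets in seconds from a start time that is assumed to sit exactly on a minute boundary. The aggregate helper and the tuple layout are just for this illustration.

```python
from collections import defaultdict

WINDOW_SECS = 60

# (user_id, amount, seconds after an assumed minute-aligned start time)
sample = [
    ("u1", 20, 5), ("u1", 15, 20), ("u2", 8, 35),
    ("u1", -5, 62), ("u2", 12, 70), ("u3", 9, 75),
    ("u2", 3, 50),  # listed last, but belongs to the first window
]

def aggregate(rows, size=WINDOW_SECS):
    """Return {(window_start, user_id): (count, sum_amount)}."""
    totals = defaultdict(lambda: [0, 0.0])
    for user_id, amount, offset in rows:
        key = (offset - offset % size, user_id)
        totals[key][0] += 1
        totals[key][1] += amount
    return {key: (count, total) for key, (count, total) in totals.items()}

expected = aggregate(sample)
for (window_start, user_id), (count, total) in sorted(expected.items()):
    print(window_start, user_id, count, total)
# 0 u1 2 35.0
# 0 u2 2 11.0
# 60 u1 1 -5.0
# 60 u2 1 12.0
# 60 u3 1 9.0
```

If the pipeline handles the out-of-order event properly, u2 should end up with two transactions totalling 11 in the first window.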
The Heart of the Pipeline: Our Reusable Logic
Here’s the beautiful part. We're going to write a reusable component, called a PTransform, that contains all our windowing and aggregation logic. This is the code that will be identical for both batch and stream.
class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        # 1. Tell Beam to use the 'event_time' from our data
        stamped = pcoll | "AddEventTimestamp" >> beam.Map(
            lambda e: beam.window.TimestampedValue(e, e["event_time"])
        )

        # 2. Apply our windowing rules
        windowed = stamped | "ApplyWindowing" >> beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )

        # 3. Key the data by user_id
        keyed = windowed | "KeyByUser" >> beam.Map(lambda e: (e["user_id"], e["amount"]))

        # 4. Perform the aggregations (count and sum)
        counts = keyed | "CountPerUser" >> beam.combiners.Count.PerKey()
        sums = keyed | "SumPerUser" >> beam.CombinePerKey(sum)

        # 5. Join the results back together
        return (
            {"count": counts, "sum_amount": sums}
            | "MergeAggs" >> beam.CoGroupByKey()
            | "FormatOutput" >> beam.Map(format_joined_record)
        )


def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }
Whoa, that's a bit of code. Let's break it down.
- Timestamping: We explicitly tell Beam, "Hey, for your time calculations, don't use the time you see the data. Use the event_time field inside it." This is crucial for event-time processing.
- Windowing: We apply our 60-second fixed windows. The trigger is really interesting. It tells Beam: "Give me an early result 10 seconds after you process some data (early), give me the on-time result once the watermark says the window's data should all be in (AfterWatermark), and if any late data comes in, give me an updated result 10 seconds after you see it (late)." The ACCUMULATING mode means each update builds on the previous one.
- Aggregation: The rest is standard data processing. We group by user, then calculate a count and a sum for each one.
- Formatting: Finally, we join our two calculations (count and sum) back into a nice, clean record.
This WindowedUserAgg is our self-contained, reusable engine. It doesn't care where the data comes from.
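The accumulation mode is easy to gloss over, so here is a toy model of what it changes. This is plain Python, not Beam: the panes generator is a made-up helper that mimics what one key in one window would report at each trigger firing, either building on earlier data (like ACCUMULATING) or reporting only what is new since the last firing (like Beam's other mode, DISCARDING).

```python
def panes(batches, accumulating=True):
    """Yield the sum reported at each firing for a single key and window.

    `batches` holds the amounts that arrived between consecutive firings.
    """
    running = 0.0
    for batch in batches:
        if accumulating:
            running += sum(batch)  # include everything seen so far
            yield running
        else:
            yield float(sum(batch))  # only what arrived since the last firing

# u2 in the first window: 8 arrives before the first firing, the late 3 afterwards.
arrivals = [[8], [3]]

print(list(panes(arrivals, accumulating=True)))   # [8.0, 11.0]
print(list(panes(arrivals, accumulating=False)))  # [8.0, 3.0]
```

With accumulating panes, the latest result can simply overwrite the previous one downstream. With discarding panes, the consumer has to add the pieces up itself.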
Making the Output Readable
To really understand what's happening, it helps to see which window and which "pane" (a pane is just a result emitted by a trigger) each output belongs to. Let's create a little helper to add that info.
class AddWindowInfo(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        # window.start and window.end are Beam Timestamp objects,
        # so convert them to float seconds before building datetimes.
        start = datetime.fromtimestamp(float(window.start), tz=timezone.utc)
        end = datetime.fromtimestamp(float(window.end), tz=timezone.utc)
        yield {
            **element,
            "window_start_utc": start.strftime("%H:%M:%S"),
            "window_end_utc": end.strftime("%H:%M:%S"),
            "pane_timing": str(pane_info.timing),
            "pane_is_last": pane_info.is_last,
        }
This DoFn takes each result from our aggregation and sprinkles in some metadata about the window it came from. Super handy for debugging and learning.
Simulating a Live Stream with TestStream
This is where it gets really cool. For our streaming mode, we don't need Kafka. We can use TestStream to precisely control the flow of events and time itself.
def build_test_stream():
    return (
        TestStream()
        # Start time
        .advance_watermark_to(t0)
        # Add some initial events
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
            beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
            beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
        ])
        # Pretend 5 seconds of real time have passed
        .advance_processing_time(5)
        # The watermark advances, closing the first window
        .advance_watermark_to(t0 + 61)
        # Add more events, some for the next window
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
            beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
        ])
        # Pretend another 5 seconds pass
        .advance_processing_time(5)
        # Uh oh, here comes a late event for the FIRST window!
        .add_elements([
            beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
        ])
        # Finally, advance time to the end
        .advance_watermark_to_infinity()
    )
Think of this as a script for our data stream. We add some elements, then we tell Beam to move its "processing time" clock forward. Then we advance the "watermark," which is Beam's way of saying, "I am now confident that no more data with a timestamp earlier than X will arrive." When the watermark crosses the end of a window, the on-time result for that window can be calculated. The window is not gone yet, though: it keeps accepting late data until the allowed lateness runs out. We deliberately add a late event after the watermark has passed its window to test our allowed_lateness setting.
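To double-check that the script does what we intend, we can replay it on paper. The sketch below is a simplified plain-Python model, not Beam's actual logic: it assumes t0 sits on a minute boundary, uses offsets from t0, and labels each element by comparing the watermark at the moment it is added against the end of its window. The classify helper is invented for this illustration.

```python
WINDOW_SECS = 60
ALLOWED_LATENESS_SECS = 120

def classify(event_time, watermark):
    """Label an element relative to the watermark at the moment it arrives."""
    window_end = event_time - event_time % WINDOW_SECS + WINDOW_SECS
    if watermark < window_end:
        return "on time"
    if watermark < window_end + ALLOWED_LATENESS_SECS:
        return "late but accepted"
    return "dropped"

# (event offset, watermark offset when the element is added), both relative to t0
script = [(5, 0), (20, 0), (35, 0), (62, 61), (70, 61), (50, 61)]

labels = [classify(event_time, watermark) for event_time, watermark in script]
for (event_time, _), label in zip(script, labels):
    print(event_time, label)
```

Only the last element, stamped at t0 + 50, comes out as "late but accepted". Had the watermark already reached t0 + 180 when it arrived, this model would label it "dropped".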
Putting It All Together: Running Batch and Stream
Now we just need to wire everything up. We'll create two simple functions, one for each mode.
def run_batch():
    with beam.Pipeline() as p:
        (
            p
            | "CreateBatchData" >> beam.Create(BATCH_EVENTS)
            | "RunAggregation" >> WindowedUserAgg()
            | "AddWindowMetadata" >> beam.ParDo(AddWindowInfo())
            | "PrintResults" >> beam.Map(print)
        )


def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (
            p
            | "CreateStreamData" >> build_test_stream()
            | "RunAggregation" >> WindowedUserAgg()
            | "AddWindowMetadata" >> beam.ParDo(AddWindowInfo())
            | "PrintResults" >> beam.Map(print)
        )


# The grand finale!
if MODE == "stream":
    run_stream()
else:
    run_batch()
Look at how similar those two functions are! Apart from the streaming flag that run_stream sets in its options, the only difference is the first step.
- run_batch uses beam.Create(BATCH_EVENTS) to read from our in-memory list.
- run_stream uses our build_test_stream() function to get its data.
The core logic, WindowedUserAgg(), is called in exactly the same way. This is the whole point. We just proved the unified model.
So, What Did We Learn?
By running this code in stream mode, you'll see how Beam groups events by event time, provides an on-time result when the watermark passes the end of a window, and goes back to update that result when a late event shows up. Early results only fire if enough processing time passes before the watermark gets there. The batch run is simpler: all the data is available up front, so you should generally see one result per user and window, with the out-of-order event already counted. Either way, the same WindowedUserAgg code does the work.
This is more than just a neat party trick. It means you can develop and test your complex windowing logic on a simple batch file, and be confident that the exact same code will work correctly when you deploy it against a live stream from Pub/Sub, Kinesis, or Kafka.
You write the business logic once. That's a huge win for development speed, testing, and maintenance. So next time you're faced with that "batch now, stream later" problem, you'll know there’s a much better way to handle it.




