Have you ever watched a simple robot in a video game just walk straight into a wall that suddenly appeared? It’s kind of funny, but it highlights a huge problem. Most basic AI follows a pre-baked plan, and when the world changes, it just… breaks. It has no ability to see the new wall, stop, and think, "Huh, guess I need a new plan."
That's the difference between a simple script and true intelligence. It's the ability to adapt.
Today, we're going to build an agent that's much smarter than that wall-walking robot. We're going to create a "Streaming Decision Agent" from scratch. This isn't just an agent that makes one big plan and hopes for the best. This agent thinks on its feet. It makes a short-term plan, takes a few steps, and then stops to look around and re-evaluate. If the world has changed—if obstacles have moved or the goal has shifted—it throws out the old plan and makes a new one.
And the coolest part? It "thinks out loud." As it's making decisions, it will stream its reasoning to us, so we can see exactly why it's doing what it's doing, in real-time. Let’s get our hands dirty and build something that feels genuinely intelligent.
First Things First: A Window into the Agent's Mind
Before our agent can do anything, we need a way to understand what it's thinking. We don't want a black box. We want a play-by-play.
That's where a "streaming event" comes in. Think of it as a structured log or a status update from our agent's brain. At every step—planning, deciding, acting, observing—it will send out a little packet of information telling us what's up.
We'll use Pydantic to keep these updates clean and consistent. StreamEvent is a small Pydantic model that holds the elapsed time since the agent started, the type of event (like plan or decide), the step number, a human-readable message, and any extra structured data.
```python
import random, time
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional, Generator, Any
from collections import defaultdict

try:
    from pydantic import BaseModel, Field
except ImportError:
    raise RuntimeError("Please install pydantic: `!pip -q install pydantic` (then rerun).")

class StreamEvent(BaseModel):
    t: float = Field(..., description="Elapsed time in seconds since the agent started")
    kind: str = Field(..., description="event type, e.g., observe/world/plan/decide/done")
    step: int = Field(..., description="agent step counter")
    msg: str = Field(..., description="human-readable partial reasoning summary")
    data: Dict[str, Any] = Field(default_factory=dict, description="structured payload")

Coord = Tuple[int, int]  # (x, y) grid cell
```
Simple, right? This little StreamEvent is the foundation for everything. It’s how we'll get a live feed from our agent’s consciousness.
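To see the streaming pattern on its own, here is a minimal sketch. It swaps Pydantic for a standard-library dataclass (an assumption made only so the snippet runs without extra installs), and the `Event` class, the `fake_agent` generator, and `to_jsonl` are hypothetical stand-ins, not part of the agent we're building. What matters is the shape: the producer yields one event at a time, and the consumer handles each event as it arrives, here by turning it into a JSON line.

```python
import json
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Iterator

@dataclass
class Event:
    """Hypothetical stdlib stand-in for StreamEvent (the article uses Pydantic)."""
    t: float
    kind: str
    step: int
    msg: str
    data: Dict[str, Any] = field(default_factory=dict)

def fake_agent(n_steps: int) -> Iterator[Event]:
    """Hypothetical producer: yields plan/decide/observe events per step, then done."""
    start = time.time()
    for step in range(1, n_steps + 1):
        for kind in ("plan", "decide", "observe"):
            yield Event(t=time.time() - start, kind=kind, step=step, msg=f"{kind} at step {step}")
    yield Event(t=time.time() - start, kind="done", step=n_steps, msg="finished")

def to_jsonl(events: Iterator[Event]) -> Iterator[str]:
    """Consumer: serialize each event to one JSON line as soon as it is produced."""
    for ev in events:
        yield json.dumps(asdict(ev))

lines = list(to_jsonl(fake_agent(2)))
for line in lines:
    print(line)
```

With the real Pydantic model, `ev.model_dump_json()` (Pydantic v2) or `ev.json()` (Pydantic v1) does the serialization step directly.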
Let's Build a World (And Make It a Little Chaotic)
An agent is only as interesting as the world it lives in. A static, predictable world doesn't require much intelligence. So, we're going to build a world that's constantly changing.
Imagine a simple grid, like a chessboard. Our agent (A) needs to get to a target (T), but there are obstacles (█) in the way. Here’s the twist: every few steps (every 6 by default, controlled by move_obstacles_every), obstacles can randomly disappear or pop up in new places. And to make it even more interesting, the target itself may jitter to a neighboring cell on any step (with probability target_jitter_prob, 0.35 by default).
This forces our agent to stay on its toes. A perfect plan from a few steps ago might be completely useless now. This DynamicGridWorld class handles all of that chaos for us.
```python
@dataclass
class DynamicGridWorld:
    w: int = 18
    h: int = 10
    obstacle_ratio: float = 0.18          # fraction of cells that start as obstacles
    seed: int = 7
    move_obstacles_every: int = 6         # obstacles churn once every N steps
    spawn_obstacle_prob: float = 0.25
    clear_obstacle_prob: float = 0.15
    target_jitter_prob: float = 0.35      # per-step chance the target hops to a neighbor cell
    rng: random.Random = field(init=False)
    obstacles: set = field(init=False, default_factory=set)
    agent: Coord = field(init=False, default=(1, 1))
    target: Coord = field(init=False, default=(15, 7))
    step_count: int = field(init=False, default=0)

    def __post_init__(self):
        self.rng = random.Random(self.seed)
        self.reset()

    def reset(self):
        self.step_count = 0
        self.obstacles = set()
        for y in range(self.h):
            for x in range(self.w):
                if (x, y) in [(1, 1), (self.w - 2, self.h - 2)]: continue  # keep start and goal clear
                if self.rng.random() < self.obstacle_ratio:
                    self.obstacles.add((x, y))
        self.agent = (1, 1)
        self.target = (self.w - 2, self.h - 2)
        self._ensure_free(self.agent)
        self._ensure_free(self.target)

    def _ensure_free(self, c: Coord):
        if c in self.obstacles:
            self.obstacles.remove(c)

    def in_bounds(self, c: Coord) -> bool:
        x, y = c
        return 0 <= x < self.w and 0 <= y < self.h

    def passable(self, c: Coord) -> bool:
        return c not in self.obstacles

    def neighbors4(self, c: Coord) -> List[Coord]:
        x, y = c
        cand = [(x+1,y), (x-1,y), (x,y+1), (x,y-1)]
        return [p for p in cand if self.in_bounds(p) and self.passable(p)]

    def manhattan(self, a: Coord, b: Coord) -> int:
        return abs(a[0]-b[0]) + abs(a[1]-b[1])

    def maybe_world_changes(self) -> Dict[str, Any]:
        changes = {"obstacles_added": [], "obstacles_cleared": [], "target_moved": False}
        self.step_count += 1
        # The target may hop to a random free neighbor (never onto the agent).
        if self.rng.random() < self.target_jitter_prob:
            tx, ty = self.target
            options = [(tx+1,ty),(tx-1,ty),(tx,ty+1),(tx,ty-1)]
            options = [c for c in options if self.in_bounds(c) and c != self.agent]
            self.rng.shuffle(options)
            for c in options[:3]:
                if c not in self.obstacles:
                    self.target = c
                    changes["target_moved"] = True
                    break
        # Every `move_obstacles_every` steps, clear and spawn a few obstacles at random.
        if self.step_count % self.move_obstacles_every == 0:
            for _ in range(4):
                if self.rng.random() < self.clear_obstacle_prob and self.obstacles:
                    c = self.rng.choice(tuple(self.obstacles))
                    self.obstacles.remove(c)
                    changes["obstacles_cleared"].append(c)
            for _ in range(5):
                if self.rng.random() < self.spawn_obstacle_prob:
                    c = (self.rng.randrange(self.w), self.rng.randrange(self.h))
                    # Skip the agent, the target, and cells that are already obstacles,
                    # so only genuine changes are reported.
                    if c != self.agent and c != self.target and c not in self.obstacles:
                        self.obstacles.add(c)
                        changes["obstacles_added"].append(c)
        self._ensure_free(self.agent)
        self._ensure_free(self.target)
        return changes

    def step(self, action: str) -> Dict[str, Any]:
        ax, ay = self.agent
        move = {"R": (ax+1, ay), "L": (ax-1, ay), "D": (ax, ay+1), "U": (ax, ay-1), "S": (ax, ay)}[action]
        moved = False
        if self.in_bounds(move) and self.passable(move):
            self.agent = move
            moved = True
        if self.agent == self.target:
            # Check success before the world shifts, so a target that jitters away
            # right after being reached does not erase the success.
            no_changes = {"obstacles_added": [], "obstacles_cleared": [], "target_moved": False}
            return {"moved": moved, "agent": self.agent, "target": self.target, "done": True, "changes": no_changes}
        changes = self.maybe_world_changes()
        return {"moved": moved, "agent": self.agent, "target": self.target, "done": False, "changes": changes}

    def render(self, path: Optional[List[Coord]] = None) -> str:
        path_set = set(path or [])
        lines = []
        for y in range(self.h):
            row = []
            for x in range(self.w):
                c = (x, y)
                if c == self.agent: row.append("A")
                elif c == self.target: row.append("T")
                elif c in path_set: row.append("·")
                elif c in self.obstacles: row.append("█")
                else: row.append(" ")
            lines.append("".join(row))
        border = "+" + "-" * self.w + "+"
        body = "\n".join(["|" + ln + "|" for ln in lines])
        return f"{border}\n{body}\n{border}"
```
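One side effect of all this churn is that the target can occasionally be sealed off completely, and then no planner can succeed until the walls shift again. A breadth-first flood fill from the agent is a cheap way to check for that. The `reachable` helper below is a hypothetical addition (DynamicGridWorld has no such method); it works on plain sets so it runs on its own.

```python
from collections import deque
from typing import Set, Tuple

Coord = Tuple[int, int]

def reachable(w: int, h: int, obstacles: Set[Coord], start: Coord, goal: Coord) -> bool:
    """Breadth-first flood fill over the 4-connected grid; True if goal can be reached from start."""
    if start in obstacles or goal in obstacles:
        return False
    seen = {start}
    queue = deque([start])
    while queue:
        x, y = queue.popleft()
        if (x, y) == goal:
            return True
        for nxt in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
            nx, ny = nxt
            if 0 <= nx < w and 0 <= ny < h and nxt not in obstacles and nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return False

# A 5x3 grid with a full wall in column 2: the right-hand side is cut off.
wall = {(2, 0), (2, 1), (2, 2)}
print(reachable(5, 3, wall, (0, 1), (4, 1)))             # False
print(reachable(5, 3, wall - {(2, 2)}, (0, 1), (4, 1)))  # True (one wall cell cleared)
```

Against the world above, the call would be `reachable(world.w, world.h, world.obstacles, world.agent, world.target)`.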
Now we have a stage for our drama to unfold. Let's give our agent a brain.
Giving Our Agent a Brain (And Some Reflexes)
Our agent needs two key mental abilities:
- A Planner: A way to look at the world and figure out the best path to the target.
- A Reflex: A gut-check mechanism to avoid immediate, obvious danger, even if it means ignoring the plan for a second.
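For the planner, we'll use the classic A* (A-star) search algorithm. On a 4-connected grid, the Manhattan-distance heuristic never overestimates the remaining distance, so whenever A* completes its search it returns a shortest path, and it usually explores far fewer cells than a blind search would. We won't let it run forever, though. We'll give it a budget (max_expand) on how many cells it may expand, so it has to think quickly; if the budget runs out, it reports failure instead of a path.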
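To see what the budget does in isolation, here is a stripped-down, standalone budgeted A* over an ASCII grid. The `astar_budget` name and the string-grid format ("#" for walls) are assumptions for this sketch, not the article's code; it also skips stale heap entries, a common refinement. The point to notice: the same query that succeeds with a generous budget returns no path when max_expand is too small, even though a path exists.

```python
import heapq
from typing import Dict, List, Optional, Tuple

Coord = Tuple[int, int]

def astar_budget(grid: List[str], start: Coord, goal: Coord,
                 max_expand: int) -> Tuple[Optional[List[Coord]], int]:
    """Budgeted A* on a 4-connected ASCII grid with unit step costs and a Manhattan heuristic."""
    h, w = len(grid), len(grid[0])

    def heur(c: Coord) -> int:
        return abs(c[0] - goal[0]) + abs(c[1] - goal[1])

    frontier = [(heur(start), 0, start)]
    came_from: Dict[Coord, Optional[Coord]] = {start: None}
    g: Dict[Coord, int] = {start: 0}
    expanded = 0
    while frontier and expanded < max_expand:
        _, gc, cur = heapq.heappop(frontier)
        if gc > g[cur]:
            continue  # stale entry: a cheaper route to `cur` was found later
        expanded += 1
        if cur == goal:
            path: List[Coord] = []
            node: Optional[Coord] = cur
            while node is not None:
                path.append(node)
                node = came_from[node]
            return path[::-1], expanded
        x, y = cur
        for nb in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
            nx, ny = nb
            if 0 <= nx < w and 0 <= ny < h and grid[ny][nx] != "#":
                ng = gc + 1
                if ng < g.get(nb, float("inf")):
                    g[nb] = ng
                    came_from[nb] = cur
                    heapq.heappush(frontier, (ng + heur(nb), ng, nb))
    return None, expanded  # no path, or the budget ran out first

MAZE = [
    "......",
    ".####.",
    "......",
]
full, _ = astar_budget(MAZE, (0, 0), (5, 2), max_expand=1000)
tight, _ = astar_budget(MAZE, (0, 0), (5, 2), max_expand=3)
print(len(full))  # 8 cells, i.e. 7 moves
print(tight)      # None: the budget ran out before the goal was reached
```

This is why a failed plan in the agent does not necessarily mean the target is unreachable; it may just mean the planner ran out of budget.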
For the reflex, we'll create a simple action_risk function. It looks at a potential next cell and asks, "How dangerous is this spot?" The score is 0.25 for every obstacle directly next to that cell, plus 0.15 if the cell sits on the edge of the grid. It’s our agent’s "spidey-sense."
```python
import heapq

@dataclass
class PlanResult:
    path: List[Coord]
    cost: float
    expanded: int
    reason: str

def astar(world: DynamicGridWorld, start: Coord, goal: Coord, max_expand: int = 5000) -> PlanResult:
    """A* over the 4-connected grid with a Manhattan heuristic and an expansion budget."""
    frontier = []
    heapq.heappush(frontier, (world.manhattan(start, goal), 0, start))
    came_from: Dict[Coord, Optional[Coord]] = {start: None}
    gscore: Dict[Coord, float] = {start: 0}
    expanded = 0
    while frontier and expanded < max_expand:
        _, g, cur = heapq.heappop(frontier)
        if g > gscore[cur]:
            continue  # stale heap entry: a cheaper route to `cur` was already found
        expanded += 1
        if cur == goal:
            # Walk the parent links back to the start to rebuild the path.
            path = []
            c = cur
            while c is not None:
                path.append(c)
                c = came_from[c]
            path.reverse()
            return PlanResult(path=path, cost=gscore[cur], expanded=expanded, reason="found_path")
        for nb in world.neighbors4(cur):
            ng = gscore[cur] + 1
            if nb not in gscore or ng < gscore[nb]:
                gscore[nb] = ng
                came_from[nb] = cur
                h = world.manhattan(nb, goal)
                heapq.heappush(frontier, (ng + h, ng, nb))
    return PlanResult(path=[start], cost=float("inf"), expanded=expanded, reason="no_path_or_budget")

def path_to_actions(path: List[Coord]) -> List[str]:
    """Convert consecutive path cells into R/L/D/U moves."""
    actions = []
    for (x1,y1),(x2,y2) in zip(path, path[1:]):
        if x2 == x1+1 and y2 == y1: actions.append("R")
        elif x2 == x1-1 and y2 == y1: actions.append("L")
        elif x2 == x1 and y2 == y1+1: actions.append("D")
        elif x2 == x1 and y2 == y1-1: actions.append("U")
        else: actions.append("S")  # Stay
    return actions

def action_risk(world: DynamicGridWorld, next_pos: Coord) -> float:
    """0.25 per obstacle adjacent to next_pos, plus 0.15 if next_pos is on the border."""
    x, y = next_pos
    near = 0
    for dx, dy in [(1,0),(-1,0),(0,1),(0,-1)]:
        c = (x+dx, y+dy)
        if world.in_bounds(c) and c in world.obstacles:
            near += 1
    edge = 1 if (x in [0, world.w-1] or y in [0, world.h-1]) else 0
    return 0.25 * near + 0.15 * edge
```
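It's worth checking what values this risk score can actually take. The `risk` function below reproduces action_risk's arithmetic against plain sets so it runs standalone, and `max_move_risk` is an illustrative brute-force search (not part of the agent): for every possible move on a small grid, it tries every obstacle arrangement around the destination cell, keeping the agent's own cell free, and reports the worst score.

```python
from itertools import product
from typing import Set, Tuple

Coord = Tuple[int, int]
DELTAS = [(1, 0), (-1, 0), (0, 1), (0, -1)]

def risk(w: int, h: int, obstacles: Set[Coord], pos: Coord) -> float:
    """Same arithmetic as action_risk: 0.25 per obstacle neighbor + 0.15 if pos is on the border."""
    x, y = pos
    near = sum(1 for dx, dy in DELTAS if (x + dx, y + dy) in obstacles)
    edge = 1 if (x in (0, w - 1) or y in (0, h - 1)) else 0
    return 0.25 * near + 0.15 * edge

def max_move_risk(w: int, h: int) -> float:
    """Worst-case risk of stepping from a free cell into an adjacent free cell."""
    worst = 0.0
    for ax, ay in product(range(w), range(h)):
        for dx, dy in DELTAS:
            nxt = (ax + dx, ay + dy)
            if not (0 <= nxt[0] < w and 0 <= nxt[1] < h):
                continue
            # Cells that could hold obstacles: in-bounds neighbors of nxt, minus the agent's own cell.
            others = [(nxt[0] + ex, nxt[1] + ey) for ex, ey in DELTAS]
            others = [c for c in others if 0 <= c[0] < w and 0 <= c[1] < h and c != (ax, ay)]
            for mask in product([False, True], repeat=len(others)):
                obs = {c for c, on in zip(others, mask) if on}
                worst = max(worst, risk(w, h, obs, nxt))
    return worst

print(risk(5, 5, {(3, 2), (2, 1), (2, 3)}, (2, 2)))  # 0.75: interior cell with 3 obstacle neighbors
print(max_move_risk(5, 5))                            # 0.75
```

The cell you step into always has at least one free neighbor, namely the cell you just left, so an interior move tops out at 3 × 0.25 = 0.75 and an edge move at 2 × 0.25 + 0.15 = 0.65.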
We now have the core components: a world, a planner, and a risk assessor. It's time to put them all together into our agent.
The Main Event: The Streaming Decision Agent
This is where the magic happens. The StreamingDecisionAgent class is the conductor of our little orchestra. It manages the state, decides when to plan, and makes the final call on what to do next.
Here’s the core philosophy, which is often called "receding-horizon control":
- Look around: Observe the current state of the world.
- Think: Use the A* planner to find the best path to the target.
- Commit (a little): Don't commit to the whole 50-step plan! Just commit to the next few steps (we'll call this the horizon). This is key.
- Act: Take one step.
- Repeat: Go back to step 1.
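Here is that loop boiled down to a one-dimensional toy, purely to make the control flow visible. Everything in it (the number-line world, the `receding_horizon` function, the `goal_at` schedule) is hypothetical and far simpler than the grid agent: the "planner" just walks straight toward the goal. The toy replans when its plan runs out, when it has used up its horizon, or when the goal no longer matches where the plan ends.

```python
from typing import Callable, List, Tuple

def receding_horizon(start: int, goal_at: Callable[[int], int], horizon: int,
                     max_steps: int) -> Tuple[int, int, List[int]]:
    """Look -> plan -> commit to at most `horizon` moves -> act, on a number line."""
    pos, plan, since_plan, replans, trace = start, [], 0, 0, []
    for t in range(max_steps):
        goal = goal_at(t)                       # look around
        if pos == goal:
            break
        if not plan or since_plan >= horizon or plan[-1] != goal:
            step = 1 if goal > pos else -1      # think: a trivial straight-line "planner"
            plan = list(range(pos + step, goal + step, step))
            since_plan, replans = 0, replans + 1
        pos = plan.pop(0)                       # act: take one step of the committed plan
        since_plan += 1
        trace.append(pos)
    return pos, replans, trace

# The goal sits at 10, then jumps back to 7 at t=4.
print(receding_horizon(0, lambda t: 10 if t < 4 else 7, horizon=3, max_steps=20))
# (7, 3, [1, 2, 3, 4, 5, 6, 7])
```

The first replan happens at the start, the second because the 3-move horizon was used up, and the third because the goal moved away from the end of the plan.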
The agent only replans when it needs to. Why waste time and energy making a new plan if the world hasn't changed and the old plan is still good? The _need_replan method handles this logic. It triggers a replan if the target moves, if obstacles appear or disappear anywhere on the grid (not just on our path), if the next cell on our path becomes blocked, or if we've simply run through our short-term plan.
The _choose_action method is where the planner meets the reflex. It takes the action suggested by the plan and runs it through the risk check. If the planned move is suddenly invalid (e.g., an obstacle just appeared there) or too risky, the agent can override the plan and choose a safer, local move, like just staying put. This is the reactive adaptation that prevents it from walking into walls.
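This gate is easier to test once it is separated from the world object. Below is a minimal sketch of the same rules as a pure function; the `choose_action` name, and passing `passable` and `risk` in as callables, are assumptions for illustration. An impossible planned move becomes a wait, a planned move whose risk exceeds the gate is replaced by whichever valid action minimizes risk plus 0.05 × Manhattan distance to the target, and anything else follows the plan.

```python
from typing import Callable, Tuple

Coord = Tuple[int, int]
DELTA = {"R": (1, 0), "L": (-1, 0), "D": (0, 1), "U": (0, -1), "S": (0, 0)}

def choose_action(agent: Coord, target: Coord, planned: str,
                  passable: Callable[[Coord], bool], risk: Callable[[Coord], float],
                  gate: float) -> Tuple[str, str]:
    """Risk-gated override: wait if the planned move is blocked, sidestep if it is too risky."""
    ax, ay = agent
    dx, dy = DELTA[planned]
    nxt = (ax + dx, ay + dy)
    if not passable(nxt):
        return "S", "planned_move_invalid"
    if risk(nxt) <= gate:
        return planned, "follow_plan"
    best, best_score = planned, float("inf")
    for a in ("U", "D", "L", "R", "S"):
        p = (ax + DELTA[a][0], ay + DELTA[a][1])
        if not passable(p):
            continue
        # Lower is better: local risk plus a small pull toward the target.
        s = risk(p) + 0.05 * (abs(p[0] - target[0]) + abs(p[1] - target[1]))
        if s < best_score:
            best, best_score = a, s
    return (best, "risk_avoidance_override") if best != planned else (planned, "follow_plan")

WALLS = {(1, 0)}

def passable(p: Coord) -> bool:
    return 0 <= p[0] < 5 and 0 <= p[1] < 5 and p not in WALLS

def risk(p: Coord) -> float:
    return 0.9 if p == (1, 1) else 0.0  # hypothetical risk map with one dangerous cell

print(choose_action((0, 0), (4, 4), "R", passable, risk, gate=0.85))  # ('S', 'planned_move_invalid')
print(choose_action((0, 0), (4, 4), "D", passable, risk, gate=0.85))  # ('D', 'follow_plan')
print(choose_action((1, 2), (4, 4), "U", passable, risk, gate=0.85))  # ('D', 'risk_avoidance_override')
```

In the last call, stepping up into (1, 1) is too risky, and moving down ties with moving right at a score of 0.2; because "D" is checked before "R", it wins the tie.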
```python
@dataclass
class AgentConfig:
    horizon: int = 6                  # moves to commit to before re-evaluating
    replan_on_target_move: bool = True
    replan_on_obstacle_change: bool = True
    max_steps: int = 120
    think_latency: float = 0.02       # simulated planning time (seconds)
    act_latency: float = 0.01         # simulated actuation time (seconds)
    risk_gate: float = 0.85           # override the plan when a move's risk exceeds this
    alt_search_depth: int = 2         # not used by the agent below

@dataclass
class StreamingDecisionAgent:
    cfg: AgentConfig
    world: DynamicGridWorld
    start_time: float = field(init=False, default_factory=time.time)
    step_id: int = field(init=False, default=0)
    current_plan: List[Coord] = field(init=False, default_factory=list)
    current_actions: List[str] = field(init=False, default_factory=list)
    steps_since_plan: int = field(init=False, default=0)  # moves executed since the last replan
    last_snapshot: Dict[str, Any] = field(init=False, default_factory=dict)
    stats: Dict[str, Any] = field(init=False, default_factory=lambda: defaultdict(int))

    def _now(self) -> float:
        return time.time() - self.start_time

    def _emit(self, kind: str, msg: str, data: Optional[Dict[str, Any]] = None) -> StreamEvent:
        return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data or {})

    def _need_replan(self, obs: Dict[str, Any]) -> bool:
        ch = obs["changes"]
        if obs["done"]: return False
        if not self.current_plan or len(self.current_plan) <= 1: return True
        # Receding horizon: re-evaluate once the committed moves are used up.
        if self.steps_since_plan >= self.cfg.horizon: return True
        # A reflex override moved us off the plan, so the remaining actions no longer line up.
        if self.current_plan[0] != self.world.agent: return True
        if self.cfg.replan_on_target_move and ch.get("target_moved"): return True
        if self.cfg.replan_on_obstacle_change and (ch.get("obstacles_added") or ch.get("obstacles_cleared")): return True
        if len(self.current_plan) > 1 and self.current_plan[1] in self.world.obstacles: return True
        return False

    def _plan(self) -> PlanResult:
        time.sleep(self.cfg.think_latency)  # Simulate thinking time
        self.stats["replans"] += 1
        return astar(self.world, self.world.agent, self.world.target)

    def _choose_action(self, planned_action: str) -> Tuple[str, str]:
        ax, ay = self.world.agent
        action_to_delta = {"R": (1,0), "L": (-1,0), "D": (0,1), "U": (0,-1), "S": (0,0)}
        dx, dy = action_to_delta[planned_action]
        nxt = (ax+dx, ay+dy)
        # Reflex: Is the planned move now impossible?
        if not self.world.in_bounds(nxt) or not self.world.passable(nxt):
            self.stats["overrides"] += 1
            return "S", "planned_move_invalid -> wait."
        # Reflex: Is the planned move too risky?
        r = action_risk(self.world, nxt)
        if r > self.cfg.risk_gate:
            candidates = ["U","D","L","R","S"]
            best = (planned_action, float("inf"), "keep_plan")
            for a in candidates:
                dx, dy = action_to_delta[a]
                p = (ax+dx, ay+dy)
                if not self.world.in_bounds(p) or not self.world.passable(p): continue
                # Lower is better: local risk plus a small pull toward the target.
                score = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.target)
                if score < best[1]:
                    best = (a, score, "risk_avoidance_override")
            if best[0] != planned_action:
                self.stats["overrides"] += 1
                return best[0], best[2]
        return planned_action, "follow_plan"

    def run(self) -> Generator[StreamEvent, None, None]:
        yield self._emit("observe", "Initialize: reading initial state.", {"agent": self.world.agent, "target": self.world.target})
        yield self._emit("world", "Initial world snapshot.", {"grid": self.world.render()})
        for self.step_id in range(1, self.cfg.max_steps + 1):
            # 1) Think: plan on the first step, then whenever _need_replan says so.
            if self.step_id == 1 or self._need_replan(self.last_snapshot):
                pr = self._plan()
                self.current_plan = pr.path
                self.current_actions = path_to_actions(pr.path)
                self.steps_since_plan = 0
                if pr.reason != "found_path":
                    yield self._emit("plan", "Planner could not find a path within budget; switching to reactive exploration.", {"reason": pr.reason, "expanded": pr.expanded})
                    self.current_actions = []
                else:
                    horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
                    yield self._emit("plan", f"Plan updated (online A*). Commit to next {len(horizon_path)-1} moves, then re-evaluate.", {
                        "reason": pr.reason, "path_len": len(pr.path), "expanded": pr.expanded,
                        "commit_horizon": self.cfg.horizon, "horizon_path": horizon_path,
                        "grid_with_path": self.world.render(path=horizon_path)})
            # 2) Decide: next planned action, or a greedy step toward the target if there is no plan.
            if self.current_actions:
                planned_action = self.current_actions[0]
            else:  # No plan, just try to move towards target
                ax, ay = self.world.agent; tx, ty = self.world.target
                options = []
                if tx > ax: options.append("R")
                if tx < ax: options.append("L")
                if ty > ay: options.append("D")
                if ty < ay: options.append("U")
                options += ["S","U","D","L","R"]
                planned_action = options[0]
            action, why = self._choose_action(planned_action)
            yield self._emit("decide", f"Intermediate decision: action={action} ({why}).", {
                "planned_action": planned_action, "chosen_action": action,
                "agent": self.world.agent, "target": self.world.target})
            # 3) Act, then advance the plan only if we actually followed it.
            time.sleep(self.cfg.act_latency)
            obs = self.world.step(action)
            self.last_snapshot = obs
            self.steps_since_plan += 1
            if self.current_actions:
                if action == planned_action:
                    self.current_actions = self.current_actions[1:]
                    if len(self.current_plan) > 1:
                        self.current_plan = self.current_plan[1:]
            # 4) Observe: summarize what changed and update stats.
            ch = obs["changes"]
            surprise = []
            if ch.get("target_moved"): surprise.append("target_moved")
            if ch.get("obstacles_added"): surprise.append(f"obstacles_added={len(ch['obstacles_added'])}")
            if ch.get("obstacles_cleared"): surprise.append(f"obstacles_cleared={len(ch['obstacles_cleared'])}")
            surprise_msg = ("Surprises: " + ", ".join(surprise)) if surprise else "No major surprises."
            self.stats["steps"] += 1
            if obs["moved"]: self.stats["moves"] += 1
            if ch.get("target_moved"): self.stats["target_moves"] += 1
            if ch.get("obstacles_added") or ch.get("obstacles_cleared"): self.stats["world_shifts"] += 1
            yield self._emit("observe", f"Observed outcome. {surprise_msg}", {
                "moved": obs["moved"], "agent": obs["agent"], "target": obs["target"],
                "done": obs["done"], "changes": ch,
                "grid": self.world.render(path=self.current_plan[:10])})
            if obs["done"]:
                yield self._emit("done", "Goal reached. Stopping execution.", {"final_agent": obs["agent"], "final_target": obs["target"], "stats": dict(self.stats)})
                return
        yield self._emit("done", "Max steps reached without reaching the goal.", {"final_agent": self.world.agent, "final_target": self.world.target, "stats": dict(self.stats)})
```
Let's Watch It Work!
Now for the fun part. We'll set up a world, configure our agent, and then just let it run. The run_and_print function will loop through the StreamEvents our agent generates and print them out, giving us that live feed from its brain.
```python
def run_and_print(agent: StreamingDecisionAgent, throttle: float = 0.0):
    for ev in agent.run():
        header = f"[t={ev.t:6.2f}s | step={ev.step:03d} | {ev.kind.upper():7}]"
        print(header, ev.msg)
        # Plan, observe, and world events carry a rendered grid; print it under the header.
        if ev.kind in {"plan", "observe", "world"}:
            if "grid_with_path" in ev.data:
                print(ev.data["grid_with_path"])
            elif "grid" in ev.data:
                print(ev.data["grid"])
        if throttle > 0:
            time.sleep(throttle)

# --- Configuration ---
world = DynamicGridWorld(w=18, h=10, obstacle_ratio=0.18, seed=10, move_obstacles_every=6)
cfg = AgentConfig(
    horizon=6,
    replan_on_target_move=True,
    replan_on_obstacle_change=True,
    max_steps=120,
    think_latency=0.01,
    act_latency=0.01,
    risk_gate=0.85,
    alt_search_depth=2
)
agent = StreamingDecisionAgent(cfg=cfg, world=world)

# --- Run it! ---
run_and_print(agent, throttle=0.0)
```
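When you run this, you'll see a detailed log of the agent's journey. You'll see it make a plan, follow it for a few steps, and then a PLAN event will pop up with a message like "Plan updated (online A*). Commit to next 6 moves, then re-evaluate." That replan was triggered either by the world changing (the target moving, obstacles appearing or clearing), by the next cell becoming blocked, or by the agent running through its short-term plan. You might also see a DECIDE event that says "risk_avoidance_override," which is our agent's reflex kicking in to keep it safe. Be aware, though, that with these weights a real move can never score above 0.75 (the cell you step into always has at least one free neighbor: the cell you just left), so risk_gate=0.85 effectively never fires. Lowering it to around 0.6 gives the reflex a chance to kick in.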
And there you have it. We've built an agent that's far from a dumb, script-following robot. It's a dynamic, continuously-reasoning system that can handle a bit of chaos. The real magic isn't just that it reaches the goal; it's that we can watch it think its way there. This kind of transparent, adaptive reasoning is a huge step towards building AI we can actually trust and understand.




