Let's Build an AI Agent That Can Actually Think on Its Feet

Akram Chauhan

Have you ever watched a simple robot in a video game just walk straight into a wall that suddenly appeared? It’s kind of funny, but it highlights a huge problem. Most basic AI follows a pre-baked plan, and when the world changes, it just… breaks. It has no ability to see the new wall, stop, and think, "Huh, guess I need a new plan."

That's the difference between a simple script and true intelligence. It's the ability to adapt.

Today, we're going to build an agent that's much smarter than that wall-walking robot. We're going to create a "Streaming Decision Agent" from scratch. This isn't just an agent that makes one big plan and hopes for the best. This agent thinks on its feet. It makes a short-term plan, takes a few steps, and then stops to look around and re-evaluate. If the world has changed—if obstacles have moved or the goal has shifted—it throws out the old plan and makes a new one.

And the coolest part? It "thinks out loud." As it's making decisions, it will stream its reasoning to us, so we can see exactly why it's doing what it's doing, in real time. Let’s get our hands dirty and build something that feels genuinely intelligent.

First Things First: A Window into the Agent's Mind

Before our agent can do anything, we need a way to understand what it's thinking. We don't want a black box. We want a play-by-play.

That's where a "streaming event" comes in. Think of it as a structured log or a status update from our agent's brain. At every step—planning, deciding, acting, observing—it will send out a little packet of information telling us what's up.

We'll use Pydantic to make sure these updates are clean and consistent. StreamEvent is a small Pydantic model that holds a timestamp, the type of event (like plan or act), the agent's step counter, a human-readable message, and any extra data.

import random, math, time
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional, Generator, Any
from collections import deque, defaultdict

try:
    from pydantic import BaseModel, Field
except ImportError as exc:  # only a missing package should trigger the install hint
    raise RuntimeError("Please install pydantic: `!pip -q install pydantic` (then rerun).") from exc

class StreamEvent(BaseModel):
    t: float = Field(..., description="Wall-clock time (seconds since start)")
    kind: str = Field(..., description="event type, e.g., plan/update/act/observe/done")
    step: int = Field(..., description="agent step counter")
    msg: str = Field(..., description="human-readable partial reasoning summary")
    data: Dict[str, Any] = Field(default_factory=dict, description="structured payload")

Coord = Tuple[int, int]

Simple, right? This little StreamEvent is the foundation for everything. It’s how we'll get a live feed from our agent’s consciousness.
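
Because every event has the same handful of fields, it is easy to ship elsewhere. As a rough sketch, here is one way to turn an event into a single JSON line. `DemoEvent` and `to_json_line` are hypothetical names for this illustration; `DemoEvent` is a standard-library stand-in with the same fields as StreamEvent, so the snippet runs without Pydantic.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class DemoEvent:
    """Hypothetical stand-in mirroring StreamEvent's fields (stdlib only)."""
    t: float
    kind: str
    step: int
    msg: str
    data: Dict[str, Any] = field(default_factory=dict)


def to_json_line(ev: DemoEvent) -> str:
    """Serialize one event as a single line of JSON."""
    return json.dumps(asdict(ev), sort_keys=True)


line = to_json_line(
    DemoEvent(t=0.02, kind="plan", step=1, msg="Plan updated.", data={"path_len": 12})
)
restored = json.loads(line)  # round-trips back to a plain dict
print(line)
```

One thing to keep in mind: JSON has no tuple type, so coordinates such as (1, 1) come back as lists after a round trip.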

Let's Build a World (And Make It a Little Chaotic)

An agent is only as interesting as the world it lives in. A static, predictable world doesn't require much intelligence. So, we're going to build a world that's constantly changing.

Imagine a simple grid, like a chessboard. Our agent (A) needs to get to a target (T). But there are obstacles (█) in the way. Here’s the twist: every few steps, the obstacles can randomly disappear or pop up in new places. And to make it even more interesting, the target itself might jitter around.

This forces our agent to stay on its toes. A perfect plan from five seconds ago might be completely useless now. This DynamicGridWorld class handles all of that chaos for us.

@dataclass
class DynamicGridWorld:
    w: int = 18
    h: int = 10
    obstacle_ratio: float = 0.18
    seed: int = 7
    move_obstacles_every: int = 6
    spawn_obstacle_prob: float = 0.25
    clear_obstacle_prob: float = 0.15
    target_jitter_prob: float = 0.35

    rng: random.Random = field(init=False)
    obstacles: set = field(init=False, default_factory=set)
    agent: Coord = field(init=False, default=(1, 1))
    target: Coord = field(init=False, default=(15, 7))
    step_count: int = field(init=False, default=0)

    def __post_init__(self):
        self.rng = random.Random(self.seed)
        self.reset()

    def reset(self):
        self.step_count = 0
        self.obstacles = set()
        for y in range(self.h):
            for x in range(self.w):
                if (x, y) in [(1, 1), (self.w - 2, self.h - 2)]: continue
                if self.rng.random() < self.obstacle_ratio:
                    self.obstacles.add((x, y))
        self.agent = (1, 1)
        self.target = (self.w - 2, self.h - 2)
        self._ensure_free(self.agent)
        self._ensure_free(self.target)

    def _ensure_free(self, c: Coord):
        if c in self.obstacles:
            self.obstacles.remove(c)

    def in_bounds(self, c: Coord) -> bool:
        x, y = c
        return 0 <= x < self.w and 0 <= y < self.h

    def passable(self, c: Coord) -> bool:
        return c not in self.obstacles

    def neighbors4(self, c: Coord) -> List[Coord]:
        x, y = c
        cand = [(x+1,y), (x-1,y), (x,y+1), (x,y-1)]
        return [p for p in cand if self.in_bounds(p) and self.passable(p)]

    def manhattan(self, a: Coord, b: Coord) -> int:
        return abs(a[0]-b[0]) + abs(a[1]-b[1])

    def maybe_world_changes(self) -> Dict[str, Any]:
        changes = {"obstacles_added": [], "obstacles_cleared": [], "target_moved": False}
        self.step_count += 1

        if self.rng.random() < self.target_jitter_prob:
            tx, ty = self.target
            options = [(tx+1,ty),(tx-1,ty),(tx,ty+1),(tx,ty-1)]
            options = [c for c in options if self.in_bounds(c) and c != self.agent]
            self.rng.shuffle(options)
            for c in options[:3]:
                if c not in self.obstacles:
                    self.target = c
                    changes["target_moved"] = True
                    break

        if self.step_count % self.move_obstacles_every == 0:
            for _ in range(4):
                if self.rng.random() < self.clear_obstacle_prob and self.obstacles:
                    c = self.rng.choice(tuple(self.obstacles))
                    self.obstacles.remove(c)
                    changes["obstacles_cleared"].append(c)

            for _ in range(5):
                if self.rng.random() < self.spawn_obstacle_prob:
                    c = (self.rng.randrange(self.w), self.rng.randrange(self.h))
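                    # Skip cells that are already blocked so the change log only lists new obstacles.
                    if c != self.agent and c != self.target and c not in self.obstacles: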
                        self.obstacles.add(c)
                        changes["obstacles_added"].append(c)

        self._ensure_free(self.agent)
        self._ensure_free(self.target)
        return changes

    def step(self, action: str) -> Dict[str, Any]:
        ax, ay = self.agent
        move = {"R": (ax+1, ay), "L": (ax-1, ay), "D": (ax, ay+1), "U": (ax, ay-1), "S": (ax, ay)}[action]
        moved = False
        if self.in_bounds(move) and self.passable(move):
            self.agent = move
            moved = True
        
        changes = self.maybe_world_changes()
        done = (self.agent == self.target)
        return {"moved": moved, "agent": self.agent, "target": self.target, "done": done, "changes": changes}

    def render(self, path: Optional[List[Coord]] = None) -> str:
        path_set = set(path or [])
        lines = []
        for y in range(self.h):
            row = []
            for x in range(self.w):
                c = (x, y)
                if c == self.agent: row.append("A")
                elif c == self.target: row.append("T")
                elif c in path_set: row.append("·")
                elif c in self.obstacles: row.append("█")
                else: row.append(" ")
            lines.append("".join(row))
        
        border = "+" + "-" * self.w + "+"
        body = "\n".join(["|" + ln + "|" for ln in lines])
        return f"{border}\n{body}\n{border}"
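
To get a feel for how chaotic this world is, we can do some back-of-the-envelope arithmetic with the default parameters. This is only a sketch: the numbers are expectations, and the spawn and jitter figures are upper bounds, because a spawn is skipped when it lands on the agent or target and a jitter only happens if a free neighboring cell is found.

```python
# Defaults copied from DynamicGridWorld and maybe_world_changes.
move_obstacles_every = 6
clear_attempts, clear_prob = 4, 0.15
spawn_attempts, spawn_prob = 5, 0.25
target_jitter_prob = 0.35

# Expected obstacles removed per reshuffle (assuming at least one obstacle exists).
expected_cleared = clear_attempts * clear_prob
# Expected spawn attempts that pass the coin flip (upper bound on new obstacles).
expected_spawn_tries = spawn_attempts * spawn_prob
# Expected jitter attempts between two reshuffles (upper bound on target moves).
expected_jitter_tries = move_obstacles_every * target_jitter_prob

print(f"cleared per reshuffle      ~ {expected_cleared:.2f}")       # 0.60
print(f"spawn tries per reshuffle  ~ {expected_spawn_tries:.2f}")   # 1.25
print(f"jitter tries per 6 steps   ~ {expected_jitter_tries:.2f}")  # 2.10
```

So the target is the main source of surprise: it tries to move roughly once every three steps, while the obstacle layout drifts more slowly.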

Now we have a stage for our drama to unfold. Let's give our agent a brain.

Giving Our Agent a Brain (And Some Reflexes)

Our agent needs two key mental abilities:

  1. A Planner: A way to look at the world and figure out the best path to the target.
  2. A Reflex: A gut-check mechanism to avoid immediate, obvious danger, even if it means ignoring the plan for a second.

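For the planner, we'll use the classic A* (A-star) search algorithm. On a four-connected grid where every move costs 1, the Manhattan-distance heuristic never overestimates, so the path A* returns is a shortest one. We won't let it run forever, though. We'll give it a budget (max_expand) so it has to think quickly; if the budget runs out, or no path exists, it hands back a one-cell "path" containing only the start, with the reason no_path_or_budget.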

For the reflex, we'll create a simple action_risk function. This function looks at a potential next step and asks, "How dangerous is this spot?" It checks if it's next to obstacles or close to the edge of the world. It’s our agent’s "spidey-sense."

@dataclass
class PlanResult:
    path: List[Coord]
    cost: float
    expanded: int
    reason: str

def astar(world: DynamicGridWorld, start: Coord, goal: Coord, max_expand: int = 5000) -> PlanResult:
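    import heapq  # local import; it could equally sit with the imports at the top
    frontier = []  # min-heap of (f = g + h, g, cell)
    heapq.heappush(frontier, (world.manhattan(start, goal), 0, start))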
    came_from: Dict[Coord, Optional[Coord]] = {start: None}
    gscore: Dict[Coord, float] = {start: 0}
    expanded = 0

    while frontier and expanded < max_expand:
        f, g, cur = heapq.heappop(frontier)
        expanded += 1

        if cur == goal:
            path = []
            c = cur
            while c is not None:
                path.append(c)
                c = came_from[c]
            path.reverse()
            return PlanResult(path=path, cost=gscore[cur], expanded=expanded, reason="found_path")

        for nb in world.neighbors4(cur):
            ng = gscore[cur] + 1
            if nb not in gscore or ng < gscore[nb]:
                gscore[nb] = ng
                came_from[nb] = cur
                h = world.manhattan(nb, goal)
                heapq.heappush(frontier, (ng + h, ng, nb))

    return PlanResult(path=[start], cost=float("inf"), expanded=expanded, reason="no_path_or_budget")

def path_to_actions(path: List[Coord]) -> List[str]:
    actions = []
    for (x1,y1),(x2,y2) in zip(path, path[1:]):
        if x2 == x1+1 and y2 == y1: actions.append("R")
        elif x2 == x1-1 and y2 == y1: actions.append("L")
        elif x2 == x1 and y2 == y1+1: actions.append("D")
        elif x2 == x1 and y2 == y1-1: actions.append("U")
        else: actions.append("S") # Stay
    return actions

def action_risk(world: DynamicGridWorld, next_pos: Coord) -> float:
    x, y = next_pos
    near = 0
    for dx, dy in [(1,0),(-1,0),(0,1),(0,-1)]:
        c = (x+dx, y+dy)
        if world.in_bounds(c) and c in world.obstacles:
            near += 1
    edge = 1 if (x in [0, world.w-1] or y in [0, world.h-1]) else 0
    return 0.25 * near + 0.15 * edge
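
To see what the expansion budget actually does, here is a stripped-down sketch of the same search on a tiny hand-built grid. `tiny_astar`, `Cell` and the 5x3 layout are made up for this illustration; the logic mirrors astar above but takes a plain set of wall cells so it runs on its own.

```python
import heapq
from typing import Dict, List, Optional, Set, Tuple

Cell = Tuple[int, int]


def tiny_astar(walls: Set[Cell], w: int, h: int, start: Cell, goal: Cell,
               max_expand: int) -> Tuple[List[Cell], int]:
    """Return (path, expanded). The path is [start] if nothing was found in budget."""
    def hdist(c: Cell) -> int:
        return abs(c[0] - goal[0]) + abs(c[1] - goal[1])

    frontier = [(hdist(start), 0, start)]
    came_from: Dict[Cell, Optional[Cell]] = {start: None}
    g: Dict[Cell, int] = {start: 0}
    expanded = 0
    while frontier and expanded < max_expand:
        _, _, cur = heapq.heappop(frontier)
        expanded += 1
        if cur == goal:
            path: List[Cell] = []
            node: Optional[Cell] = cur
            while node is not None:  # walk the parent links back to the start
                path.append(node)
                node = came_from[node]
            return path[::-1], expanded
        x, y = cur
        for nb in [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]:
            if not (0 <= nb[0] < w and 0 <= nb[1] < h) or nb in walls:
                continue
            ng = g[cur] + 1
            if nb not in g or ng < g[nb]:
                g[nb] = ng
                came_from[nb] = cur
                heapq.heappush(frontier, (ng + hdist(nb), ng, nb))
    return [start], expanded


# A 5x3 grid with a wall in column 2 that leaves a gap only in the bottom row.
walls = {(2, 0), (2, 1)}
full_path, n_full = tiny_astar(walls, 5, 3, (0, 0), (4, 0), max_expand=1000)
short_path, n_short = tiny_astar(walls, 5, 3, (0, 0), (4, 0), max_expand=3)

print(len(full_path) - 1, "moves with a generous budget")  # 8 moves, via (2, 2)
print(short_path, "after only", n_short, "expansions")     # [(0, 0)] after only 3
```

With a generous budget the search finds the 8-move detour through the gap. With a budget of 3 it gives up and returns just the start cell, which is exactly the signal the agent uses to fall back to reactive behavior.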

We now have the core components: a world, a planner, and a risk assessor. It's time to put them all together into our agent.

The Main Event: The Streaming Decision Agent

This is where the magic happens. The StreamingDecisionAgent class is the conductor of our little orchestra. It manages the state, decides when to plan, and makes the final call on what to do next.

Here’s the core philosophy, which is often called "receding-horizon control":

  1. Look around: Observe the current state of the world.
  2. Think: Use the A* planner to find the best path to the target.
  3. Commit (a little): Don't commit to the whole 50-step plan! Just commit to the next few steps (we'll call this the horizon). This is key.
  4. Act: Take one step.
  5. Repeat: Go back to step 1.
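
The five steps above can be sketched in a few lines. This is a toy, not the agent we're about to build: the "world" is a number line, and `plan_line` and `receding_horizon` are hypothetical names used only here. What it shows is the key idea of planning the whole route but committing to only the next `horizon` moves.

```python
from typing import List, Tuple


def plan_line(pos: int, goal: int) -> List[int]:
    """Toy planner: every position from pos to goal along a number line."""
    step = 1 if goal >= pos else -1
    return list(range(pos, goal + step, step))


def receding_horizon(pos: int, goal: int, horizon: int = 3,
                     max_steps: int = 50) -> Tuple[int, int]:
    """Walk toward goal; return (final position, number of plans made)."""
    plans_made = 0
    committed: List[int] = []
    for _ in range(max_steps):
        if pos == goal:                       # look around: are we done?
            break
        if not committed:                     # ran out of committed moves
            full = plan_line(pos, goal)       # think: plan the whole route
            committed = full[1:horizon + 1]   # commit: keep only the next few cells
            plans_made += 1
        pos = committed.pop(0)                # act: take exactly one step
    return pos, plans_made


print(receding_horizon(0, 10, horizon=3))   # (10, 4)
print(receding_horizon(0, 10, horizon=10))  # (10, 1)
```

A shorter horizon means more planning calls, but each plan is based on fresher information. In a world that never changes the extra plans are wasted; in a world that shifts every few steps they are what keeps the agent out of trouble.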

The agent only replans when it needs to. Why waste time and energy making a new plan if the world hasn't changed and the old plan is still good? The _need_replan method handles this logic. It triggers a replan if the target moved, if any obstacle was added or cleared on the last step (anywhere on the grid, not only on our path), if the next cell of the plan is now blocked, or if we've simply run out of our short-term plan.

The _choose_action method is where the planner meets the reflex. It takes the action suggested by the plan and runs it through the risk check. If the planned move is suddenly invalid (e.g., an obstacle just appeared there) or too risky, the agent can override the plan and choose a safer, local move, like just staying put. This is the reactive adaptation that prevents it from walking into walls.

@dataclass
class AgentConfig:
    horizon: int = 6
    replan_on_target_move: bool = True
    replan_on_obstacle_change: bool = True
    max_steps: int = 120
    think_latency: float = 0.02
    act_latency: float = 0.01
    risk_gate: float = 0.85
    alt_search_depth: int = 2

@dataclass
class StreamingDecisionAgent:
    cfg: AgentConfig
    world: DynamicGridWorld
    
    start_time: float = field(init=False, default_factory=time.time)
    step_id: int = field(init=False, default=0)
    current_plan: List[Coord] = field(init=False, default_factory=list)
    current_actions: List[str] = field(init=False, default_factory=list)
    last_snapshot: Dict[str, Any] = field(init=False, default_factory=dict)
    stats: Dict[str, Any] = field(init=False, default_factory=lambda: defaultdict(int))

    def _now(self) -> float:
        return time.time() - self.start_time

    def _emit(self, kind: str, msg: str, data: Optional[Dict[str, Any]] = None) -> StreamEvent:
        return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data or {})

    def _need_replan(self, obs: Dict[str, Any]) -> bool:
        ch = obs["changes"]
        if obs["done"]: return False
        if not self.current_plan or len(self.current_plan) <= 1: return True
        # An override may have moved us off the planned path; the plan is then stale.
        if self.current_plan[0] != self.world.agent: return True
        if self.cfg.replan_on_target_move and ch.get("target_moved"): return True
        if self.cfg.replan_on_obstacle_change and (ch.get("obstacles_added") or ch.get("obstacles_cleared")): return True
        if self.current_plan[1] in self.world.obstacles: return True
        return False

    def _plan(self) -> PlanResult:
        time.sleep(self.cfg.think_latency) # Simulate thinking time
        self.stats["replans"] += 1
        return astar(self.world, self.world.agent, self.world.target)

    def _choose_action(self, planned_action: str) -> Tuple[str, str]:
        ax, ay = self.world.agent
        action_to_delta = {"R": (1,0), "L": (-1,0), "D": (0,1), "U": (0,-1), "S": (0,0)}
        dx, dy = action_to_delta[planned_action]
        nxt = (ax+dx, ay+dy)

        # Reflex: Is the planned move now impossible?
        if not self.world.in_bounds(nxt) or not self.world.passable(nxt):
            self.stats["overrides"] += 1
            return "S", "planned_move_invalid -> wait."

        # Reflex: Is the planned move too risky?
        r = action_risk(self.world, nxt)
        if r > self.cfg.risk_gate:
            candidates = ["U","D","L","R","S"]
            best = (planned_action, float("inf"), "keep_plan")
            for a in candidates:
                dx, dy = action_to_delta[a]
                p = (ax+dx, ay+dy)
                if not self.world.in_bounds(p) or not self.world.passable(p): continue
                score = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.target)
                if score < best[1]:
                    best = (a, score, "risk_avoidance_override")

            if best[0] != planned_action:
                self.stats["overrides"] += 1
                return best[0], best[2]

        return planned_action, "follow_plan"

    def run(self) -> Generator[StreamEvent, None, None]:
        yield self._emit("observe", "Initialize: reading initial state.", {"agent": self.world.agent, "target": self.world.target})
        yield self._emit("world", "Initial world snapshot.", {"grid": self.world.render()})

        for self.step_id in range(1, self.cfg.max_steps + 1):
            if self.step_id == 1 or self._need_replan(self.last_snapshot):
                pr = self._plan()
                self.current_plan = pr.path
                self.current_actions = path_to_actions(pr.path)
                
                if pr.reason != "found_path":
                    yield self._emit("plan", "Planner could not find a path within budget; switching to reactive exploration.", {"reason": pr.reason, "expanded": pr.expanded})
                    self.current_actions = []
                else:
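                    horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
                    # Commit only to the horizon: keep just the next few moves so the
                    # agent is forced to replan once they run out.
                    self.current_plan = horizon_path
                    self.current_actions = path_to_actions(horizon_path)
                    yield self._emit("plan", f"Plan updated (online A*). Commit to next {len(horizon_path)-1} moves, then re-evaluate.", {
                        "reason": pr.reason, "path_len": len(pr.path), "expanded": pr.expanded, 
                        "commit_horizon": self.cfg.horizon, "horizon_path": horizon_path,
                        "grid_with_path": self.world.render(path=horizon_path)})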

            if self.current_actions:
                planned_action = self.current_actions[0]
            else: # No plan, just try to move towards target
                ax, ay = self.world.agent; tx, ty = self.world.target
                options = []
                if tx > ax: options.append("R")
                if tx < ax: options.append("L")
                if ty > ay: options.append("D")
                if ty < ay: options.append("U")
                options += ["S","U","D","L","R"]
                planned_action = options[0]

            action, why = self._choose_action(planned_action)
            yield self._emit("decide", f"Intermediate decision: action={action} ({why}).", { "planned_action": planned_action, "chosen_action": action, "agent": self.world.agent, "target": self.world.target})
            
            time.sleep(self.cfg.act_latency)
            obs = self.world.step(action)
            self.last_snapshot = obs

            if self.current_actions:
                if action == planned_action:
                    self.current_actions = self.current_actions[1:]
                    if len(self.current_plan) > 1:
                        self.current_plan = self.current_plan[1:]
            
            ch = obs["changes"]
            surprise = []
            if ch.get("target_moved"): surprise.append("target_moved")
            if ch.get("obstacles_added"): surprise.append(f"obstacles_added={len(ch['obstacles_added'])}")
            if ch.get("obstacles_cleared"): surprise.append(f"obstacles_cleared={len(ch['obstacles_cleared'])}")
            surprise_msg = ("Surprises: " + ", ".join(surprise)) if surprise else "No major surprises."
            
            self.stats["steps"] += 1
            if obs["moved"]: self.stats["moves"] += 1
            if ch.get("target_moved"): self.stats["target_moves"] += 1
            if ch.get("obstacles_added") or ch.get("obstacles_cleared"): self.stats["world_shifts"] += 1

            yield self._emit("observe", f"Observed outcome. {surprise_msg}", {
                "moved": obs["moved"], "agent": obs["agent"], "target": obs["target"], 
                "done": obs["done"], "changes": ch,
                "grid": self.world.render(path=self.current_plan[: min(len(self.current_plan), 10)])})

            if obs["done"]:
                yield self._emit("done", "Goal reached. Stopping execution.", {"final_agent": obs["agent"], "final_target": obs["target"], "stats": dict(self.stats)})
                return

        yield self._emit("done", "Max steps reached without reaching the goal.", {"final_agent": self.world.agent, "final_target": self.world.target, "stats": dict(self.stats)})
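
It's worth checking how the risk gate in _choose_action interacts with the weights in action_risk. The sketch below simply enumerates the possible scores; `risk` and `RISK_GATE` are names invented for this illustration. It assumes the cell the agent is standing on is never an obstacle, which _ensure_free guarantees, so a cell the agent can step into has at most three obstacle neighbors (two if it sits on an edge).

```python
def risk(near: int, edge: int) -> float:
    """Same weights as action_risk: 0.25 per adjacent obstacle, 0.15 for an edge cell."""
    return 0.25 * near + 0.15 * edge


RISK_GATE = 0.85  # the default AgentConfig.risk_gate

# Every (adjacent obstacles, on edge) combination that would exceed the gate.
over_gate = [(near, edge) for near in range(5) for edge in (0, 1)
             if risk(near, edge) > RISK_GATE]

# Scores a real move can produce: interior cells have near <= 3, edge cells near <= 2.
reachable = [risk(near, 0) for near in range(4)] + [risk(near, 1) for near in range(3)]
max_reachable = max(reachable)

print(over_gate)      # [(3, 1), (4, 0), (4, 1)]
print(max_reachable)  # 0.75
```

Under these assumptions none of the combinations that exceed 0.85 can occur for a cell next to the agent, so with the default gate the override stays silent. Dropping risk_gate to something like 0.6 makes both the three-obstacle interior case (0.75) and the two-obstacle edge case (0.65) trip it.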

Let's Watch It Work!

Now for the fun part. We'll set up a world, configure our agent, and then just let it run. The run_and_print function will loop through the StreamEvents our agent generates and print them out, giving us that live feed from its brain.

def run_and_print(agent: StreamingDecisionAgent, throttle: float = 0.0):
    """Print each streamed event as it arrives, plus any grid snapshot it carries."""
    for ev in agent.run():
        header = f"[t={ev.t:6.2f}s | step={ev.step:03d} | {ev.kind.upper():7}]"
        print(header, ev.msg)
        if ev.kind in {"plan", "observe", "world"}:
            if "grid_with_path" in ev.data:
                print(ev.data["grid_with_path"])
            elif "grid" in ev.data:
                print(ev.data["grid"])
        if throttle > 0:
            time.sleep(throttle)

# --- Configuration ---
world = DynamicGridWorld(w=18, h=10, obstacle_ratio=0.18, seed=10, move_obstacles_every=6)
cfg = AgentConfig(
    horizon=6,
    replan_on_target_move=True,
    replan_on_obstacle_change=True,
    max_steps=120,
    think_latency=0.01,
    act_latency=0.01,
    risk_gate=0.85,
    alt_search_depth=2
)
agent = StreamingDecisionAgent(cfg=cfg, world=world)

# --- Run it! ---
run_and_print(agent, throttle=0.0)

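When you run this, you'll see a log of the agent's journey. You'll see it make a plan, follow it for a few steps, and then a new PLAN event will pop up with a message like "Plan updated (online A*). Commit to next 5 moves, then re-evaluate." That means something triggered a replan, usually a target jitter or an obstacle shuffle. If the planner ever fails to find a path, you may also see a DECIDE event with "planned_move_invalid -> wait." One caveat: with the default risk_gate=0.85 you won't see "risk_avoidance_override". A cell the agent can step into has at most three obstacle neighbors (the cell it is leaving is always free), so its risk tops out at 0.75; lower risk_gate to around 0.6 if you want to watch the reflex kick in.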
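
Once the stream gets long, a quick tally is easier to read than the raw log. Here is a small sketch that counts events by kind and counts the decisions where the chosen action differs from the planned one. The sample events are hand-written dicts shaped like the agent's output, and `summarize` is a hypothetical helper; with real StreamEvent objects you would read ev.kind and ev.data instead of dictionary keys.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple

# Hand-written sample shaped like the agent's events (kind + data payload).
sample_events: List[Dict[str, Any]] = [
    {"kind": "plan", "data": {"path_len": 12}},
    {"kind": "decide", "data": {"planned_action": "R", "chosen_action": "R"}},
    {"kind": "observe", "data": {"moved": True}},
    {"kind": "decide", "data": {"planned_action": "D", "chosen_action": "S"}},
    {"kind": "observe", "data": {"moved": False}},
]


def summarize(events: List[Dict[str, Any]]) -> Tuple[Counter, int]:
    """Return (events per kind, number of decisions that deviated from the plan)."""
    kinds = Counter(ev["kind"] for ev in events)
    overrides = sum(
        1 for ev in events
        if ev["kind"] == "decide"
        and ev["data"]["planned_action"] != ev["data"]["chosen_action"]
    )
    return kinds, overrides


kinds, overrides = summarize(sample_events)
print(dict(kinds))  # {'plan': 1, 'decide': 2, 'observe': 2}
print(overrides)    # 1
```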

And there you have it. We've built an agent that's far from a dumb, script-following robot. It's a dynamic, continuously reasoning system that can handle a bit of chaos. The real magic isn't just that it reaches the goal; it's that we can watch it think its way there. This kind of transparent, adaptive reasoning is a huge step towards building AI we can actually trust and understand.



© 2026 Aicosoft. All rights reserved.