Skip to content

Advanced 5: Software Architecture

Prerequisites: Lab 6 (State Machines)


Overview

In Lab 6, you built a state machine for robot behavior. This module explores how professional embedded software is structured for reliability, maintainability, and testability.

You'll learn: - Layered architecture: separating concerns - Event-driven design patterns - Defensive programming techniques - Testing strategies for embedded code


Part 1: Why Architecture Matters

The Spaghetti Problem

Without architecture, code grows into a tangled mess:

# Spaghetti code example (DON'T DO THIS)
while True:
    if sensor1.value() == 0 and state == "running" and not obstacle:
        if distance < 20:
            motors.stop()
            leds.fill(255, 0, 0)
            state = "stopped"
        else:
            if line_pos > 0.5:
                motors.set_speeds(100, 150)
            elif line_pos < -0.5:
                motors.set_speeds(150, 100)
            else:
                motors.set_speeds(150, 150)
    elif button.value() == 0:
        if state == "stopped":
            state = "running"
        else:
            state = "stopped"
    # ... 500 more lines of similar code

Problems: - Hard to understand - Hard to modify (change one thing, break another) - Hard to test - Hard to debug - Toyota-level disaster waiting to happen

The Solution: Structure

Good architecture separates concerns:

┌─────────────────────────────────────────┐
│           Application Layer             │
│    (State machine, mission logic)       │
├─────────────────────────────────────────┤
│           Service Layer                 │
│  (Sensor fusion, control algorithms)    │
├─────────────────────────────────────────┤
│            Driver Layer                 │
│   (Motor driver, IMU driver, LEDs)      │
├─────────────────────────────────────────┤
│        Hardware Abstraction Layer       │
│        (GPIO, I2C, PWM, timers)         │
└─────────────────────────────────────────┘

Part 2: Layered Architecture

Hardware Abstraction Layer (HAL)

Lowest layer - talks directly to hardware:

# hal.py - Hardware Abstraction Layer

from machine import Pin, PWM, I2C

class GPIO:
    """Abstract GPIO operations."""
    @staticmethod
    def output(pin_num):
        return Pin(pin_num, Pin.OUT)

    @staticmethod
    def input(pin_num, pull=None):
        return Pin(pin_num, Pin.IN, pull)

class PWMChannel:
    """Abstract PWM operations."""
    def __init__(self, pin_num, freq=1000):
        self.pwm = PWM(Pin(pin_num))
        self.pwm.freq(freq)

    def set_duty(self, duty_percent):
        """Set duty cycle 0-100%."""
        duty_u16 = int(duty_percent * 65535 / 100)
        self.pwm.duty_u16(duty_u16)

class I2CBus:
    """Abstract I2C operations."""
    def __init__(self, sda_pin, scl_pin, freq=400000):
        self.bus = I2C(0, sda=Pin(sda_pin), scl=Pin(scl_pin), freq=freq)

    def read_register(self, addr, reg, nbytes):
        self.bus.writeto(addr, bytes([reg]))
        return self.bus.readfrom(addr, nbytes)

    def write_register(self, addr, reg, data):
        self.bus.writeto(addr, bytes([reg]) + bytes(data))

Driver Layer

Uses HAL to implement device-specific logic:

# drivers/motor.py

from hal import PWMChannel, GPIO

class MotorDriver:
    """Driver for single DC motor with H-bridge."""

    def __init__(self, pwm_pin, dir_pin):
        self.pwm = PWMChannel(pwm_pin)
        self.dir = GPIO.output(dir_pin)
        self._speed = 0

    def set_speed(self, speed):
        """
        Set motor speed.

        Args:
            speed: -100 to +100 (negative = reverse)
        """
        self._speed = max(-100, min(100, speed))

        if self._speed >= 0:
            self.dir.value(1)
            self.pwm.set_duty(self._speed)
        else:
            self.dir.value(0)
            self.pwm.set_duty(-self._speed)

    def stop(self):
        self.set_speed(0)

    @property
    def speed(self):
        return self._speed


class DifferentialDrive:
    """Driver for two-wheel differential drive robot."""

    def __init__(self, left_motor, right_motor):
        self.left = left_motor
        self.right = right_motor

    def set_speeds(self, left_speed, right_speed):
        self.left.set_speed(left_speed)
        self.right.set_speed(right_speed)

    def forward(self, speed):
        self.set_speeds(speed, speed)

    def turn(self, speed, turn_rate):
        """
        Drive with turn.

        Args:
            speed: Forward speed (-100 to 100)
            turn_rate: Turn rate (-100 to 100, positive = right)
        """
        left = speed + turn_rate
        right = speed - turn_rate
        self.set_speeds(left, right)

    def stop(self):
        self.left.stop()
        self.right.stop()

Service Layer

Implements higher-level algorithms using drivers:

# services/line_follower.py

class LineFollower:
    """Line following service using P control."""

    def __init__(self, line_sensors, drive, kp=50):
        self.sensors = line_sensors
        self.drive = drive
        self.kp = kp
        self.base_speed = 80
        self._enabled = False

    def enable(self):
        self._enabled = True

    def disable(self):
        self._enabled = False
        self.drive.stop()

    def update(self):
        """Called periodically to update control."""
        if not self._enabled:
            return

        position = self.sensors.get_position()

        if position is None:
            # Line lost
            self.drive.stop()
            return

        # P control
        correction = self.kp * position
        self.drive.turn(self.base_speed, correction)

    def set_speed(self, speed):
        self.base_speed = speed

    def set_gain(self, kp):
        self.kp = kp

Application Layer

Implements mission logic using services:

# app/robot_mission.py

from services.line_follower import LineFollower
from services.obstacle_detector import ObstacleDetector

class RobotMission:
    """Main robot mission controller."""

    # States
    IDLE = "idle"
    FOLLOWING = "following"
    AVOIDING = "avoiding"
    FINISHED = "finished"

    def __init__(self, line_follower, obstacle_detector, leds):
        self.follower = line_follower
        self.obstacles = obstacle_detector
        self.leds = leds

        self.state = self.IDLE
        self._running = False

    def start(self):
        self._running = True
        self._enter_state(self.FOLLOWING)

    def stop(self):
        self._running = False
        self._enter_state(self.IDLE)

    def update(self):
        """Main update - call periodically."""
        if not self._running:
            return

        if self.state == self.FOLLOWING:
            self._handle_following()
        elif self.state == self.AVOIDING:
            self._handle_avoiding()

    def _handle_following(self):
        # Check for obstacles
        if self.obstacles.detected():
            self._enter_state(self.AVOIDING)
            return

        # Follow line
        self.follower.update()

    def _handle_avoiding(self):
        # Simple avoidance: stop and wait
        if not self.obstacles.detected():
            self._enter_state(self.FOLLOWING)

    def _enter_state(self, new_state):
        # Exit old state
        if self.state == self.FOLLOWING:
            self.follower.disable()
        elif self.state == self.AVOIDING:
            pass

        self.state = new_state

        # Enter new state
        if self.state == self.IDLE:
            self.leds.fill(0, 0, 50)  # Blue
        elif self.state == self.FOLLOWING:
            self.follower.enable()
            self.leds.fill(0, 50, 0)  # Green
        elif self.state == self.AVOIDING:
            self.leds.fill(50, 50, 0)  # Yellow

        self.leds.show()

Part 3: Event-Driven Architecture

The Event Queue Pattern

Instead of polling everything, let events drive execution:

# events.py

from collections import deque

class Event:
    """Base event class."""
    pass

class ButtonEvent(Event):
    def __init__(self, button_id, pressed):
        self.button_id = button_id
        self.pressed = pressed

class LineEvent(Event):
    def __init__(self, event_type):  # "lost", "found", "crossed"
        self.event_type = event_type

class ObstacleEvent(Event):
    def __init__(self, distance):
        self.distance = distance

class TimerEvent(Event):
    def __init__(self, timer_id):
        self.timer_id = timer_id


class EventQueue:
    """Thread-safe event queue."""

    def __init__(self, max_size=32):
        self.queue = deque((), max_size)

    def post(self, event):
        """Add event to queue (can be called from ISR)."""
        try:
            self.queue.append(event)
        except IndexError:
            pass  # Queue full, drop event

    def get(self):
        """Get next event (returns None if empty)."""
        try:
            return self.queue.popleft()
        except IndexError:
            return None

    def empty(self):
        return len(self.queue) == 0

Event Producers (Interrupt Handlers)

# producers.py

from machine import Pin
from events import ButtonEvent, LineEvent, EventQueue

def setup_button_events(pin_num, event_queue):
    """Setup button to generate events."""
    button = Pin(pin_num, Pin.IN, Pin.PULL_UP)

    def handler(pin):
        pressed = pin.value() == 0
        event_queue.post(ButtonEvent(pin_num, pressed))

    button.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING, handler=handler)
    return button


def setup_line_events(sensor_pin, event_queue):
    """Setup line sensor to generate events on transitions."""
    sensor = Pin(sensor_pin, Pin.IN)
    last_value = [sensor.value()]  # List to allow modification in closure

    def handler(pin):
        value = pin.value()
        if value != last_value[0]:
            if value == 0:
                event_queue.post(LineEvent("entered"))
            else:
                event_queue.post(LineEvent("left"))
            last_value[0] = value

    sensor.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING, handler=handler)
    return sensor

Event Consumer (Main Loop)

# main.py

from events import EventQueue, ButtonEvent, LineEvent, ObstacleEvent
from app.robot_mission import RobotMission

# Create event queue
events = EventQueue()

# Setup event producers
setup_button_events(22, events)  # Start/stop button
setup_line_events(2, events)     # Line sensor

# Create mission controller
mission = RobotMission(...)

# Main event loop
while True:
    # Process all pending events
    while not events.empty():
        event = events.get()

        if isinstance(event, ButtonEvent):
            if event.pressed:
                if mission.state == mission.IDLE:
                    mission.start()
                else:
                    mission.stop()

        elif isinstance(event, LineEvent):
            if event.event_type == "lost":
                # Handle line lost
                pass

        elif isinstance(event, ObstacleEvent):
            if event.distance < 20:
                # Handle close obstacle
                pass

    # Periodic updates
    mission.update()
    time.sleep_ms(10)

Benefits of Event-Driven

  1. Decoupling: Producers don't know about consumers
  2. Testability: Can inject fake events for testing
  3. Responsiveness: Events processed immediately
  4. Flexibility: Easy to add new event types

Part 4: Defensive Programming

Validate Everything

Don't trust inputs - validate them:

class Motor:
    def set_speed(self, speed):
        # Validate input
        if not isinstance(speed, (int, float)):
            raise TypeError(f"Speed must be numeric, got {type(speed)}")

        if speed < -100 or speed > 100:
            raise ValueError(f"Speed must be -100 to 100, got {speed}")

        # Now safe to use
        self._apply_speed(speed)


class LineSensors:
    def get_position(self):
        readings = [s.value() for s in self.sensors]

        # Validate readings make sense
        if all(r == 1 for r in readings):
            return None  # All white - line lost

        if all(r == 0 for r in readings):
            # All black - might be intersection or error
            return 0  # Return center, let caller decide

        # Normal calculation
        return self._calculate_position(readings)

Fail Safe, Not Silent

When errors occur, fail in a safe state:

class RobotController:
    def update(self):
        try:
            self._do_update()
        except Exception as e:
            # Log error
            print(f"ERROR: {e}")

            # Enter safe state
            self.motors.stop()
            self.leds.fill(255, 0, 0)  # Red = error
            self.leds.show()

            # Set error flag (don't crash!)
            self.error_state = True

Watchdog Timer

Detect if code gets stuck:

from machine import WDT

class SafetyMonitor:
    def __init__(self, timeout_ms=5000):
        # Hardware watchdog - resets if not fed
        self.wdt = WDT(timeout=timeout_ms)
        self.last_feed = time.ticks_ms()

    def feed(self):
        """Call this regularly to prevent reset."""
        self.wdt.feed()
        self.last_feed = time.ticks_ms()

    def check_timing(self):
        """Check if we're being called often enough."""
        elapsed = time.ticks_diff(time.ticks_ms(), self.last_feed)
        if elapsed > 1000:
            print(f"WARNING: Main loop slow ({elapsed}ms)")


# Usage in main loop
safety = SafetyMonitor()

while True:
    safety.feed()  # Reset watchdog

    # ... do work ...

    safety.check_timing()  # Warn if slow

Assert Preconditions

Check assumptions explicitly:

def calculate_pid(error, dt):
    # Preconditions
    assert dt > 0, "Time step must be positive"
    assert -10 <= error <= 10, f"Error out of expected range: {error}"

    # Now safe to calculate
    return kp * error + ki * integral + kd * derivative


def set_motor_pwm(duty):
    assert 0 <= duty <= 65535, f"Invalid PWM duty: {duty}"
    pwm.duty_u16(duty)

Part 5: Testing Embedded Code

The Testing Challenge

Embedded code is hard to test because: - Hardware dependencies - Real-time behavior - Limited visibility

Dependency Injection

Design for testability by injecting dependencies:

# Bad: Hard to test (creates its own hardware)
class LineFollower:
    def __init__(self):
        self.sensors = [Pin(2, Pin.IN), Pin(3, Pin.IN), ...]
        self.motors = Motors()

# Good: Dependencies injected (can use fakes)
class LineFollower:
    def __init__(self, sensors, motors):
        self.sensors = sensors
        self.motors = motors

Fake/Mock Objects

Create fake objects for testing:

# fakes.py

class FakeSensor:
    """Fake sensor for testing."""

    def __init__(self):
        self._value = 0

    def value(self):
        return self._value

    def set_value(self, v):
        """Test helper to set fake reading."""
        self._value = v


class FakeMotors:
    """Fake motors that record commands."""

    def __init__(self):
        self.left_speed = 0
        self.right_speed = 0
        self.history = []

    def set_speeds(self, left, right):
        self.left_speed = left
        self.right_speed = right
        self.history.append((left, right))

    def stop(self):
        self.set_speeds(0, 0)

Unit Tests

Test individual components:

# tests/test_line_follower.py

from fakes import FakeSensor, FakeMotors
from services.line_follower import LineFollower

def test_follows_line_center():
    """When line is centered, drive straight."""
    sensors = FakeLineSensors()
    motors = FakeMotors()
    follower = LineFollower(sensors, motors, kp=50)

    # Simulate centered line
    sensors.set_position(0)

    follower.enable()
    follower.update()

    # Should drive straight (equal speeds)
    assert motors.left_speed == motors.right_speed


def test_turns_toward_line():
    """When line is right, turn right."""
    sensors = FakeLineSensors()
    motors = FakeMotors()
    follower = LineFollower(sensors, motors, kp=50)

    # Simulate line to the right
    sensors.set_position(0.5)

    follower.enable()
    follower.update()

    # Should turn right (left faster than right)
    assert motors.left_speed > motors.right_speed


def test_stops_when_disabled():
    """When disabled, motors stop."""
    sensors = FakeLineSensors()
    motors = FakeMotors()
    follower = LineFollower(sensors, motors, kp=50)

    follower.enable()
    follower.update()
    follower.disable()

    assert motors.left_speed == 0
    assert motors.right_speed == 0


# Run tests
if __name__ == "__main__":
    test_follows_line_center()
    test_turns_toward_line()
    test_stops_when_disabled()
    print("All tests passed!")

Integration Tests

Test components working together:

# tests/test_integration.py

def test_obstacle_stops_line_following():
    """Obstacle detection should stop line following."""
    # Setup with fakes
    sensors = FakeLineSensors()
    motors = FakeMotors()
    ultrasonic = FakeUltrasonic()
    leds = FakeLEDs()

    mission = RobotMission(
        LineFollower(sensors, motors),
        ObstacleDetector(ultrasonic),
        leds
    )

    # Start mission
    mission.start()
    sensors.set_position(0)
    mission.update()

    # Verify following line
    assert mission.state == mission.FOLLOWING
    assert motors.left_speed > 0

    # Simulate obstacle
    ultrasonic.set_distance(10)
    mission.update()

    # Verify stopped
    assert mission.state == mission.AVOIDING
    assert motors.left_speed == 0

Part 6: Code Organization

Project Structure

robot/
├── main.py              # Entry point
├── config.py            # Pin assignments, constants
├── hal/                 # Hardware Abstraction Layer
│   ├── __init__.py
│   ├── gpio.py
│   ├── pwm.py
│   └── i2c.py
├── drivers/             # Hardware drivers
│   ├── __init__.py
│   ├── motor.py
│   ├── imu.py
│   ├── leds.py
│   └── ultrasonic.py
├── services/            # Higher-level services
│   ├── __init__.py
│   ├── line_follower.py
│   ├── heading_control.py
│   └── obstacle_detector.py
├── app/                 # Application logic
│   ├── __init__.py
│   ├── state_machine.py
│   └── mission.py
├── events/              # Event system
│   ├── __init__.py
│   ├── queue.py
│   └── types.py
└── tests/               # Test code
    ├── fakes.py
    ├── test_line_follower.py
    └── test_mission.py

Configuration Management

Keep hardware configuration separate:

# config.py

# Pin assignments
MOTOR_LEFT_PWM = 12
MOTOR_LEFT_DIR = 13
MOTOR_RIGHT_PWM = 10
MOTOR_RIGHT_DIR = 11

LINE_SENSORS = [2, 3, 4, 5]

ULTRASONIC_TRIG = 0
ULTRASONIC_ECHO = 1

LED_PIN = 6
LED_COUNT = 8

IMU_SDA = 14
IMU_SCL = 15

# Control parameters
LINE_FOLLOW_KP = 50
LINE_FOLLOW_BASE_SPEED = 80
OBSTACLE_THRESHOLD_CM = 20

# Timing
SENSOR_RATE_HZ = 100
CONTROL_RATE_HZ = 50
LED_RATE_HZ = 20

Clean Main

Keep main.py simple:

# main.py

import time
from config import *
from hal import GPIO, PWMChannel, I2CBus
from drivers import MotorDriver, DifferentialDrive, LineSensors, Ultrasonic, LEDStrip
from services import LineFollower, ObstacleDetector
from app import RobotMission
from events import EventQueue, setup_button_events

def main():
    # Initialize hardware
    left_motor = MotorDriver(MOTOR_LEFT_PWM, MOTOR_LEFT_DIR)
    right_motor = MotorDriver(MOTOR_RIGHT_PWM, MOTOR_RIGHT_DIR)
    drive = DifferentialDrive(left_motor, right_motor)

    line_sensors = LineSensors(LINE_SENSORS)
    ultrasonic = Ultrasonic(ULTRASONIC_TRIG, ULTRASONIC_ECHO)
    leds = LEDStrip(LED_PIN, LED_COUNT)

    # Initialize services
    follower = LineFollower(line_sensors, drive, LINE_FOLLOW_KP)
    obstacles = ObstacleDetector(ultrasonic, OBSTACLE_THRESHOLD_CM)

    # Initialize application
    mission = RobotMission(follower, obstacles, leds)

    # Setup events
    events = EventQueue()
    setup_button_events(22, events)

    # Main loop
    try:
        while True:
            process_events(events, mission)
            mission.update()
            time.sleep_ms(10)
    finally:
        drive.stop()  # Safety: always stop on exit

if __name__ == "__main__":
    main()

Mini-Project: Refactored Robot

Part A: Create Layer Structure

  1. Create the directory structure above
  2. Move existing code into appropriate layers
  3. Ensure each layer only imports from layers below

Part B: Implement Event System

  1. Create event types for your robot
  2. Add interrupt-based event producers
  3. Modify main loop to process events

Part C: Add Safety Features

  1. Add input validation to all public methods
  2. Implement watchdog timer
  3. Add error handling with safe fallback

Part D: Write Tests

  1. Create fake objects for all hardware
  2. Write unit tests for line follower
  3. Write integration test for mission

Deliverable

  • Refactored code with clean architecture
  • Event system implementation
  • Safety features (watchdog, validation)
  • Test suite with passing tests
  • Documentation of design decisions

Key Takeaways

  1. Layers separate concerns - HAL, drivers, services, application
  2. Events decouple components - producers don't know consumers
  3. Validate everything - don't trust inputs
  4. Fail safe - errors should stop, not crash
  5. Design for testing - inject dependencies
  6. Organize for humans - clear structure, good names

Further Reading


← Back: Sensor Fusion | Advanced Track Overview | Next: Communication & Data →