Advanced 5: Software Architecture
Prerequisites: Lab 6 (State Machines)
Overview
In Lab 6, you built a state machine for robot behavior. This module explores how professional embedded software is structured for reliability, maintainability, and testability.
You'll learn: - Layered architecture: separating concerns - Event-driven design patterns - Defensive programming techniques - Testing strategies for embedded code
Part 1: Why Architecture Matters
The Spaghetti Problem
Without architecture, code grows into a tangled mess:
# Spaghetti code example (DON'T DO THIS)
while True:
if sensor1.value() == 0 and state == "running" and not obstacle:
if distance < 20:
motors.stop()
leds.fill(255, 0, 0)
state = "stopped"
else:
if line_pos > 0.5:
motors.set_speeds(100, 150)
elif line_pos < -0.5:
motors.set_speeds(150, 100)
else:
motors.set_speeds(150, 150)
elif button.value() == 0:
if state == "stopped":
state = "running"
else:
state = "stopped"
# ... 500 more lines of similar code
Problems: - Hard to understand - Hard to modify (change one thing, break another) - Hard to test - Hard to debug - Toyota-level disaster waiting to happen
The Solution: Structure
Good architecture separates concerns:
┌─────────────────────────────────────────┐
│ Application Layer │
│ (State machine, mission logic) │
├─────────────────────────────────────────┤
│ Service Layer │
│ (Sensor fusion, control algorithms) │
├─────────────────────────────────────────┤
│ Driver Layer │
│ (Motor driver, IMU driver, LEDs) │
├─────────────────────────────────────────┤
│ Hardware Abstraction Layer │
│ (GPIO, I2C, PWM, timers) │
└─────────────────────────────────────────┘
Part 2: Layered Architecture
Hardware Abstraction Layer (HAL)
Lowest layer - talks directly to hardware:
# hal.py - Hardware Abstraction Layer
from machine import Pin, PWM, I2C
class GPIO:
"""Abstract GPIO operations."""
@staticmethod
def output(pin_num):
return Pin(pin_num, Pin.OUT)
@staticmethod
def input(pin_num, pull=None):
return Pin(pin_num, Pin.IN, pull)
class PWMChannel:
"""Abstract PWM operations."""
def __init__(self, pin_num, freq=1000):
self.pwm = PWM(Pin(pin_num))
self.pwm.freq(freq)
def set_duty(self, duty_percent):
"""Set duty cycle 0-100%."""
duty_u16 = int(duty_percent * 65535 / 100)
self.pwm.duty_u16(duty_u16)
class I2CBus:
"""Abstract I2C operations."""
def __init__(self, sda_pin, scl_pin, freq=400000):
self.bus = I2C(0, sda=Pin(sda_pin), scl=Pin(scl_pin), freq=freq)
def read_register(self, addr, reg, nbytes):
self.bus.writeto(addr, bytes([reg]))
return self.bus.readfrom(addr, nbytes)
def write_register(self, addr, reg, data):
self.bus.writeto(addr, bytes([reg]) + bytes(data))
Driver Layer
Uses HAL to implement device-specific logic:
# drivers/motor.py
from hal import PWMChannel, GPIO
class MotorDriver:
"""Driver for single DC motor with H-bridge."""
def __init__(self, pwm_pin, dir_pin):
self.pwm = PWMChannel(pwm_pin)
self.dir = GPIO.output(dir_pin)
self._speed = 0
def set_speed(self, speed):
"""
Set motor speed.
Args:
speed: -100 to +100 (negative = reverse)
"""
self._speed = max(-100, min(100, speed))
if self._speed >= 0:
self.dir.value(1)
self.pwm.set_duty(self._speed)
else:
self.dir.value(0)
self.pwm.set_duty(-self._speed)
def stop(self):
self.set_speed(0)
@property
def speed(self):
return self._speed
class DifferentialDrive:
"""Driver for two-wheel differential drive robot."""
def __init__(self, left_motor, right_motor):
self.left = left_motor
self.right = right_motor
def set_speeds(self, left_speed, right_speed):
self.left.set_speed(left_speed)
self.right.set_speed(right_speed)
def forward(self, speed):
self.set_speeds(speed, speed)
def turn(self, speed, turn_rate):
"""
Drive with turn.
Args:
speed: Forward speed (-100 to 100)
turn_rate: Turn rate (-100 to 100, positive = right)
"""
left = speed + turn_rate
right = speed - turn_rate
self.set_speeds(left, right)
def stop(self):
self.left.stop()
self.right.stop()
Service Layer
Implements higher-level algorithms using drivers:
# services/line_follower.py
class LineFollower:
"""Line following service using P control."""
def __init__(self, line_sensors, drive, kp=50):
self.sensors = line_sensors
self.drive = drive
self.kp = kp
self.base_speed = 80
self._enabled = False
def enable(self):
self._enabled = True
def disable(self):
self._enabled = False
self.drive.stop()
def update(self):
"""Called periodically to update control."""
if not self._enabled:
return
position = self.sensors.get_position()
if position is None:
# Line lost
self.drive.stop()
return
# P control
correction = self.kp * position
self.drive.turn(self.base_speed, correction)
def set_speed(self, speed):
self.base_speed = speed
def set_gain(self, kp):
self.kp = kp
Application Layer
Implements mission logic using services:
# app/robot_mission.py
from services.line_follower import LineFollower
from services.obstacle_detector import ObstacleDetector
class RobotMission:
"""Main robot mission controller."""
# States
IDLE = "idle"
FOLLOWING = "following"
AVOIDING = "avoiding"
FINISHED = "finished"
def __init__(self, line_follower, obstacle_detector, leds):
self.follower = line_follower
self.obstacles = obstacle_detector
self.leds = leds
self.state = self.IDLE
self._running = False
def start(self):
self._running = True
self._enter_state(self.FOLLOWING)
def stop(self):
self._running = False
self._enter_state(self.IDLE)
def update(self):
"""Main update - call periodically."""
if not self._running:
return
if self.state == self.FOLLOWING:
self._handle_following()
elif self.state == self.AVOIDING:
self._handle_avoiding()
def _handle_following(self):
# Check for obstacles
if self.obstacles.detected():
self._enter_state(self.AVOIDING)
return
# Follow line
self.follower.update()
def _handle_avoiding(self):
# Simple avoidance: stop and wait
if not self.obstacles.detected():
self._enter_state(self.FOLLOWING)
def _enter_state(self, new_state):
# Exit old state
if self.state == self.FOLLOWING:
self.follower.disable()
elif self.state == self.AVOIDING:
pass
self.state = new_state
# Enter new state
if self.state == self.IDLE:
self.leds.fill(0, 0, 50) # Blue
elif self.state == self.FOLLOWING:
self.follower.enable()
self.leds.fill(0, 50, 0) # Green
elif self.state == self.AVOIDING:
self.leds.fill(50, 50, 0) # Yellow
self.leds.show()
Part 3: Event-Driven Architecture
The Event Queue Pattern
Instead of polling everything, let events drive execution:
# events.py
from collections import deque
class Event:
"""Base event class."""
pass
class ButtonEvent(Event):
def __init__(self, button_id, pressed):
self.button_id = button_id
self.pressed = pressed
class LineEvent(Event):
def __init__(self, event_type): # "lost", "found", "crossed"
self.event_type = event_type
class ObstacleEvent(Event):
def __init__(self, distance):
self.distance = distance
class TimerEvent(Event):
def __init__(self, timer_id):
self.timer_id = timer_id
class EventQueue:
"""Thread-safe event queue."""
def __init__(self, max_size=32):
self.queue = deque((), max_size)
def post(self, event):
"""Add event to queue (can be called from ISR)."""
try:
self.queue.append(event)
except IndexError:
pass # Queue full, drop event
def get(self):
"""Get next event (returns None if empty)."""
try:
return self.queue.popleft()
except IndexError:
return None
def empty(self):
return len(self.queue) == 0
Event Producers (Interrupt Handlers)
# producers.py
from machine import Pin
from events import ButtonEvent, LineEvent, EventQueue
def setup_button_events(pin_num, event_queue):
"""Setup button to generate events."""
button = Pin(pin_num, Pin.IN, Pin.PULL_UP)
def handler(pin):
pressed = pin.value() == 0
event_queue.post(ButtonEvent(pin_num, pressed))
button.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING, handler=handler)
return button
def setup_line_events(sensor_pin, event_queue):
"""Setup line sensor to generate events on transitions."""
sensor = Pin(sensor_pin, Pin.IN)
last_value = [sensor.value()] # List to allow modification in closure
def handler(pin):
value = pin.value()
if value != last_value[0]:
if value == 0:
event_queue.post(LineEvent("entered"))
else:
event_queue.post(LineEvent("left"))
last_value[0] = value
sensor.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING, handler=handler)
return sensor
Event Consumer (Main Loop)
# main.py
from events import EventQueue, ButtonEvent, LineEvent, ObstacleEvent
from app.robot_mission import RobotMission
# Create event queue
events = EventQueue()
# Setup event producers
setup_button_events(22, events) # Start/stop button
setup_line_events(2, events) # Line sensor
# Create mission controller
mission = RobotMission(...)
# Main event loop
while True:
# Process all pending events
while not events.empty():
event = events.get()
if isinstance(event, ButtonEvent):
if event.pressed:
if mission.state == mission.IDLE:
mission.start()
else:
mission.stop()
elif isinstance(event, LineEvent):
if event.event_type == "lost":
# Handle line lost
pass
elif isinstance(event, ObstacleEvent):
if event.distance < 20:
# Handle close obstacle
pass
# Periodic updates
mission.update()
time.sleep_ms(10)
Benefits of Event-Driven
- Decoupling: Producers don't know about consumers
- Testability: Can inject fake events for testing
- Responsiveness: Events processed immediately
- Flexibility: Easy to add new event types
Part 4: Defensive Programming
Validate Everything
Don't trust inputs - validate them:
class Motor:
def set_speed(self, speed):
# Validate input
if not isinstance(speed, (int, float)):
raise TypeError(f"Speed must be numeric, got {type(speed)}")
if speed < -100 or speed > 100:
raise ValueError(f"Speed must be -100 to 100, got {speed}")
# Now safe to use
self._apply_speed(speed)
class LineSensors:
def get_position(self):
readings = [s.value() for s in self.sensors]
# Validate readings make sense
if all(r == 1 for r in readings):
return None # All white - line lost
if all(r == 0 for r in readings):
# All black - might be intersection or error
return 0 # Return center, let caller decide
# Normal calculation
return self._calculate_position(readings)
Fail Safe, Not Silent
When errors occur, fail in a safe state:
class RobotController:
def update(self):
try:
self._do_update()
except Exception as e:
# Log error
print(f"ERROR: {e}")
# Enter safe state
self.motors.stop()
self.leds.fill(255, 0, 0) # Red = error
self.leds.show()
# Set error flag (don't crash!)
self.error_state = True
Watchdog Timer
Detect if code gets stuck:
from machine import WDT
class SafetyMonitor:
def __init__(self, timeout_ms=5000):
# Hardware watchdog - resets if not fed
self.wdt = WDT(timeout=timeout_ms)
self.last_feed = time.ticks_ms()
def feed(self):
"""Call this regularly to prevent reset."""
self.wdt.feed()
self.last_feed = time.ticks_ms()
def check_timing(self):
"""Check if we're being called often enough."""
elapsed = time.ticks_diff(time.ticks_ms(), self.last_feed)
if elapsed > 1000:
print(f"WARNING: Main loop slow ({elapsed}ms)")
# Usage in main loop
safety = SafetyMonitor()
while True:
safety.feed() # Reset watchdog
# ... do work ...
safety.check_timing() # Warn if slow
Assert Preconditions
Check assumptions explicitly:
def calculate_pid(error, dt):
# Preconditions
assert dt > 0, "Time step must be positive"
assert -10 <= error <= 10, f"Error out of expected range: {error}"
# Now safe to calculate
return kp * error + ki * integral + kd * derivative
def set_motor_pwm(duty):
assert 0 <= duty <= 65535, f"Invalid PWM duty: {duty}"
pwm.duty_u16(duty)
Part 5: Testing Embedded Code
The Testing Challenge
Embedded code is hard to test because: - Hardware dependencies - Real-time behavior - Limited visibility
Dependency Injection
Design for testability by injecting dependencies:
# Bad: Hard to test (creates its own hardware)
class LineFollower:
def __init__(self):
self.sensors = [Pin(2, Pin.IN), Pin(3, Pin.IN), ...]
self.motors = Motors()
# Good: Dependencies injected (can use fakes)
class LineFollower:
def __init__(self, sensors, motors):
self.sensors = sensors
self.motors = motors
Fake/Mock Objects
Create fake objects for testing:
# fakes.py
class FakeSensor:
"""Fake sensor for testing."""
def __init__(self):
self._value = 0
def value(self):
return self._value
def set_value(self, v):
"""Test helper to set fake reading."""
self._value = v
class FakeMotors:
"""Fake motors that record commands."""
def __init__(self):
self.left_speed = 0
self.right_speed = 0
self.history = []
def set_speeds(self, left, right):
self.left_speed = left
self.right_speed = right
self.history.append((left, right))
def stop(self):
self.set_speeds(0, 0)
Unit Tests
Test individual components:
# tests/test_line_follower.py
from fakes import FakeSensor, FakeMotors
from services.line_follower import LineFollower
def test_follows_line_center():
"""When line is centered, drive straight."""
sensors = FakeLineSensors()
motors = FakeMotors()
follower = LineFollower(sensors, motors, kp=50)
# Simulate centered line
sensors.set_position(0)
follower.enable()
follower.update()
# Should drive straight (equal speeds)
assert motors.left_speed == motors.right_speed
def test_turns_toward_line():
"""When line is right, turn right."""
sensors = FakeLineSensors()
motors = FakeMotors()
follower = LineFollower(sensors, motors, kp=50)
# Simulate line to the right
sensors.set_position(0.5)
follower.enable()
follower.update()
# Should turn right (left faster than right)
assert motors.left_speed > motors.right_speed
def test_stops_when_disabled():
"""When disabled, motors stop."""
sensors = FakeLineSensors()
motors = FakeMotors()
follower = LineFollower(sensors, motors, kp=50)
follower.enable()
follower.update()
follower.disable()
assert motors.left_speed == 0
assert motors.right_speed == 0
# Run tests
if __name__ == "__main__":
test_follows_line_center()
test_turns_toward_line()
test_stops_when_disabled()
print("All tests passed!")
Integration Tests
Test components working together:
# tests/test_integration.py
def test_obstacle_stops_line_following():
"""Obstacle detection should stop line following."""
# Setup with fakes
sensors = FakeLineSensors()
motors = FakeMotors()
ultrasonic = FakeUltrasonic()
leds = FakeLEDs()
mission = RobotMission(
LineFollower(sensors, motors),
ObstacleDetector(ultrasonic),
leds
)
# Start mission
mission.start()
sensors.set_position(0)
mission.update()
# Verify following line
assert mission.state == mission.FOLLOWING
assert motors.left_speed > 0
# Simulate obstacle
ultrasonic.set_distance(10)
mission.update()
# Verify stopped
assert mission.state == mission.AVOIDING
assert motors.left_speed == 0
Part 6: Code Organization
Project Structure
robot/
├── main.py # Entry point
├── config.py # Pin assignments, constants
├── hal/ # Hardware Abstraction Layer
│ ├── __init__.py
│ ├── gpio.py
│ ├── pwm.py
│ └── i2c.py
├── drivers/ # Hardware drivers
│ ├── __init__.py
│ ├── motor.py
│ ├── imu.py
│ ├── leds.py
│ └── ultrasonic.py
├── services/ # Higher-level services
│ ├── __init__.py
│ ├── line_follower.py
│ ├── heading_control.py
│ └── obstacle_detector.py
├── app/ # Application logic
│ ├── __init__.py
│ ├── state_machine.py
│ └── mission.py
├── events/ # Event system
│ ├── __init__.py
│ ├── queue.py
│ └── types.py
└── tests/ # Test code
├── fakes.py
├── test_line_follower.py
└── test_mission.py
Configuration Management
Keep hardware configuration separate:
# config.py
# Pin assignments
MOTOR_LEFT_PWM = 12
MOTOR_LEFT_DIR = 13
MOTOR_RIGHT_PWM = 10
MOTOR_RIGHT_DIR = 11
LINE_SENSORS = [2, 3, 4, 5]
ULTRASONIC_TRIG = 0
ULTRASONIC_ECHO = 1
LED_PIN = 6
LED_COUNT = 8
IMU_SDA = 14
IMU_SCL = 15
# Control parameters
LINE_FOLLOW_KP = 50
LINE_FOLLOW_BASE_SPEED = 80
OBSTACLE_THRESHOLD_CM = 20
# Timing
SENSOR_RATE_HZ = 100
CONTROL_RATE_HZ = 50
LED_RATE_HZ = 20
Clean Main
Keep main.py simple:
# main.py
import time
from config import *
from hal import GPIO, PWMChannel, I2CBus
from drivers import MotorDriver, DifferentialDrive, LineSensors, Ultrasonic, LEDStrip
from services import LineFollower, ObstacleDetector
from app import RobotMission
from events import EventQueue, setup_button_events
def main():
# Initialize hardware
left_motor = MotorDriver(MOTOR_LEFT_PWM, MOTOR_LEFT_DIR)
right_motor = MotorDriver(MOTOR_RIGHT_PWM, MOTOR_RIGHT_DIR)
drive = DifferentialDrive(left_motor, right_motor)
line_sensors = LineSensors(LINE_SENSORS)
ultrasonic = Ultrasonic(ULTRASONIC_TRIG, ULTRASONIC_ECHO)
leds = LEDStrip(LED_PIN, LED_COUNT)
# Initialize services
follower = LineFollower(line_sensors, drive, LINE_FOLLOW_KP)
obstacles = ObstacleDetector(ultrasonic, OBSTACLE_THRESHOLD_CM)
# Initialize application
mission = RobotMission(follower, obstacles, leds)
# Setup events
events = EventQueue()
setup_button_events(22, events)
# Main loop
try:
while True:
process_events(events, mission)
mission.update()
time.sleep_ms(10)
finally:
drive.stop() # Safety: always stop on exit
if __name__ == "__main__":
main()
Mini-Project: Refactored Robot
Part A: Create Layer Structure
- Create the directory structure above
- Move existing code into appropriate layers
- Ensure each layer only imports from layers below
Part B: Implement Event System
- Create event types for your robot
- Add interrupt-based event producers
- Modify main loop to process events
Part C: Add Safety Features
- Add input validation to all public methods
- Implement watchdog timer
- Add error handling with safe fallback
Part D: Write Tests
- Create fake objects for all hardware
- Write unit tests for line follower
- Write integration test for mission
Deliverable
- Refactored code with clean architecture
- Event system implementation
- Safety features (watchdog, validation)
- Test suite with passing tests
- Documentation of design decisions
Key Takeaways
- Layers separate concerns - HAL, drivers, services, application
- Events decouple components - producers don't know consumers
- Validate everything - don't trust inputs
- Fail safe - errors should stop, not crash
- Design for testing - inject dependencies
- Organize for humans - clear structure, good names
Further Reading
- Making Embedded Systems - Elecia White's architecture chapter
- Embedded Software Design - Barr Group resources
- Test-Driven Development for Embedded C - James Grenning's book
← Back: Sensor Fusion | Advanced Track Overview | Next: Communication & Data →