Advanced 2: Real-Time Concepts
Prerequisites: Lab 3 (Ultrasonic & Timing)
Overview
In Lab 2, you learned non-blocking patterns and ran multiple tasks at different frequencies. This module explains the theory behind task scheduling and real-time systems.
You'll learn: - How to model tasks mathematically - Cooperative vs preemptive scheduling - How to analyze if your system can meet all deadlines - What happens when things go wrong (priority inversion, jitter)
Part 1: What is "Real-Time"?
Real-Time ≠ Fast
A common misconception: "real-time" means "really fast."
Actually: Real-time means predictable timing - meeting deadlines reliably.
| System | Speed | Real-Time? |
|---|---|---|
| Video game (60 FPS) | Fast | Soft real-time (frame drops OK) |
| Web server | Fast | Not real-time (latency varies) |
| Pacemaker | Slow (1 Hz) | Hard real-time (must not miss!) |
| Car ABS | Medium (100 Hz) | Hard real-time (safety critical) |
Hard vs Soft Real-Time
Hard real-time: - Missing a deadline = system failure - Examples: airbag deployment, flight control - Must be mathematically proven to meet deadlines
Soft real-time: - Missing a deadline = degraded performance - Examples: video streaming, audio playback - Statistical guarantees acceptable
Your robot: Soft real-time. Missing a sensor read is bad but not catastrophic.
The Enemy: Unpredictability
Sources of timing unpredictability: - Variable execution time (branches, loops) - Interrupts from other sources - Memory access patterns (cache misses) - Garbage collection (in Python!) - Blocking operations
Part 2: Task Models
Periodic Tasks
Most embedded tasks are periodic - they repeat at fixed intervals.
A periodic task \(\tau_i\) has: - Period \(T_i\): Time between activations - Execution time \(C_i\): Time to complete one run - Deadline \(D_i\): When it must finish (usually \(D_i = T_i\))
Task: Read sensors every 10ms
┌─┐ ┌─┐ ┌─┐ ┌─┐
│C│ │C│ │C│ │C│
────┴─┴───────┴─┴───────┴─┴───────┴─┴────►
0 10ms 10 20ms 20 30ms 30 40ms
└─T─┘ └─T─┘ └─T─┘
C = execution time (~1ms)
T = period (10ms)
D = deadline (end of period)
Aperiodic Tasks
Some tasks run in response to events, not time: - Button press handler - Incoming message processor - Error handler
These are harder to analyze because timing is unpredictable.
Sporadic Tasks
Like aperiodic, but with a minimum inter-arrival time: - "Button can be pressed, but not faster than every 100ms" - Allows analysis by treating as worst-case periodic
Part 3: Execution Time Analysis
Worst-Case Execution Time (WCET)
For deadline analysis, we need the longest possible execution time.
def read_sensors():
values = []
for s in sensors: # Fixed loop: 4 iterations
values.append(s.value()) # Fixed time per read
return values
# WCET is predictable (same every time)
vs:
def process_data(data):
result = []
for item in data: # Variable loop length!
if complex_condition(item):
result.append(slow_operation(item))
return result
# WCET depends on data - much harder to analyze
Measuring Execution Time
import time
def measure_wcet(func, iterations=1000):
"""Measure worst-case execution time."""
times = []
for _ in range(iterations):
start = time.ticks_us()
func()
elapsed = time.ticks_diff(time.ticks_us(), start)
times.append(elapsed)
return {
'min': min(times),
'max': max(times), # This is WCET estimate
'avg': sum(times) / len(times),
'jitter': max(times) - min(times)
}
# Example usage
def my_task():
# Your task code
pass
stats = measure_wcet(my_task)
print(f"WCET: {stats['max']} µs")
print(f"Jitter: {stats['jitter']} µs")
Why Python Makes WCET Hard
Python adds unpredictability: - Garbage collection can pause execution - Dynamic typing adds runtime checks - Bytecode interpretation varies with code path - Memory allocation for lists, strings, etc.
This is why safety-critical systems use C with strict coding rules (MISRA).
Part 4: Utilization Analysis
CPU Utilization
Utilization is the fraction of CPU time used by tasks:
Total utilization:
Example: Your Robot
| Task | Period (\(T\)) | WCET (\(C\)) | Utilization |
|---|---|---|---|
| Line sensors | 10 ms | 0.5 ms | 5.0% |
| LED update | 10 ms | 2.0 ms | 20.0% |
| Ultrasonic | 100 ms | 30 ms | 30.0% |
| Logging | 1000 ms | 5 ms | 0.5% |
| Heartbeat | 500 ms | 0.1 ms | 0.02% |
| Total | 55.5% |
Schedulability
Simple rule: If \(U_{total} < 1\) (100%), the system might be schedulable.
But wait! This assumes perfect scheduling. Reality is messier.
Utilization Bound
For n periodic tasks with rate-monotonic scheduling (shortest period = highest priority):
| Tasks | Utilization Bound |
|---|---|
| 1 | 100% |
| 2 | 82.8% |
| 3 | 78.0% |
| 4 | 75.7% |
| ∞ | 69.3% |
If \(U_{total} \leq U_{bound}\), system is guaranteed schedulable.
Your robot at 55.5% is well under the bound - good!
Part 5: Scheduling Approaches
Run-to-Completion (Your Current Approach)
Each task runs completely before the next:
while True:
now = time.ticks_ms()
if time.ticks_diff(now, last_sensor) >= 10:
read_sensors() # Runs to completion
last_sensor = now
if time.ticks_diff(now, last_led) >= 10:
update_leds() # Runs to completion
last_led = now
Pros: Simple, no race conditions Cons: Long task blocks everything
Cooperative Scheduling
Tasks voluntarily yield control:
class Task:
def __init__(self, period_ms, func):
self.period = period_ms
self.func = func
self.last_run = 0
class Scheduler:
def __init__(self):
self.tasks = []
def add(self, period_ms, func):
self.tasks.append(Task(period_ms, func))
def run(self):
while True:
now = time.ticks_ms()
for task in self.tasks:
if time.ticks_diff(now, task.last_run) >= task.period:
task.func() # Task runs
task.last_run = now
# No preemption - task must return!
# Usage
sched = Scheduler()
sched.add(10, read_sensors)
sched.add(10, update_leds)
sched.add(100, read_ultrasonic)
sched.run()
Key insight: Tasks must be short and return quickly. A task that takes 30ms blocks everything for 30ms!
Preemptive Scheduling
Higher priority tasks interrupt lower priority:
Without preemption:
[ Low Priority Task (30ms) ][High][Low cont...]
↑
High priority waits!
With preemption:
[ Low ][High][ Low continued... ]
↑
Interrupts immediately!
Pros: High priority tasks get immediate attention Cons: Complex, race conditions, need RTOS
Interrupts as Preemption
You used limited preemption with interrupts in State Machines — Part 7, where the echo pin ISR captured timing without blocking the main loop:
def sensor_isr(pin):
global edge_count
edge_count += 1 # Runs immediately, preempts main code
sensor.irq(trigger=Pin.IRQ_FALLING, handler=sensor_isr)
The interrupt preempts your main loop instantly!
Part 6: Timing Problems
Jitter
Jitter = variation in timing between task executions.
Ideal (no jitter):
│ │ │ │ │ │ │
0 10 20 30 40 50 60 ms
With jitter:
│ │ │ │ │ │ │
0 9 21 29 41 50 62 ms
↑ ↑
1ms early 1ms late
Measuring Jitter
import time
PERIOD = 10 # ms
last_time = time.ticks_ms()
intervals = []
for _ in range(100):
# Wait for next period
while time.ticks_diff(time.ticks_ms(), last_time) < PERIOD:
pass
now = time.ticks_ms()
actual_interval = time.ticks_diff(now, last_time)
intervals.append(actual_interval)
last_time = now
# Do task work here
do_task()
# Analyze jitter
target = PERIOD
jitter = [abs(i - target) for i in intervals]
print(f"Max jitter: {max(jitter)} ms")
print(f"Avg jitter: {sum(jitter)/len(jitter):.2f} ms")
Causes of Jitter
- Other tasks taking time
- Interrupts (including system interrupts)
- Variable execution time in tasks
- Garbage collection (Python)
- Busy-wait granularity (checking in a loop)
Priority Inversion
A nasty problem in preemptive systems:
High priority task: H (needs resource R)
Medium priority task: M
Low priority task: L (holds resource R)
Timeline:
1. L starts, acquires R
2. H arrives, needs R, blocks waiting for L
3. M arrives, preempts L (M > L priority)
4. H is stuck waiting for L, but M keeps running!
H is effectively lower priority than M - INVERTED!
Solution: Priority inheritance (L temporarily gets H's priority while holding R)
This is why RTOS exists - to handle these complexities.
Part 7: Building a Simple Scheduler
Let's build a cooperative scheduler with timing analysis:
import time
class ScheduledTask:
def __init__(self, name, period_ms, func):
self.name = name
self.period = period_ms
self.func = func
self.last_run = 0
self.run_count = 0
self.total_time = 0
self.max_time = 0
self.missed_deadlines = 0
class CooperativeScheduler:
def __init__(self):
self.tasks = []
self.start_time = 0
def add_task(self, name, period_ms, func):
self.tasks.append(ScheduledTask(name, period_ms, func))
def run(self, duration_ms=10000):
"""Run scheduler for specified duration."""
self.start_time = time.ticks_ms()
# Initialize all task timers
now = time.ticks_ms()
for task in self.tasks:
task.last_run = now
while time.ticks_diff(time.ticks_ms(), self.start_time) < duration_ms:
now = time.ticks_ms()
for task in self.tasks:
elapsed = time.ticks_diff(now, task.last_run)
if elapsed >= task.period:
# Check if we missed the deadline
if elapsed > task.period * 1.5:
task.missed_deadlines += 1
# Run the task and measure time
task_start = time.ticks_us()
task.func()
task_time = time.ticks_diff(time.ticks_us(), task_start)
# Update statistics
task.run_count += 1
task.total_time += task_time
if task_time > task.max_time:
task.max_time = task_time
task.last_run = now
def print_stats(self):
print("\n=== Scheduler Statistics ===\n")
print(f"{'Task':<15} {'Runs':>6} {'Avg(ms)':>8} {'Max(ms)':>8} {'Missed':>7} {'Util%':>6}")
print("-" * 55)
total_util = 0
for task in self.tasks:
if task.run_count > 0:
avg_ms = task.total_time / task.run_count / 1000
max_ms = task.max_time / 1000
util = (task.max_time / 1000) / task.period * 100
total_util += util
print(f"{task.name:<15} {task.run_count:>6} {avg_ms:>8.2f} {max_ms:>8.2f} {task.missed_deadlines:>7} {util:>6.1f}")
print("-" * 55)
print(f"{'Total Utilization:':<45} {total_util:>6.1f}%")
# Example usage
def read_sensors():
time.sleep_us(500) # Simulated work
def update_leds():
time.sleep_us(2000)
def read_ultrasonic():
time.sleep_ms(25) # Simulated blocking read
def heartbeat():
time.sleep_us(100)
# Create and run scheduler
sched = CooperativeScheduler()
sched.add_task("Sensors", 10, read_sensors)
sched.add_task("LEDs", 10, update_leds)
sched.add_task("Ultrasonic", 100, read_ultrasonic)
sched.add_task("Heartbeat", 500, heartbeat)
print("Running scheduler for 10 seconds...")
sched.run(10000)
sched.print_stats()
Expected Output
=== Scheduler Statistics ===
Task Runs Avg(ms) Max(ms) Missed Util%
-------------------------------------------------------
Sensors 952 0.52 0.58 12 5.8
LEDs 951 2.01 2.15 13 21.5
Ultrasonic 98 25.10 26.20 2 26.2
Heartbeat 20 0.11 0.12 0 0.0
-------------------------------------------------------
Total Utilization: 53.5%
Analysis Questions
- Why did Sensors miss 12 deadlines?
- What's blocking other tasks during Ultrasonic?
- How would you reduce missed deadlines?
Part 8: From Cooperative to RTOS
When Cooperative Isn't Enough
Cooperative scheduling fails when: - Tasks can't be made short enough - Strict timing guarantees needed - Tasks have very different priorities - Dynamic task creation/deletion needed
What an RTOS Provides
Real-Time Operating System features: - Preemptive priority-based scheduling - Task creation and management - Synchronization primitives (mutex, semaphore) - Inter-task communication (queues, events) - Timer services - Memory management
Popular Embedded RTOS
| RTOS | Typical Use |
|---|---|
| FreeRTOS | Widely used, runs on RP2040 |
| Zephyr | Linux Foundation, modern |
| ThreadX | Azure RTOS, commercial |
| VxWorks | Aerospace, military |
| QNX | Automotive, medical |
FreeRTOS on RP2040 (Preview)
// This is C code - FreeRTOS isn't available in MicroPython
void sensor_task(void *params) {
while(1) {
read_sensors();
vTaskDelay(pdMS_TO_TICKS(10)); // Sleep 10ms, yield to other tasks
}
}
void ultrasonic_task(void *params) {
while(1) {
read_ultrasonic(); // Can block - other tasks still run!
vTaskDelay(pdMS_TO_TICKS(100));
}
}
int main() {
xTaskCreate(sensor_task, "Sensors", 256, NULL, 2, NULL);
xTaskCreate(ultrasonic_task, "Ultra", 256, NULL, 1, NULL);
vTaskStartScheduler(); // Never returns
}
Key difference: Tasks can block without stopping everything!
Mini-Project: Scheduler Analysis
Part A: Measure Your Robot's Tasks
- Instrument each task in your Lab 2 code
- Measure WCET for: sensor read, LED update, ultrasonic, logging
- Calculate total utilization
Part B: Jitter Analysis
- Add timestamps to your main loop
- Collect 1000 samples of actual period vs target
- Plot histogram of jitter
- Identify sources of worst-case jitter
Part C: Build Improved Scheduler
- Implement the
CooperativeSchedulerfrom Part 7 - Run your robot tasks through it
- Compare missed deadlines with/without scheduler
- Document improvements
Part D: Deadline Analysis
- Calculate utilization bound for your task set
- Verify your utilization is under the bound
- What's the maximum ultrasonic WCET before deadlines fail?
Deliverable
Report including: - Task table (period, WCET, utilization) - Jitter measurements and analysis - Scheduler comparison results - Deadline analysis calculations
Key Takeaways
- Real-time = predictable, not fast
- Know your WCET - measure, don't guess
- Utilization must be < 100% (and typically < 70% for safety)
- Cooperative scheduling is simple but requires short tasks
- Preemption solves blocking but adds complexity
- RTOS exists for a reason - complex systems need it
Further Reading
- Rate Monotonic Analysis - Original Liu & Layland paper
- FreeRTOS Documentation - Practical RTOS guide
- Making Embedded Systems - Elecia White's excellent book
← Back: Sensor Theory | Advanced Track Overview | Next: Control Theory →