
Advanced 2: Real-Time Concepts

Prerequisites: Lab 3 (Ultrasonic & Timing)


Overview

In Lab 2, you learned non-blocking patterns and ran multiple tasks at different frequencies. This module explains the theory behind task scheduling and real-time systems.

You'll learn:

- How to model tasks mathematically
- Cooperative vs preemptive scheduling
- How to analyze if your system can meet all deadlines
- What happens when things go wrong (priority inversion, jitter)


Part 1: What is "Real-Time"?

Real-Time ≠ Fast

A common misconception: "real-time" means "really fast."

Actually: Real-time means predictable timing - meeting deadlines reliably.

| System | Speed | Real-Time? |
|---|---|---|
| Video game (60 FPS) | Fast | Soft real-time (frame drops OK) |
| Web server | Fast | Not real-time (latency varies) |
| Pacemaker | Slow (1 Hz) | Hard real-time (must not miss!) |
| Car ABS | Medium (100 Hz) | Hard real-time (safety critical) |

Hard vs Soft Real-Time

Hard real-time:

- Missing a deadline = system failure
- Examples: airbag deployment, flight control
- Must be mathematically proven to meet deadlines

Soft real-time:

- Missing a deadline = degraded performance
- Examples: video streaming, audio playback
- Statistical guarantees acceptable

Your robot: Soft real-time. Missing a sensor read is bad but not catastrophic.

The Enemy: Unpredictability

Sources of timing unpredictability:

- Variable execution time (branches, loops)
- Interrupts from other sources
- Memory access patterns (cache misses)
- Garbage collection (in Python!)
- Blocking operations


Part 2: Task Models

Periodic Tasks

Most embedded tasks are periodic - they repeat at fixed intervals.

A periodic task \(\tau_i\) has:

- Period \(T_i\): Time between activations
- Execution time \(C_i\): Time to complete one run
- Deadline \(D_i\): When it must finish (usually \(D_i = T_i\))

Task: Read sensors every 10ms

    ┌─┐       ┌─┐       ┌─┐       ┌─┐
    │C│       │C│       │C│       │C│
────┴─┴───────┴─┴───────┴─┴───────┴─┴────►
    0         10        20        30   ms
    └────T────┴────T────┴────T────┘

C = execution time (~1ms)
T = period (10ms)
D = deadline (end of period)
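
As a minimal sketch of this model, the class below lists the release time and absolute deadline of each job of a periodic task. The name `PeriodicTask` and its fields are illustrative, not part of the lab code, and the sketch assumes the first job is released at t = 0 and that the deadline equals the period unless stated otherwise.

```python
class PeriodicTask:
    """Periodic task model: period T, execution time C, relative deadline D (all in ms)."""

    def __init__(self, name, period_ms, wcet_ms, deadline_ms=None):
        self.name = name
        self.T = period_ms
        self.C = wcet_ms
        # Assumption: implicit deadline (D = T) unless one is given.
        self.D = period_ms if deadline_ms is None else deadline_ms

    def jobs(self, count):
        """Return (release, absolute_deadline) pairs for the first `count` jobs."""
        return [(k * self.T, k * self.T + self.D) for k in range(count)]


sensor_task = PeriodicTask("Read sensors", period_ms=10, wcet_ms=1)
for release, deadline in sensor_task.jobs(4):
    print("release at", release, "ms, must finish by", deadline, "ms")
```

Each job must fit its C milliseconds of work somewhere between its release and its deadline; everything in the later analysis follows from these three numbers.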

Aperiodic Tasks

Some tasks run in response to events, not time:

- Button press handler
- Incoming message processor
- Error handler

These are harder to analyze because timing is unpredictable.

Sporadic Tasks

Like aperiodic, but with a minimum inter-arrival time:

- "Button can be pressed, but not faster than every 100ms"
- Allows analysis by treating it as a worst-case periodic task
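
One way to enforce a minimum inter-arrival time in software is to ignore events that arrive too soon after the last accepted one. The sketch below is illustrative only: `SporadicFilter` is a hypothetical helper, and timestamps are passed in as plain integers so the example runs anywhere. On the robot you would likely take them from `time.ticks_ms()` and compare them with `time.ticks_diff()` so that counter wraparound is handled.

```python
class SporadicFilter:
    """Accept an event only if min_gap_ms has passed since the last accepted event."""

    def __init__(self, min_gap_ms):
        self.min_gap_ms = min_gap_ms
        self.last_accepted = None

    def accept(self, timestamp_ms):
        too_soon = (
            self.last_accepted is not None
            and timestamp_ms - self.last_accepted < self.min_gap_ms
        )
        if too_soon:
            return False  # Ignore the event (for example, switch bounce)
        self.last_accepted = timestamp_ms
        return True


button = SporadicFilter(min_gap_ms=100)
events = [0, 20, 95, 100, 150, 260]  # Example event times in ms
accepted = [t for t in events if button.accept(t)]
print(accepted)  # [0, 100, 260]
```

With the filter in place, the handler can be analyzed as if it were a periodic task with a 100 ms period, which is the worst case it can now produce.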


Part 3: Execution Time Analysis

Worst-Case Execution Time (WCET)

For deadline analysis, we need the longest possible execution time.

def read_sensors():
    values = []
    for s in sensors:        # Fixed loop: 4 iterations
        values.append(s.value())  # Fixed time per read
    return values

# WCET is predictable (same every time)

vs:

def process_data(data):
    result = []
    for item in data:        # Variable loop length!
        if complex_condition(item):
            result.append(slow_operation(item))
    return result

# WCET depends on data - much harder to analyze

Measuring Execution Time

import time

def measure_wcet(func, iterations=1000):
    """Measure execution time; the observed maximum only estimates the WCET."""
    times = []
    for _ in range(iterations):
        start = time.ticks_us()
        func()
        elapsed = time.ticks_diff(time.ticks_us(), start)
        times.append(elapsed)

    return {
        'min': min(times),
        'max': max(times),  # Observed worst case (the true WCET may be higher)
        'avg': sum(times) / len(times),
        'jitter': max(times) - min(times)
    }

# Example usage
def my_task():
    # Your task code
    pass

stats = measure_wcet(my_task)
print(f"WCET: {stats['max']} µs")
print(f"Jitter: {stats['jitter']} µs")

Why Python Makes WCET Hard

Python adds unpredictability:

- Garbage collection can pause execution
- Dynamic typing adds runtime checks
- Bytecode interpretation varies with code path
- Memory allocation for lists, strings, etc.

This is why safety-critical systems use C with strict coding rules (MISRA).
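
Within Python, two habits can reduce (not eliminate) this unpredictability: allocate buffers once and reuse them, and trigger garbage collection at a moment you choose instead of letting it start in the middle of timed work. The sketch below assumes four sensors and uses stand-in read functions; `read_into` and `fake_sensors` are hypothetical names for illustration.

```python
import gc

NUM_SENSORS = 4
readings = [0] * NUM_SENSORS  # Allocated once, reused on every cycle


def read_into(buf, read_fns):
    """Store one reading per sensor in buf without creating a new list."""
    for i in range(len(read_fns)):
        buf[i] = read_fns[i]()


# Stand-in sensor functions (hypothetical; replace with real reads)
fake_sensors = [lambda: 1, lambda: 0, lambda: 0, lambda: 1]

gc.collect()  # Collect now, before the time-critical work starts
read_into(readings, fake_sensors)
print(readings)  # [1, 0, 0, 1]
```

Compared with `read_sensors()` above, which builds a new list on every call, this version gives the collector less to clean up, so pauses should be rarer. Measuring before and after is the only way to know how much it helps on your board.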


Part 4: Utilization Analysis

CPU Utilization

Utilization is the fraction of CPU time used by tasks:

\[U_i = \frac{C_i}{T_i}\]

Total utilization:

\[U_{total} = \sum_{i=1}^{n} \frac{C_i}{T_i}\]

Example: Your Robot

| Task | Period (\(T\)) | WCET (\(C\)) | Utilization |
|---|---|---|---|
| Line sensors | 10 ms | 0.5 ms | 5.0% |
| LED update | 10 ms | 2.0 ms | 20.0% |
| Ultrasonic | 100 ms | 30 ms | 30.0% |
| Logging | 1000 ms | 5 ms | 0.5% |
| Heartbeat | 500 ms | 0.1 ms | 0.02% |
| Total | | | 55.5% |
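
The same arithmetic can be checked in a few lines of Python. This is only a sketch of the formula \(U_i = C_i / T_i\) applied to the example numbers; replace the values with your own measurements.

```python
# (name, period_ms, wcet_ms): example values from the table above
tasks = [
    ("Line sensors", 10, 0.5),
    ("LED update", 10, 2.0),
    ("Ultrasonic", 100, 30),
    ("Logging", 1000, 5),
    ("Heartbeat", 500, 0.1),
]


def utilization(period_ms, wcet_ms):
    """Fraction of CPU time one task needs: U = C / T."""
    return wcet_ms / period_ms


total = 0.0
for name, period_ms, wcet_ms in tasks:
    u = utilization(period_ms, wcet_ms)
    total += u
    print(f"{name:<13} {u * 100:6.2f}%")
print(f"{'Total':<13} {total * 100:6.2f}%")
```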

Schedulability

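Simple rule: \(U_{total} \leq 1\) (100%) is necessary for the system to be schedulable, but it is not sufficient. Above 100% the tasks can never all meet their deadlines; below 100% they only might.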

But wait! This assumes perfect scheduling. Reality is messier.

Utilization Bound

For n periodic tasks with rate-monotonic scheduling (shortest period = highest priority):

\[U_{bound} = n \times (2^{1/n} - 1)\]

| Tasks | Utilization Bound |
|---|---|
| 1 | 100% |
| 2 | 82.8% |
| 3 | 78.0% |
| 4 | 75.7% |
| ∞ | 69.3% |

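If \(U_{total} \leq U_{bound}\), the task set is guaranteed schedulable under preemptive rate-monotonic scheduling (independent tasks, \(D_i = T_i\)). The test is sufficient but not necessary: a task set above the bound may still be schedulable, it just is not guaranteed.

Your robot at 55.5% is well under the five-task bound of 74.3% - good! Keep in mind that the guarantee assumes preemption; a run-to-completion loop can still miss deadlines when one task runs long.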
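
The bound is easy to evaluate for any number of tasks. The sketch below implements the formula above (the Liu and Layland bound); the function names are illustrative, and the utilizations are the example values from the robot table.

```python
def rm_bound(n):
    """Utilization bound for n tasks under rate-monotonic scheduling."""
    return n * (2 ** (1 / n) - 1)


def passes_rm_test(utilizations):
    """Sufficient (not necessary) test: True means guaranteed schedulable
    under preemptive rate-monotonic scheduling."""
    return sum(utilizations) <= rm_bound(len(utilizations))


robot = [0.05, 0.20, 0.30, 0.005, 0.0002]  # Example utilizations, one per task
print(f"Bound for {len(robot)} tasks: {rm_bound(len(robot)) * 100:.1f}%")
print("Passes test:", passes_rm_test(robot))
```

A `False` result does not prove that deadlines will be missed. It only means this simple test cannot give a guarantee and a more detailed analysis is needed.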


Part 5: Scheduling Approaches

Run-to-Completion (Your Current Approach)

Each task runs completely before the next:

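import time

# read_sensors() and update_leds() are your own task functions
last_sensor = last_led = time.ticks_ms()

while True:
    now = time.ticks_ms()

    if time.ticks_diff(now, last_sensor) >= 10:
        read_sensors()  # Runs to completion
        last_sensor = now

    if time.ticks_diff(now, last_led) >= 10:
        update_leds()   # Runs to completion
        last_led = now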

Pros: Simple, no race conditions

Cons: Long task blocks everything
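
To see how much a long task can hurt, a back-of-the-envelope estimate helps: in a run-to-completion loop, a task that has just become due may have to wait while every other task runs once. The sketch below applies that estimate to the example numbers from Part 4. It ignores loop overhead and finer timing effects, so treat the result as a rough indication, not a proof.

```python
# (name, period_ms, wcet_ms): example values from the Part 4 table
tasks = [
    ("Line sensors", 10, 0.5),
    ("LED update", 10, 2.0),
    ("Ultrasonic", 100, 30),
    ("Logging", 1000, 5),
    ("Heartbeat", 500, 0.1),
]


def worst_case_wait(name):
    """Estimated wait if every other task runs once before this one."""
    return sum(c for n, _, c in tasks if n != name)


def may_miss(name):
    """True if waiting plus the task's own WCET exceeds its period."""
    for n, period, c in tasks:
        if n == name:
            return worst_case_wait(name) + c > period
    raise ValueError("unknown task: " + name)


for name, period, c in tasks:
    verdict = "may miss" if may_miss(name) else "ok"
    print(f"{name:<13} wait up to {worst_case_wait(name):5.1f} ms  {verdict}")
```

Even though total utilization is only about 55%, both 10 ms tasks can be delayed by more than 35 ms, almost entirely because of the 30 ms ultrasonic read.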

Cooperative Scheduling

Tasks voluntarily yield control:

import time

class Task:
    def __init__(self, period_ms, func):
        self.period = period_ms
        self.func = func
        self.last_run = time.ticks_ms()  # Start timing from creation

class Scheduler:
    def __init__(self):
        self.tasks = []

    def add(self, period_ms, func):
        self.tasks.append(Task(period_ms, func))

    def run(self):
        while True:
            now = time.ticks_ms()
            for task in self.tasks:
                if time.ticks_diff(now, task.last_run) >= task.period:
                    task.func()  # Task runs
                    task.last_run = now
                    # No preemption - task must return!

# Usage
sched = Scheduler()
sched.add(10, read_sensors)
sched.add(10, update_leds)
sched.add(100, read_ultrasonic)
sched.run()

Key insight: Tasks must be short and return quickly. A task that takes 30ms blocks everything for 30ms!
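
One way to keep tasks short is to split a long job into slices and hand control back after each slice. Python generators make this fairly natural, since each `yield` is a point where the task voluntarily gives up the CPU. The sketch below is a simplified illustration with hypothetical task names; it runs one short task on every pass and advances the long job by a single slice per pass.

```python
def long_task(steps):
    """A long job split into short slices; each yield hands control back."""
    for i in range(steps):
        # ... do one short slice of work here ...
        yield i


def short_task(log):
    log.append("short")


def run(passes):
    log = []
    job = long_task(3)
    for _ in range(passes):
        short_task(log)  # The short task runs on every pass
        try:
            step = next(job)  # The long task advances by one slice only
            log.append("long slice %d" % step)
        except StopIteration:
            job = long_task(3)  # Finished: start a fresh job on the next pass
    return log


print(run(5))
```

The short task is never delayed by more than one slice, whatever the total length of the long job. The cost is that the long job takes several passes to finish.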

Preemptive Scheduling

Higher priority tasks interrupt lower priority:

Without preemption:
    [  Low Priority Task (30ms)  ][High][Low cont...]
                            High priority waits!

With preemption:
    [  Low  ][High][  Low continued...  ]
        Interrupts immediately!

Pros: High priority tasks get immediate attention

Cons: Complex, race conditions, need RTOS

Interrupts as Preemption

You used limited preemption with interrupts in State Machines — Part 7, where the echo pin ISR captured timing without blocking the main loop:

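from machine import Pin

edge_count = 0
sensor = Pin(15, Pin.IN)  # Pin number is only an example; match your wiring

def sensor_isr(pin):
    global edge_count
    edge_count += 1  # Runs immediately, preempts main code

sensor.irq(trigger=Pin.IRQ_FALLING, handler=sensor_isr)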

The interrupt handler runs shortly after the pin changes, without waiting for the main loop to finish its current task.


Part 6: Timing Problems

Jitter

Jitter = variation in timing between task executions.

Ideal (no jitter):
    │    │    │    │    │    │    │
    0   10   20   30   40   50   60 ms

With jitter:
    │   │     │   │     │    │   │
    0   9    21  29    41   50  62 ms
        ↑     ↑
    1ms early 1ms late

Measuring Jitter

import time

def do_task():
    pass  # Placeholder: replace with your task code

PERIOD = 10  # ms
last_time = time.ticks_ms()
intervals = []

for _ in range(100):
    # Wait for next period
    while time.ticks_diff(time.ticks_ms(), last_time) < PERIOD:
        pass

    now = time.ticks_ms()
    actual_interval = time.ticks_diff(now, last_time)
    intervals.append(actual_interval)
    last_time = now

    # Do task work here
    do_task()

# Analyze jitter
target = PERIOD
jitter = [abs(i - target) for i in intervals]
print(f"Max jitter: {max(jitter)} ms")
print(f"Avg jitter: {sum(jitter)/len(jitter):.2f} ms")
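
Jitter can be expressed in two ways, and it helps to know which one you are measuring. The code above measures period jitter (how far each interval is from the target). The "1ms early / 1ms late" labels in the diagram describe release error (how far each activation is from its ideal time). The sketch below computes both for the activation times shown in the jitter diagram; it is plain Python and needs no hardware.

```python
PERIOD = 10  # ms
timestamps = [0, 9, 21, 29, 41, 50, 62]  # Activation times from the diagram

# Period jitter: how far each interval is from the target period
intervals = [b - a for a, b in zip(timestamps, timestamps[1:])]
period_jitter = [abs(i - PERIOD) for i in intervals]

# Release error: how far each activation is from its ideal time k * PERIOD
release_error = [t - k * PERIOD for k, t in enumerate(timestamps)]

print("Intervals:    ", intervals)       # [9, 12, 8, 12, 9, 12]
print("Period jitter:", period_jitter)   # [1, 2, 2, 2, 1, 2]
print("Release error:", release_error)   # [0, -1, 1, -1, 1, 0, 2]
```

Note that an activation that is 1 ms early followed by one that is 1 ms late produces a 12 ms interval, so period jitter can be twice the release error.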

Causes of Jitter

  1. Other tasks taking time
  2. Interrupts (including system interrupts)
  3. Variable execution time in tasks
  4. Garbage collection (Python)
  5. Busy-wait granularity (checking in a loop)

Priority Inversion

A nasty problem in preemptive systems:

High priority task: H (needs resource R)
Medium priority task: M
Low priority task: L (holds resource R)

Timeline:
1. L starts, acquires R
2. H arrives, needs R, blocks waiting for L
3. M arrives, preempts L (M > L priority)
4. H is stuck waiting for L, but M keeps running!

H is effectively lower priority than M - INVERTED!

Solution: Priority inheritance (L temporarily gets H's priority while holding R)
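
The effect of priority inheritance can be seen in a small tick-by-tick simulation of the scenario above. This is a toy model under simplifying assumptions, not how an RTOS is implemented: time advances in whole ticks, L and H hold R for their entire execution, and the release times and work amounts are made-up example values.

```python
def simulate(inheritance, ticks=20):
    """Return the tick at which H finishes, or None if it never does."""
    tasks = {
        "L": {"prio": 1, "release": 0, "work": 4, "needs_R": True},
        "M": {"prio": 2, "release": 2, "work": 6, "needs_R": False},
        "H": {"prio": 3, "release": 1, "work": 2, "needs_R": True},
    }
    holder = None  # Task currently holding resource R

    for t in range(ticks):
        ready = [n for n, k in tasks.items() if k["release"] <= t and k["work"] > 0]
        # A task that needs R is blocked while a different task holds it
        blocked = [n for n in ready if tasks[n]["needs_R"] and holder not in (None, n)]
        runnable = [n for n in ready if n not in blocked]
        if not runnable:
            continue

        def effective_priority(n):
            prio = tasks[n]["prio"]
            if inheritance and n == holder:
                # The holder inherits the highest priority among tasks it blocks
                prio = max([prio] + [tasks[b]["prio"] for b in blocked])
            return prio

        current = max(runnable, key=effective_priority)
        if tasks[current]["needs_R"]:
            holder = current  # Acquire (or keep) the resource
        tasks[current]["work"] -= 1

        if tasks[current]["work"] == 0:
            if holder == current:
                holder = None  # Release the resource on completion
            if current == "H":
                return t + 1
    return None


print("Without inheritance, H finishes at tick", simulate(False))  # 12
print("With inheritance, H finishes at tick", simulate(True))      # 6
```

Without inheritance, M runs all of its 6 ticks while H waits on L. With inheritance, L is boosted above M as soon as H blocks, finishes its critical section, and H runs right after.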

This is one reason to use an RTOS: it handles these complexities for you.


Part 7: Building a Simple Scheduler

Let's build a cooperative scheduler with timing analysis:

import time

class ScheduledTask:
    def __init__(self, name, period_ms, func):
        self.name = name
        self.period = period_ms
        self.func = func
        self.last_run = 0
        self.run_count = 0
        self.total_time = 0
        self.max_time = 0
        self.missed_deadlines = 0

class CooperativeScheduler:
    def __init__(self):
        self.tasks = []
        self.start_time = 0

    def add_task(self, name, period_ms, func):
        self.tasks.append(ScheduledTask(name, period_ms, func))

    def run(self, duration_ms=10000):
        """Run scheduler for specified duration."""
        self.start_time = time.ticks_ms()

        # Initialize all task timers
        now = time.ticks_ms()
        for task in self.tasks:
            task.last_run = now

        while time.ticks_diff(time.ticks_ms(), self.start_time) < duration_ms:
            now = time.ticks_ms()

            for task in self.tasks:
                elapsed = time.ticks_diff(now, task.last_run)

                if elapsed >= task.period:
                    # Count a miss when the task starts more than half a period
                    # late (a simple heuristic, not a formal deadline check)
                    if elapsed > task.period * 1.5:
                        task.missed_deadlines += 1

                    # Run the task and measure time
                    task_start = time.ticks_us()
                    task.func()
                    task_time = time.ticks_diff(time.ticks_us(), task_start)

                    # Update statistics
                    task.run_count += 1
                    task.total_time += task_time
                    if task_time > task.max_time:
                        task.max_time = task_time

                    task.last_run = now

    def print_stats(self):
        print("\n=== Scheduler Statistics ===\n")
        print(f"{'Task':<15} {'Runs':>6} {'Avg(ms)':>8} {'Max(ms)':>8} {'Missed':>7} {'Util%':>6}")
        print("-" * 55)

        total_util = 0
        for task in self.tasks:
            if task.run_count > 0:
                avg_ms = task.total_time / task.run_count / 1000
                max_ms = task.max_time / 1000
                util = (task.max_time / 1000) / task.period * 100
                total_util += util
                print(f"{task.name:<15} {task.run_count:>6} {avg_ms:>8.2f} {max_ms:>8.2f} {task.missed_deadlines:>7} {util:>6.1f}")

        print("-" * 55)
        print(f"{'Total Utilization:':<45} {total_util:>6.1f}%")

# Example usage
def read_sensors():
    time.sleep_us(500)  # Simulated work

def update_leds():
    time.sleep_us(2000)

def read_ultrasonic():
    time.sleep_ms(25)  # Simulated blocking read

def heartbeat():
    time.sleep_us(100)

# Create and run scheduler
sched = CooperativeScheduler()
sched.add_task("Sensors", 10, read_sensors)
sched.add_task("LEDs", 10, update_leds)
sched.add_task("Ultrasonic", 100, read_ultrasonic)
sched.add_task("Heartbeat", 500, heartbeat)

print("Running scheduler for 10 seconds...")
sched.run(10000)
sched.print_stats()

Expected Output (illustrative; exact numbers vary from run to run)

=== Scheduler Statistics ===

Task            Runs   Avg(ms)  Max(ms)  Missed  Util%
-------------------------------------------------------
Sensors          952     0.52     0.58       12    5.8
LEDs             951     2.01     2.15       13   21.5
Ultrasonic        98    25.10    26.20        2   26.2
Heartbeat         20     0.11     0.12        0    0.0
-------------------------------------------------------
Total Utilization:                           53.5%

Analysis Questions

  1. Why did Sensors miss 12 deadlines?
  2. What's blocking other tasks during Ultrasonic?
  3. How would you reduce missed deadlines?

Part 8: From Cooperative to RTOS

When Cooperative Isn't Enough

Cooperative scheduling fails when:

- Tasks can't be made short enough
- Strict timing guarantees are needed
- Tasks have very different priorities
- Dynamic task creation/deletion is needed

What an RTOS Provides

Real-Time Operating System features:

- Preemptive priority-based scheduling
- Task creation and management
- Synchronization primitives (mutex, semaphore)
- Inter-task communication (queues, events)
- Timer services
- Memory management

| RTOS | Typical Use |
|---|---|
| FreeRTOS | Widely used, runs on RP2040 |
| Zephyr | Linux Foundation, modern |
| ThreadX | Formerly Azure RTOS, now Eclipse ThreadX (open source) |
| VxWorks | Aerospace, military |
| QNX | Automotive, medical |

FreeRTOS on RP2040 (Preview)

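// This is C code - FreeRTOS isn't available in MicroPython
#include "FreeRTOS.h"
#include "task.h"

void read_sensors(void);     // Provided elsewhere in your project
void read_ultrasonic(void);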

void sensor_task(void *params) {
    while(1) {
        read_sensors();
        vTaskDelay(pdMS_TO_TICKS(10));  // Sleep 10ms, yield to other tasks
    }
}

void ultrasonic_task(void *params) {
    while(1) {
        read_ultrasonic();  // Can block - other tasks still run!
        vTaskDelay(pdMS_TO_TICKS(100));
    }
}

int main() {
    xTaskCreate(sensor_task, "Sensors", 256, NULL, 2, NULL);
    xTaskCreate(ultrasonic_task, "Ultra", 256, NULL, 1, NULL);
    vTaskStartScheduler();  // Never returns
}

Key difference: Tasks can block without stopping everything!


Mini-Project: Scheduler Analysis

Part A: Measure Your Robot's Tasks

  1. Instrument each task in your Lab 2 code
  2. Measure WCET for: sensor read, LED update, ultrasonic, logging
  3. Calculate total utilization

Part B: Jitter Analysis

  1. Add timestamps to your main loop
  2. Collect 1000 samples of actual period vs target
  3. Plot histogram of jitter
  4. Identify sources of worst-case jitter

Part C: Build Improved Scheduler

  1. Implement the CooperativeScheduler from Part 7
  2. Run your robot tasks through it
  3. Compare missed deadlines with/without scheduler
  4. Document improvements

Part D: Deadline Analysis

  1. Calculate utilization bound for your task set
  2. Verify your utilization is under the bound
  3. What's the maximum ultrasonic WCET before deadlines fail?

Deliverable

Report including:

- Task table (period, WCET, utilization)
- Jitter measurements and analysis
- Scheduler comparison results
- Deadline analysis calculations


Key Takeaways

  1. Real-time = predictable, not fast
  2. Know your WCET - measure, don't guess
  3. Utilization must be < 100% (and typically < 70% for safety)
  4. Cooperative scheduling is simple but requires short tasks
  5. Preemption solves blocking but adds complexity
  6. RTOS exists for a reason - complex systems need it
