Advanced 6: Communication & Data

Prerequisites: Lab 5 (Integration)


Overview

In Lab 5, you integrated multiple sensors and displayed data on the OLED. This module explores how to get data off the robot for analysis - essential for debugging, tuning, and validation.

You'll learn:

  • Serial communication protocols
  • Efficient data logging strategies
  • Protocol design for reliability
  • PC-side visualization tools


Part 1: Serial Communication Basics

Why Serial?

Parallel communication uses many wires (8-bit = 8 wires). Serial uses fewer:

Protocol   Wires   Speed            Use Case
UART       2       Up to 1 Mbps     Debug, PC communication
I2C        2       100-400 kHz      Sensors, displays
SPI        4       Up to 10 MHz     Fast sensors, SD cards
USB        2       12-480 Mbps      PC connection

UART (Universal Asynchronous Receiver-Transmitter)

The simplest serial protocol:

Idle ────┐   ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌────
         │   │  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │
         └───┘  └──┘  └──┘  └──┘  └──┘  └──┘  └──┘  └──┘  └──┘
        Start  D0   D1   D2   D3   D4   D5   D6   D7  Stop
         bit              (8 data bits)              bit

Key parameters:

  • Baud rate: Bits per second (9600, 115200, etc.)
  • Data bits: Usually 8
  • Stop bits: Usually 1
  • Parity: Usually none
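
The parameters above determine how many bytes per second a link can actually carry, because every character is wrapped in a start bit and stop bit(s). A minimal sketch of that arithmetic follows; the function names are hypothetical and chosen only for illustration.

```python
def uart_frame_bits(data_bits=8, parity=False, stop_bits=1):
    """Bits on the wire per character: start + data + optional parity + stop."""
    return 1 + data_bits + (1 if parity else 0) + stop_bits

def uart_bytes_per_second(baud, data_bits=8, parity=False, stop_bits=1):
    """Maximum character rate at a given baud rate (no gaps between frames)."""
    return baud / uart_frame_bits(data_bits, parity, stop_bits)

for baud in (9600, 115200):
    rate = uart_bytes_per_second(baud)
    print(f"{baud} baud, 8N1: {rate:.0f} bytes/s")
```

With the usual 8N1 settings each byte costs 10 bits, so 115200 baud carries at most 11520 bytes per second. This is a useful upper bound when deciding how much data to log per loop iteration.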

MicroPython UART

from machine import UART, Pin
import time

# USB serial is automatic through REPL
# For additional UART (e.g., Bluetooth module):
uart = UART(0, baudrate=115200, tx=Pin(0), rx=Pin(1))

# Send data
uart.write("Hello from Pico!\n")

# Receive data
if uart.any():
    data = uart.read()
    print(f"Received: {data}")

USB Serial (Your Main Connection)

The Pico appears as a USB serial device on the PC, and print() writes to it:

# This goes to USB serial
print("Debug message")

# Or use sys.stdout directly
import sys
sys.stdout.write("Raw data\n")

Capture on PC:

# Linux (on macOS the device is typically /dev/tty.usbmodem* instead)
cat /dev/ttyACM0

# Or use mpremote
mpremote connect /dev/ttyACM0 repl


Part 2: I2C Deep Dive

How I2C Works

Two-wire bus with addressing:

        ┌─────┐     ┌─────┐     ┌─────┐
        │Master│    │Slave│     │Slave│
        │(Pico)│    │ 0x68│     │ 0x3C│
        └──┬───┘    └──┬───┘    └──┬───┘
           │           │           │
    SDA ───┴───────────┴───────────┴─── (data)
    SCL ───┴───────────┴───────────┴─── (clock)
           │           │           │
          GND         GND         GND

Protocol:

  1. Master sends START condition
  2. Master sends slave address + R/W bit
  3. Slave acknowledges (ACK)
  4. Data transfer (master or slave depending on R/W)
  5. Master sends STOP condition
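
Step 2 above is where the 7-bit address and the R/W bit are combined into a single byte, which is the value a logic analyzer shows on the bus. A small sketch of that encoding, using a hypothetical helper name:

```python
def i2c_address_byte(addr7, read):
    """Combine a 7-bit address with the R/W bit (1 = read, 0 = write)."""
    if not 0 <= addr7 <= 0x7F:
        raise ValueError("I2C addresses are 7 bits (0x00-0x7F)")
    return (addr7 << 1) | (1 if read else 0)

print(hex(i2c_address_byte(0x68, read=False)))  # 0xd0
print(hex(i2c_address_byte(0x68, read=True)))   # 0xd1
```

MicroPython's I2C methods take the 7-bit address and add the R/W bit for you, so this is mainly useful for interpreting raw bus captures.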

Scanning for Devices

from machine import I2C, Pin

# GP14/GP15 belong to the I2C1 peripheral on the Pico, so use bus ID 1
i2c = I2C(1, sda=Pin(14), scl=Pin(15), freq=400000)

print("Scanning I2C bus...")
devices = i2c.scan()

if devices:
    print(f"Found {len(devices)} device(s):")
    for addr in devices:
        print(f"  0x{addr:02X} ({addr})")
else:
    print("No devices found!")

Common I2C Addresses

Device            Address
BMI160 IMU        0x68 or 0x69
SSD1306 OLED      0x3C or 0x3D
BMP280 Pressure   0x76 or 0x77
ADS1115 ADC       0x48-0x4B

Reading Registers

Most I2C devices use register-based access:

def read_register(i2c, addr, reg, nbytes=1):
    """Read register(s) from I2C device."""
    i2c.writeto(addr, bytes([reg]))
    return i2c.readfrom(addr, nbytes)

def write_register(i2c, addr, reg, data):
    """Write to register on I2C device."""
    i2c.writeto(addr, bytes([reg]) + bytes(data))

# Example: Read CHIP_ID register from BMI160
CHIP_ID_REG = 0x00
device_id = read_register(i2c, 0x68, CHIP_ID_REG)[0]
print(f"Device ID: 0x{device_id:02X}")  # Should be 0xD1
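
Multi-byte registers come back as raw bytes that still need decoding. The sketch below shows one way to turn a 6-byte read into three signed 16-bit values with struct. The helper name decode_xyz is hypothetical, and the LSB-first byte order is an assumption to check against your sensor's datasheet.

```python
import struct

def decode_xyz(raw):
    """Decode 6 bytes (X, Y, Z as little-endian signed 16-bit) into a tuple."""
    if len(raw) != 6:
        raise ValueError("expected exactly 6 bytes")
    return struct.unpack('<hhh', raw)

# Synthetic bytes standing in for a 6-byte register read
raw = bytes([0x10, 0x00, 0xF0, 0xFF, 0x00, 0x40])
print(decode_xyz(raw))  # (16, -16, 16384)
```

On the Pico, the raw bytes would come from something like read_register(i2c, addr, reg, 6), with the register address taken from the datasheet.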

Part 3: Data Logging Strategies

Why Log Data?

  • Debugging: What was happening when it failed?
  • Tuning: How do parameters affect behavior?
  • Validation: Prove the system works correctly
  • Analysis: Post-process to extract insights

Strategy 1: Print to Serial

Simple but limited:

while True:
    pos = get_line_position()
    dist = ultrasonic.distance_cm()
    print(f"{time.ticks_ms()},{pos},{dist}")
    time.sleep_ms(10)

Capture:

mpremote connect /dev/ttyACM0 run logger.py > data.csv

Pros: Simple, immediate
Cons: Blocking, limited speed, timing affected

Strategy 2: Buffer Then Dump

Collect in memory, dump later:

class DataLogger:
    def __init__(self, max_samples=1000):
        self.data = []
        self.max_samples = max_samples
        self.start_time = 0

    def start(self):
        self.data = []
        self.start_time = time.ticks_ms()

    def log(self, *values):
        if len(self.data) < self.max_samples:
            timestamp = time.ticks_diff(time.ticks_ms(), self.start_time)
            self.data.append((timestamp,) + values)

    def dump_csv(self, header):
        print(header)
        for row in self.data:
            print(",".join(str(v) for v in row))

# Usage
logger = DataLogger(500)
logger.start()

for _ in range(500):
    logger.log(get_position(), get_distance(), motor_speed)
    time.sleep_ms(10)

logger.dump_csv("time_ms,position,distance,speed")

Pros: Doesn't affect timing during collection
Cons: Limited by RAM, delayed output
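
Since RAM is the limiting factor, one option is to store each sample as packed binary in a single preallocated bytearray instead of a list of tuples. The sketch below is an illustration under assumptions: PackedLogger and its record layout ('<Ihh', i.e. a uint32 timestamp plus two int16 fixed-point values) are hypothetical, not part of the lab code.

```python
import struct

class PackedLogger:
    """Fixed-size log that stores each sample as packed binary in one bytearray."""

    def __init__(self, max_samples, fmt='<Ihh'):
        self.fmt = fmt
        self.record_size = struct.calcsize(fmt)
        self.buffer = bytearray(max_samples * self.record_size)  # Allocated once
        self.max_samples = max_samples
        self.count = 0

    def log(self, *values):
        """Store one sample; returns False once the buffer is full."""
        if self.count >= self.max_samples:
            return False
        offset = self.count * self.record_size
        struct.pack_into(self.fmt, self.buffer, offset, *values)
        self.count += 1
        return True

    def rows(self):
        """Yield the stored samples as tuples, oldest first."""
        for i in range(self.count):
            yield struct.unpack_from(self.fmt, self.buffer, i * self.record_size)

logger = PackedLogger(500)
logger.log(0, 12, 350)    # time_ms, position*100, distance*10 (fixed point)
logger.log(10, -8, 347)
for row in logger.rows():
    print(",".join(str(v) for v in row))
print(f"{len(logger.buffer)} bytes for {logger.max_samples} samples")
```

Each record takes 8 bytes here, so 500 samples fit in 4000 bytes, and no memory is allocated while logging. The trade-off is that every value must fit the chosen integer format.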

Strategy 3: Circular Buffer

For continuous logging with limited memory:

class CircularBuffer:
    def __init__(self, size):
        self.buffer = [None] * size
        self.size = size
        self.head = 0
        self.count = 0

    def append(self, item):
        self.buffer[self.head] = item
        self.head = (self.head + 1) % self.size
        if self.count < self.size:
            self.count += 1

    def get_all(self):
        """Get all items in order (oldest first)."""
        if self.count < self.size:
            return self.buffer[:self.count]
        else:
            # Wrap around
            start = self.head
            return self.buffer[start:] + self.buffer[:start]

# Keep last 100 samples
buffer = CircularBuffer(100)

while True:
    buffer.append((time.ticks_ms(), get_sensor_data()))
    # When error occurs, dump buffer to see what happened
    time.sleep_ms(10)
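
The "dump on error" idea in the loop above can be sketched as follows. This version runs on a PC and uses collections.deque as the ring buffer. MicroPython's deque is more limited depending on firmware version, so on the Pico the CircularBuffer class may be the safer choice. The control_step function is a hypothetical stand-in that fails on purpose.

```python
from collections import deque

history = deque((), 5)  # Keep only the 5 most recent samples

def control_step(i):
    """Hypothetical control step that fails on iteration 8."""
    if i == 8:
        raise RuntimeError("sensor timeout")
    return i * 10

dumped = []
try:
    for i in range(20):
        history.append((i, control_step(i)))
except RuntimeError as err:
    # The buffer now holds the samples leading up to the failure
    print(f"Error: {err}; last {len(history)} samples:")
    for sample in history:
        dumped.append(sample)
        print(sample)
```

Only the samples from iterations 3 to 7 survive, which is usually the part of the run you care about when diagnosing a failure.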

Strategy 4: SD Card Logging

For large amounts of data:

import os

# Note: Requires an SD card module, appropriate wiring, and the card mounted at /sd

class SDLogger:
    def __init__(self, filename):
        self.filename = filename
        self.file = None

    def open(self, header):
        self.file = open(self.filename, 'w')
        self.file.write(header + '\n')

    def log(self, *values):
        if self.file:
            line = ','.join(str(v) for v in values)
            self.file.write(line + '\n')

    def close(self):
        if self.file:
            self.file.close()
            self.file = None

# Usage
logger = SDLogger('/sd/run_001.csv')
logger.open('time,position,speed')

try:
    while running:
        logger.log(time.ticks_ms(), pos, speed)
finally:
    logger.close()

Part 4: Protocol Design

The Problem with Raw Data

Raw serial data can be corrupted:

Sent:     "123,45.6,789\n"
Received: "123,45.6,7" + noise + "89\n"

How do you know if data is valid?

Framing

Mark the start and end of messages:

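import sys

# Simple framing with start/end markers
# Note: the payload itself must not contain the STX/ETX bytes
START = b'\x02'  # STX
END = b'\x03'    # ETX

def send_frame(data):
    sys.stdout.buffer.write(START + data + END)

def receive_frame(uart):
    # Wait for start (uart.read() returns None on timeout, so keep polling)
    while uart.read(1) != START:
        pass

    # Read until end
    data = b''
    while True:
        byte = uart.read(1)
        if not byte:
            continue  # Timeout: nothing received yet
        if byte == END:
            return data
        data += byte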
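
Marker-based framing breaks if the payload itself contains a marker byte, which is likely with binary data. A common remedy is byte stuffing: escape any reserved byte inside the payload. The sketch below is one possible scheme, not the lab's protocol. The ESC value and the XOR-with-0x20 transform are assumptions chosen for illustration.

```python
START = 0x02  # STX
END = 0x03    # ETX
ESC = 0x10    # Escape marker (an assumption; any reserved byte works)

def encode_frame(payload):
    """Wrap payload in START/END, escaping any reserved bytes inside it."""
    out = bytearray([START])
    for b in payload:
        if b in (START, END, ESC):
            out.append(ESC)
            out.append(b ^ 0x20)  # Flip a bit so the reserved value never appears
        else:
            out.append(b)
    out.append(END)
    return bytes(out)

def decode_frame(frame):
    """Reverse encode_frame; returns None if the frame is malformed."""
    if len(frame) < 2 or frame[0] != START or frame[-1] != END:
        return None
    out = bytearray()
    escaped = False
    for b in frame[1:-1]:
        if escaped:
            out.append(b ^ 0x20)
            escaped = False
        elif b == ESC:
            escaped = True
        elif b in (START, END):
            return None  # Unescaped marker inside the body
        else:
            out.append(b)
    return None if escaped else bytes(out)

payload = bytes([0x01, 0x02, 0x03, 0x10, 0x41])
frame = encode_frame(payload)
print(frame.hex())
print(decode_frame(frame) == payload)  # True
```

The cost is that frames grow by one byte for every reserved byte in the payload, so the frame length is no longer fixed.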

Checksums

Detect corrupted data:

def checksum(data):
    """Simple XOR checksum."""
    result = 0
    for byte in data:
        result ^= byte
    return result

def send_with_checksum(data):
    cs = checksum(data)
    sys.stdout.buffer.write(data + bytes([cs]) + b'\n')

def verify_checksum(data):
    received_cs = data[-1]
    calculated_cs = checksum(data[:-1])
    return received_cs == calculated_cs

Better: CRC

CRC (Cyclic Redundancy Check) catches more errors:

# CRC-8 implementation
def crc8(data):
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = (crc << 1) ^ 0x07
            else:
                crc <<= 1
            crc &= 0xFF
    return crc
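
To see why a CRC catches more errors than XOR, consider two bytes arriving in the wrong order. The sketch below repeats both functions so it runs on its own, with the XOR version renamed xor_checksum for clarity, and compares them on a swapped pair.

```python
def xor_checksum(data):
    """Simple XOR checksum: order of bytes does not matter."""
    result = 0
    for byte in data:
        result ^= byte
    return result

def crc8(data):
    """CRC-8 with polynomial 0x07 and initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = (crc << 1) ^ 0x07
            else:
                crc <<= 1
            crc &= 0xFF
    return crc

original = b'\x01\x02'
swapped = b'\x02\x01'

print(xor_checksum(original) == xor_checksum(swapped))  # True: swap goes unnoticed
print(crc8(original) == crc8(swapped))                  # False: CRC detects it
```

XOR is order-independent, so reordered bytes always produce the same checksum. It also misses any pair of errors that flip the same bit position.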

Complete Protocol Example

import struct

class DataProtocol:
    """Simple binary protocol with framing and CRC."""

    HEADER = 0xAA
    FOOTER = 0x55
    PAYLOAD_FORMAT = '<Ihhh'  # Little-endian: uint32 + 3 int16 (10 bytes)
    PACKET_SIZE = 13          # Header + 10-byte payload + CRC + footer

    def __init__(self):
        self.buffer = bytearray()

    def encode_packet(self, timestamp, position, distance, speed):
        """Create a data packet."""
        # Pack data as binary (more compact than text)
        payload = struct.pack(self.PAYLOAD_FORMAT,
            timestamp,
            int(position * 100),  # Fixed point: 2 decimal places
            int(distance * 10),   # Fixed point: 1 decimal place
            int(speed)
        )

        crc = crc8(payload)

        return bytes([self.HEADER]) + payload + bytes([crc, self.FOOTER])

    def decode_packet(self, data):
        """Decode a data packet. Returns None if invalid."""
        if len(data) != self.PACKET_SIZE:
            return None

        if data[0] != self.HEADER or data[-1] != self.FOOTER:
            return None

        payload = data[1:-2]
        received_crc = data[-2]

        if crc8(payload) != received_crc:
            return None

        # Unpack
        timestamp, pos_fixed, dist_fixed, speed = struct.unpack(self.PAYLOAD_FORMAT, payload)

        return {
            'timestamp': timestamp,
            'position': pos_fixed / 100,
            'distance': dist_fixed / 10,
            'speed': speed
        }

# Usage on Pico
protocol = DataProtocol()

while True:
    packet = protocol.encode_packet(
        time.ticks_ms(),
        get_position(),
        get_distance(),
        get_speed()
    )
    sys.stdout.buffer.write(packet)
    time.sleep_ms(10)
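
Hard-coded packet lengths easily drift out of sync with the struct format string, so one option is to derive them with struct.calcsize. The sketch below assumes a payload of one uint32 timestamp and three int16 fields (position, distance, speed) plus three framing bytes. The constant names and the sample values are made up for illustration.

```python
import struct

PAYLOAD_FORMAT = '<Ihhh'  # Assumed layout: uint32 timestamp + three int16 fields
OVERHEAD = 3              # Header byte + CRC byte + footer byte

payload_size = struct.calcsize(PAYLOAD_FORMAT)
packet_size = payload_size + OVERHEAD
print(f"payload={payload_size} bytes, packet={packet_size} bytes")

# Compare with the equivalent CSV line for one sample (illustrative values)
text_line = f"{123456},{-0.25:.2f},{35.2:.1f},{120}\n"
print(f"text line={len(text_line)} bytes")

# Rough bandwidth at 100 packets per second (one packet every 10 ms)
print(f"{packet_size * 100} bytes/s binary")
```

The binary packet has a fixed size regardless of the values it carries, which also makes it straightforward for the receiver to find packet boundaries.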

Part 5: PC-Side Tools

Python Receiver

# pc_receiver.py - Run on PC, not Pico!

import serial
import struct
import csv
import time

class DataReceiver:
    HEADER = 0xAA
    FOOTER = 0x55

    def __init__(self, port, baudrate=115200):
        self.serial = serial.Serial(port, baudrate, timeout=0.1)
        self.buffer = bytearray()

    def receive_packet(self):
        """Try to receive one packet."""
        # Read available data
        data = self.serial.read(100)
        self.buffer.extend(data)

        # Look for complete packet
        # (13 bytes: header + 10-byte payload + CRC + footer)
        while len(self.buffer) >= 13:
            # Find header
            try:
                start = self.buffer.index(self.HEADER)
                self.buffer = self.buffer[start:]
            except ValueError:
                self.buffer.clear()
                return None

            if len(self.buffer) < 13:
                return None

            # Check footer (last byte of the 13-byte packet)
            if self.buffer[12] != self.FOOTER:
                self.buffer = self.buffer[1:]
                continue

            # Extract and validate packet
            packet = bytes(self.buffer[:13])
            self.buffer = self.buffer[13:]

            decoded = self.decode(packet)
            if decoded:
                return decoded

        return None

    def decode(self, data):
        """Decode packet with CRC check."""
        payload = data[1:11]
        received_crc = data[11]

        if self.crc8(payload) != received_crc:
            return None

        # uint32 timestamp + 3 int16 values = 10 bytes
        timestamp, pos, dist, speed = struct.unpack('<Ihhh', payload)
        return {
            'timestamp': timestamp,
            'position': pos / 100,
            'distance': dist / 10,
            'speed': speed
        }

    @staticmethod
    def crc8(data):
        crc = 0
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x80:
                    crc = (crc << 1) ^ 0x07
                else:
                    crc <<= 1
                crc &= 0xFF
        return crc


# Collect data
receiver = DataReceiver('/dev/ttyACM0')
data = []

print("Collecting data... Press Ctrl+C to stop")
try:
    while True:
        packet = receiver.receive_packet()
        if packet:
            data.append(packet)
            print(f"t={packet['timestamp']:6d} pos={packet['position']:+5.2f} "
                  f"dist={packet['distance']:5.1f}")
except KeyboardInterrupt:
    pass

# Save to CSV
with open('robot_data.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['timestamp', 'position', 'distance', 'speed'])
    writer.writeheader()
    writer.writerows(data)

print(f"Saved {len(data)} samples to robot_data.csv")

Live Plotting

# live_plot.py - Real-time visualization (run on PC)

import serial   # Needed by DataReceiver
import struct   # Needed by DataReceiver
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque

# DataReceiver is the class defined in pc_receiver.py above. Copy it into this
# file, or move it into its own module and import it from there.

# Setup
receiver = DataReceiver('/dev/ttyACM0')
max_points = 200
sample_index = 0  # Running sample number (len(times) stops growing at max_points)

times = deque(maxlen=max_points)
positions = deque(maxlen=max_points)
distances = deque(maxlen=max_points)

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))

line1, = ax1.plot([], [])
ax1.set_xlim(0, max_points)
ax1.set_ylim(-2, 2)
ax1.set_ylabel('Position')
ax1.axhline(y=0, color='r', linestyle='--')

line2, = ax2.plot([], [])
ax2.set_xlim(0, max_points)
ax2.set_ylim(0, 100)
ax2.set_ylabel('Distance (cm)')
ax2.set_xlabel('Sample')

def update(frame):
    global sample_index

    # Receive new data
    for _ in range(10):  # Process multiple packets per frame
        packet = receiver.receive_packet()
        if packet:
            times.append(sample_index)
            sample_index += 1
            positions.append(packet['position'])
            distances.append(packet['distance'])

    # Update plots
    line1.set_data(list(times), list(positions))
    line2.set_data(list(times), list(distances))

    if times:
        ax1.set_xlim(max(0, times[-1] - max_points), times[-1])
        ax2.set_xlim(max(0, times[-1] - max_points), times[-1])

    return line1, line2

# blit=False so the axis limits and tick labels are redrawn as the window scrolls
ani = animation.FuncAnimation(fig, update, interval=50, blit=False)
plt.tight_layout()
plt.show()

Part 6: Debugging Techniques

Logic Analyzer

For protocol debugging, a logic analyzer is invaluable:

Logic Analyzer View:

Time:     0µs    100µs    200µs    300µs
SDA:  ────┐  ┌──┐  ┌────┐  ┌──┐  ┌────
          └──┘  └──┘    └──┘  └──┘

SCL:  ────┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐
          └─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─

Decoded: [S] 0x68 [W] [A] 0x75 [A] [S] 0x68 [R] [A] 0x68 [N] [P]
         Start  Addr   Ack  Reg   Ack Restart Addr  Ack Data NAck Stop

Cheap USB logic analyzers (~$10) with sigrok/PulseView work great.

Timestamp Everything

When debugging timing issues:

def debug_log(msg):
    print(f"[{time.ticks_ms():8d}] {msg}")

debug_log("Starting sensor read")
data = read_sensors()
debug_log("Sensor read complete")
debug_log("Starting motor update")
update_motors(data)
debug_log("Motor update complete")
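
One caveat when comparing these timestamps: time.ticks_ms() wraps around, so plain subtraction can give nonsense, which is why the DataLogger earlier uses time.ticks_diff(). The sketch below emulates that behaviour in plain Python so it can run on a PC. The wrap period of 2^30 is an assumption, since the real value is port-specific.

```python
TICKS_PERIOD = 1 << 30  # Assumed wrap point; the real value is port-specific
TICKS_HALF = TICKS_PERIOD // 2

def ticks_diff(end, start):
    """Signed difference end - start that stays correct across one wraparound."""
    return ((end - start + TICKS_HALF) % TICKS_PERIOD) - TICKS_HALF

start = TICKS_PERIOD - 5  # 5 ms before the counter wraps
end = 5                   # 5 ms after it wrapped

print(end - start)             # Naive subtraction: large negative number
print(ticks_diff(end, start))  # 10
```

On the Pico, use the built-in time.ticks_diff(end, start) rather than this emulation. The result is only meaningful when the two timestamps are less than half the wrap period apart.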

State Dumps

Periodically dump system state:

def dump_state():
    print("=== STATE DUMP ===")
    print(f"Time: {time.ticks_ms()}")
    print(f"State: {mission.state}")
    print(f"Position: {line_sensors.get_position()}")
    print(f"Distance: {ultrasonic.distance_cm()}")
    print(f"Motor L/R: {motors.left_speed}/{motors.right_speed}")
    print(f"Loop time: {loop_time_us} µs")
    print("==================")

# Dump every 5 seconds
if time.ticks_diff(now, last_dump) > 5000:
    dump_state()
    last_dump = now

Mini-Project: Data Logger with Visualization

Part A: Design Protocol

  1. Define packet format for your robot data
  2. Include timestamp, sensors, motor commands, state
  3. Add framing and CRC

Part B: Implement Pico Logger

  1. Create efficient data encoder
  2. Buffer and send at appropriate rate
  3. Don't impact control loop timing

Part C: Create PC Receiver

  1. Implement packet decoder
  2. Validate with CRC
  3. Save to CSV file

Part D: Build Visualizer

  1. Create live plot of key variables
  2. Show position, distance, motor speeds
  3. Mark state changes

Part E: Analyze a Run

  1. Record a line-following run
  2. Plot position error over time
  3. Correlate motor commands with position
  4. Identify tuning improvements

Deliverable

  • Protocol specification document
  • Pico logger code
  • PC receiver and visualizer
  • Analysis of recorded run with plots
  • Tuning recommendations based on data

Key Takeaways

  1. Serial is simple - UART for debug, I2C/SPI for sensors
  2. Log strategically - buffer in RAM, dump later
  3. Protect your data - framing and CRC catch errors
  4. Visualize - plots reveal patterns numbers hide
  5. Timestamp everything - essential for debugging
  6. Tools matter - logic analyzer saves hours of guessing
