Advanced 6: Communication & Data
Prerequisites: Lab 5 (Integration)
Overview
In Lab 5, you integrated multiple sensors and displayed data on the OLED. This module explores how to get data off the robot for analysis - essential for debugging, tuning, and validation.
You'll learn: - Serial communication protocols - Efficient data logging strategies - Protocol design for reliability - PC-side visualization tools
Part 1: Serial Communication Basics
Why Serial?
Parallel communication uses many wires (8-bit = 8 wires). Serial uses fewer:
| Protocol | Wires | Speed | Use Case |
|---|---|---|---|
| UART | 2 | Up to 1 Mbps | Debug, PC communication |
| I2C | 2 | 100-400 kHz | Sensors, displays |
| SPI | 4 | Up to 10 MHz | Fast sensors, SD cards |
| USB | 2 | 12-480 Mbps | PC connection |
UART (Universal Asynchronous Receiver-Transmitter)
The simplest serial protocol:
Idle ────┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌────
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
└───┘ └──┘ └──┘ └──┘ └──┘ └──┘ └──┘ └──┘ └──┘
Start D0 D1 D2 D3 D4 D5 D6 D7 Stop
bit (8 data bits) bit
Key parameters: - Baud rate: Bits per second (9600, 115200, etc.) - Data bits: Usually 8 - Stop bits: Usually 1 - Parity: Usually none
MicroPython UART
from machine import UART, Pin
import time
# USB serial is automatic through REPL
# For additional UART (e.g., Bluetooth module):
uart = UART(0, baudrate=115200, tx=Pin(0), rx=Pin(1))
# Send data
uart.write("Hello from Pico!\n")
# Receive data
if uart.any():
data = uart.read()
print(f"Received: {data}")
USB Serial (Your Main Connection)
The Pico appears as USB serial device. print() sends to this:
# This goes to USB serial
print("Debug message")
# Or use sys.stdout directly
import sys
sys.stdout.write("Raw data\n")
Capture on PC:
Part 2: I2C Deep Dive
How I2C Works
Two-wire bus with addressing:
┌─────┐ ┌─────┐ ┌─────┐
│Master│ │Slave│ │Slave│
│(Pico)│ │ 0x68│ │ 0x3C│
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
SDA ───┴───────────┴───────────┴─── (data)
SCL ───┴───────────┴───────────┴─── (clock)
│ │ │
GND GND GND
Protocol: 1. Master sends START condition 2. Master sends slave address + R/W bit 3. Slave acknowledges (ACK) 4. Data transfer (master or slave depending on R/W) 5. Master sends STOP condition
Scanning for Devices
from machine import I2C, Pin
i2c = I2C(0, sda=Pin(14), scl=Pin(15), freq=400000)
print("Scanning I2C bus...")
devices = i2c.scan()
if devices:
print(f"Found {len(devices)} device(s):")
for addr in devices:
print(f" 0x{addr:02X} ({addr})")
else:
print("No devices found!")
Common I2C Addresses
| Device | Address |
|---|---|
| BMI160 IMU | 0x68 or 0x69 |
| SSD1306 OLED | 0x3C or 0x3D |
| BMP280 Pressure | 0x76 or 0x77 |
| ADS1115 ADC | 0x48-0x4B |
Reading Registers
Most I2C devices use register-based access:
def read_register(i2c, addr, reg, nbytes=1):
"""Read register(s) from I2C device."""
i2c.writeto(addr, bytes([reg]))
return i2c.readfrom(addr, nbytes)
def write_register(i2c, addr, reg, data):
"""Write to register on I2C device."""
i2c.writeto(addr, bytes([reg]) + bytes(data))
# Example: Read CHIP_ID register from BMI160
CHIP_ID_REG = 0x00
device_id = read_register(i2c, 0x68, CHIP_ID_REG)[0]
print(f"Device ID: 0x{device_id:02X}") # Should be 0xD1
Part 3: Data Logging Strategies
Why Log Data?
- Debugging: What was happening when it failed?
- Tuning: How do parameters affect behavior?
- Validation: Prove the system works correctly
- Analysis: Post-process to extract insights
Strategy 1: Print to Serial
Simple but limited:
while True:
pos = get_line_position()
dist = ultrasonic.distance_cm()
print(f"{time.ticks_ms()},{pos},{dist}")
time.sleep_ms(10)
Capture:
Pros: Simple, immediate Cons: Blocking, limited speed, timing affected
Strategy 2: Buffer Then Dump
Collect in memory, dump later:
class DataLogger:
def __init__(self, max_samples=1000):
self.data = []
self.max_samples = max_samples
self.start_time = 0
def start(self):
self.data = []
self.start_time = time.ticks_ms()
def log(self, *values):
if len(self.data) < self.max_samples:
timestamp = time.ticks_diff(time.ticks_ms(), self.start_time)
self.data.append((timestamp,) + values)
def dump_csv(self, header):
print(header)
for row in self.data:
print(",".join(str(v) for v in row))
# Usage
logger = DataLogger(500)
logger.start()
for _ in range(500):
logger.log(get_position(), get_distance(), motor_speed)
time.sleep_ms(10)
logger.dump_csv("time_ms,position,distance,speed")
Pros: Doesn't affect timing during collection Cons: Limited by RAM, delayed output
Strategy 3: Circular Buffer
For continuous logging with limited memory:
class CircularBuffer:
def __init__(self, size):
self.buffer = [None] * size
self.size = size
self.head = 0
self.count = 0
def append(self, item):
self.buffer[self.head] = item
self.head = (self.head + 1) % self.size
if self.count < self.size:
self.count += 1
def get_all(self):
"""Get all items in order (oldest first)."""
if self.count < self.size:
return self.buffer[:self.count]
else:
# Wrap around
start = self.head
return self.buffer[start:] + self.buffer[:start]
# Keep last 100 samples
buffer = CircularBuffer(100)
while True:
buffer.append((time.ticks_ms(), get_sensor_data()))
# When error occurs, dump buffer to see what happened
time.sleep_ms(10)
Strategy 4: SD Card Logging
For large amounts of data:
import os
# Note: Requires SD card module and appropriate wiring
class SDLogger:
def __init__(self, filename):
self.filename = filename
self.file = None
def open(self, header):
self.file = open(self.filename, 'w')
self.file.write(header + '\n')
def log(self, *values):
if self.file:
line = ','.join(str(v) for v in values)
self.file.write(line + '\n')
def close(self):
if self.file:
self.file.close()
self.file = None
# Usage
logger = SDLogger('/sd/run_001.csv')
logger.open('time,position,speed')
try:
while running:
logger.log(time.ticks_ms(), pos, speed)
finally:
logger.close()
Part 4: Protocol Design
The Problem with Raw Data
Raw serial data can be corrupted:
How do you know if data is valid?
Framing
Mark the start and end of messages:
# Simple framing with start/end markers
START = b'\x02' # STX
END = b'\x03' # ETX
def send_frame(data):
sys.stdout.buffer.write(START + data + END)
def receive_frame(uart):
# Wait for start
while uart.read(1) != START:
pass
# Read until end
data = b''
while True:
byte = uart.read(1)
if byte == END:
return data
data += byte
Checksums
Detect corrupted data:
def checksum(data):
"""Simple XOR checksum."""
result = 0
for byte in data:
result ^= byte
return result
def send_with_checksum(data):
cs = checksum(data)
sys.stdout.buffer.write(data + bytes([cs]) + b'\n')
def verify_checksum(data):
received_cs = data[-1]
calculated_cs = checksum(data[:-1])
return received_cs == calculated_cs
Better: CRC
CRC (Cyclic Redundancy Check) catches more errors:
# CRC-8 implementation
def crc8(data):
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x07
else:
crc <<= 1
crc &= 0xFF
return crc
Complete Protocol Example
import struct
class DataProtocol:
"""Simple binary protocol with framing and CRC."""
HEADER = 0xAA
FOOTER = 0x55
def __init__(self):
self.buffer = bytearray()
def encode_packet(self, timestamp, position, distance, speed):
"""Create a data packet."""
# Pack data as binary (more compact than text)
payload = struct.pack('<Ihhhh', # Little-endian: uint32 + 4 int16
timestamp,
int(position * 100), # Fixed point: 2 decimal places
int(distance * 10),
int(speed)
)
crc = crc8(payload)
return bytes([self.HEADER]) + payload + bytes([crc, self.FOOTER])
def decode_packet(self, data):
"""Decode a data packet. Returns None if invalid."""
if len(data) != 14:
return None
if data[0] != self.HEADER or data[-1] != self.FOOTER:
return None
payload = data[1:-2]
received_crc = data[-2]
if crc8(payload) != received_crc:
return None
# Unpack
timestamp, pos_fixed, dist_fixed, speed = struct.unpack('<Ihhhh', payload)
return {
'timestamp': timestamp,
'position': pos_fixed / 100,
'distance': dist_fixed / 10,
'speed': speed
}
# Usage on Pico
protocol = DataProtocol()
while True:
packet = protocol.encode_packet(
time.ticks_ms(),
get_position(),
get_distance(),
get_speed()
)
sys.stdout.buffer.write(packet)
time.sleep_ms(10)
Part 5: PC-Side Tools
Python Receiver
# pc_receiver.py - Run on PC, not Pico!
import serial
import struct
import csv
import time
class DataReceiver:
HEADER = 0xAA
FOOTER = 0x55
def __init__(self, port, baudrate=115200):
self.serial = serial.Serial(port, baudrate, timeout=0.1)
self.buffer = bytearray()
def receive_packet(self):
"""Try to receive one packet."""
# Read available data
data = self.serial.read(100)
self.buffer.extend(data)
# Look for complete packet
while len(self.buffer) >= 14:
# Find header
try:
start = self.buffer.index(self.HEADER)
self.buffer = self.buffer[start:]
except ValueError:
self.buffer.clear()
return None
if len(self.buffer) < 14:
return None
# Check footer
if self.buffer[13] != self.FOOTER:
self.buffer = self.buffer[1:]
continue
# Extract and validate packet
packet = bytes(self.buffer[:14])
self.buffer = self.buffer[14:]
decoded = self.decode(packet)
if decoded:
return decoded
return None
def decode(self, data):
"""Decode packet with CRC check."""
payload = data[1:11]
received_crc = data[11]
if self.crc8(payload) != received_crc:
return None
timestamp, pos, dist, speed = struct.unpack('<Ihhhh', payload)
return {
'timestamp': timestamp,
'position': pos / 100,
'distance': dist / 10,
'speed': speed
}
@staticmethod
def crc8(data):
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x07
else:
crc <<= 1
crc &= 0xFF
return crc
# Collect data
receiver = DataReceiver('/dev/ttyACM0')
data = []
print("Collecting data... Press Ctrl+C to stop")
try:
while True:
packet = receiver.receive_packet()
if packet:
data.append(packet)
print(f"t={packet['timestamp']:6d} pos={packet['position']:+5.2f} "
f"dist={packet['distance']:5.1f}")
except KeyboardInterrupt:
pass
# Save to CSV
with open('robot_data.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=['timestamp', 'position', 'distance', 'speed'])
writer.writeheader()
writer.writerows(data)
print(f"Saved {len(data)} samples to robot_data.csv")
Live Plotting
# live_plot.py - Real-time visualization
import serial
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
# Setup
receiver = DataReceiver('/dev/ttyACM0')
max_points = 200
times = deque(maxlen=max_points)
positions = deque(maxlen=max_points)
distances = deque(maxlen=max_points)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))
line1, = ax1.plot([], [])
ax1.set_xlim(0, max_points)
ax1.set_ylim(-2, 2)
ax1.set_ylabel('Position')
ax1.axhline(y=0, color='r', linestyle='--')
line2, = ax2.plot([], [])
ax2.set_xlim(0, max_points)
ax2.set_ylim(0, 100)
ax2.set_ylabel('Distance (cm)')
ax2.set_xlabel('Sample')
def update(frame):
# Receive new data
for _ in range(10): # Process multiple packets per frame
packet = receiver.receive_packet()
if packet:
times.append(len(times))
positions.append(packet['position'])
distances.append(packet['distance'])
# Update plots
line1.set_data(list(times), list(positions))
line2.set_data(list(times), list(distances))
if times:
ax1.set_xlim(max(0, times[-1] - max_points), times[-1])
ax2.set_xlim(max(0, times[-1] - max_points), times[-1])
return line1, line2
ani = animation.FuncAnimation(fig, update, interval=50, blit=True)
plt.tight_layout()
plt.show()
Part 6: Debugging Techniques
Logic Analyzer
For protocol debugging, a logic analyzer is invaluable:
Logic Analyzer View:
Time: 0µs 100µs 200µs 300µs
SDA: ────┐ ┌──┐ ┌────┐ ┌──┐ ┌────
└──┘ └──┘ └──┘ └──┘
SCL: ────┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐ ┌┐
└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─
Decoded: [S] 0x68 [W] [A] 0x75 [A] [S] 0x68 [R] [A] 0x68 [N] [P]
Start Addr Ack Reg Ack Restart Addr Ack Data NAck Stop
Cheap USB logic analyzers (~$10) with sigrok/PulseView work great.
Timestamp Everything
When debugging timing issues:
def debug_log(msg):
print(f"[{time.ticks_ms():8d}] {msg}")
debug_log("Starting sensor read")
data = read_sensors()
debug_log("Sensor read complete")
debug_log("Starting motor update")
update_motors(data)
debug_log("Motor update complete")
State Dumps
Periodically dump system state:
def dump_state():
print("=== STATE DUMP ===")
print(f"Time: {time.ticks_ms()}")
print(f"State: {mission.state}")
print(f"Position: {line_sensors.get_position()}")
print(f"Distance: {ultrasonic.distance_cm()}")
print(f"Motor L/R: {motors.left_speed}/{motors.right_speed}")
print(f"Loop time: {loop_time_us} µs")
print("==================")
# Dump every 5 seconds
if time.ticks_diff(now, last_dump) > 5000:
dump_state()
last_dump = now
Mini-Project: Data Logger with Visualization
Part A: Design Protocol
- Define packet format for your robot data
- Include timestamp, sensors, motor commands, state
- Add framing and CRC
Part B: Implement Pico Logger
- Create efficient data encoder
- Buffer and send at appropriate rate
- Don't impact control loop timing
Part C: Create PC Receiver
- Implement packet decoder
- Validate with CRC
- Save to CSV file
Part D: Build Visualizer
- Create live plot of key variables
- Show position, distance, motor speeds
- Mark state changes
Part E: Analyze a Run
- Record a line-following run
- Plot position error over time
- Correlate motor commands with position
- Identify tuning improvements
Deliverable
- Protocol specification document
- Pico logger code
- PC receiver and visualizer
- Analysis of recorded run with plots
- Tuning recommendations based on data
Key Takeaways
- Serial is simple - UART for debug, I2C/SPI for sensors
- Log strategically - buffer in RAM, dump later
- Protect your data - framing and CRC catch errors
- Visualize - plots reveal patterns numbers hide
- Timestamp everything - essential for debugging
- Tools matter - logic analyzer saves hours of guessing
Further Reading
- Serial Communication Basics - Sparkfun tutorial
- I2C Specification - Official NXP document
- Python Serial Library - pySerial documentation
- Protocol Buffer - For complex protocols