Robot Data Pipeline
How sensor data flows from hardware to analysis, with practical examples.
System Architecture
┌─────────────────────────────────────────────────────────────────┐
│ ROBOT (Pico) │
│ ┌─────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │
│ │ BMI160 │───→│ Raw Data │───→│ Streamer │───→│ UDP Packet │ │
│ │ Sensor │ │ ax,ay, │ │ Bundle │ │ to WiFi │ │
│ │ 800 Hz │ │ az,gx, │ │ samples │ │ │ │
│ │ │ │ gy,gz │ │ │ │ │ │
│ └─────────┘ └──────────┘ └──────────┘ └────────────┘ │
└───────────────────────────────────────────────│────────────────┘
│
UDP (port 5005)
│
↓
┌─────────────────────────────────────────────────────────────────┐
│ HOST (PC) │
│ ┌────────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ │
│ │ UDP Socket │───→│ Parse │───→│ Buffers │───→│ Charts │ │
│ │ Receive │ │ Packets │ │ (deque) │ │ (live) │ │
│ └────────────┘ └──────────┘ └────┬─────┘ └─────────┘ │
│ │ │
│ ↓ │
│ ┌──────────┐ ┌─────────┐ │
│ │ Record │───→│ CSV │ │
│ │ Buffer │ │ Export │ │
│ └────┬─────┘ └─────────┘ │
│ │ │
│ ↓ │
│ ┌──────────────────────────┐ │
│ │ ANALYZERS │ │
│ │ ├── BumpDetector │ │
│ │ ├── VibrationAnalyzer │ │
│ │ ├── FFTAnalyzer │ │
│ │ └── MotionClassifier │ │
│ └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Step 1: Sensor Reading (Robot)
BMI160 Hardware
The BMI160 IMU measures: - Accelerometer: Force in X, Y, Z (including gravity) - Gyroscope: Rotation rate around X, Y, Z
# From pico_collector.py
class BMI160:
"""6-axis IMU (Accelerometer + Gyroscope)"""
def read_raw(self):
"""Read raw 16-bit values from sensor."""
# BMI160 data format: little-endian, gyro first
data = self.i2c.readfrom_mem(self.addr, 0x0C, 12)
gx = struct.unpack_from('<h', data, 0)[0]
gy = struct.unpack_from('<h', data, 2)[0]
gz = struct.unpack_from('<h', data, 4)[0]
ax = struct.unpack_from('<h', data, 6)[0]
ay = struct.unpack_from('<h', data, 8)[0]
az = struct.unpack_from('<h', data, 10)[0]
return (ax, ay, az, gx, gy, gz)
Converting Raw to Physical Units
# Raw values → Physical units
# Accelerometer: ±2g range at 16384 LSB/g
ax_g = ax_raw / 16384.0 # Result in g (9.81 m/s²)
# Gyroscope: ±250°/s range at 131.2 LSB/(°/s)
gx_dps = gx_raw / 131.2 # Result in degrees per second
Understanding g-force
- Robot stationary: az ≈ 1.0g (gravity pointing down)
- Robot tilted 45°: ax ≈ 0.7g, az ≈ 0.7g
- Robot accelerating: ax shows thrust force
Step 2: Data Streaming (Robot → Host)
Packet Format
# Each sample: timestamp + 7 floats
# <Q7f = little-endian: uint64 + 7 × float32
sample = struct.pack('<Q7f',
timestamp_us, # Microseconds since boot
ax, ay, az, # Accelerometer (g)
gx, gy, gz, # Gyroscope (°/s)
distance # Ultrasonic (cm)
)
# Packet: header + multiple samples
# [0x01][count][sample1][sample2]...
packet = bytes([0x01, batch_size]) + samples_data
Why Batch Samples?
Without batching: With batching (10 samples):
┌──────┐ ┌──────────────────────────┐
│ S1 │ → network latency │ S1 S2 S3 S4 S5 S6 S7... │ → 1 network latency
└──────┘ └──────────────────────────┘
┌──────┐
│ S2 │ → network latency 10x fewer packets
└──────┘ More efficient WiFi usage
... Smoother data flow
Step 3: Parsing on Host
# From host_collector.py
def parse_sensor_packet(data):
"""Parse UDP packet from robot into samples."""
if len(data) < 3:
return []
packet_type = data[0]
count = data[1]
samples = []
if packet_type == 0x01: # Standard IMU packet
sample_size = 36 # <Q7f = 8 + 7*4 = 36 bytes
for i in range(count):
offset = 2 + i * sample_size
sample_data = data[offset:offset + sample_size]
if len(sample_data) == sample_size:
values = struct.unpack('<Q7f', sample_data)
samples.append({
'timestamp_us': values[0],
'ax': values[1], 'ay': values[2], 'az': values[3],
'gx': values[4], 'gy': values[5], 'gz': values[6],
'us': values[7]
})
return samples
Step 4: Buffering for Charts
from collections import deque
# Ring buffers for chart data
CHART_HISTORY = 500 # 1 second at 500 Hz
time_buffer = deque(maxlen=CHART_HISTORY)
data_buffers = {
'ax': deque(maxlen=CHART_HISTORY),
'ay': deque(maxlen=CHART_HISTORY),
'az': deque(maxlen=CHART_HISTORY),
# ... etc
}
def process_sample(sample):
time_s = (sample['timestamp_us'] - first_timestamp) / 1_000_000
time_buffer.append(time_s)
for key in data_buffers:
if key in sample:
data_buffers[key].append(sample[key])
Why deque with maxlen?
- Automatically removes oldest data when full
- Constant O(1) append and pop
- No manual memory management needed
Step 5: Analysis
Bump Detection Pipeline
# analyzers/bump_detector.py
class BumpDetector:
"""Counts wheel bumps using high-pass filter."""
def __init__(self, threshold=0.015, hp_alpha=0.95):
self.threshold = threshold
self.hp_alpha = hp_alpha
def analyze(self, data):
"""
Pipeline:
1. Calculate magnitude: sqrt(ax² + ay² + az²) - 1
2. High-pass filter to remove DC/slow drift
3. Count threshold crossings
"""
magnitudes = []
for row in data:
mag = math.sqrt(row['ax']**2 + row['ay']**2 + row['az']**2) - 1.0
magnitudes.append(mag)
# High-pass filter
filtered = []
prev_in, prev_out = 0, 0
for m in magnitudes:
out = self.hp_alpha * (prev_out + m - prev_in)
filtered.append(out)
prev_in, prev_out = m, out
# Count bumps
bumps = 0
was_above = False
for f in filtered:
is_above = f > self.threshold
if is_above and not was_above:
bumps += 1
was_above = is_above
return bumps
FFT Analysis Pipeline
# analyzers/fft_analyzer.py
class FFTAnalyzer:
"""Frequency domain analysis."""
def __init__(self, sample_rate=500, window_type='hann'):
self.sample_rate = sample_rate
self.window_type = window_type
def analyze(self, signal):
"""
Pipeline:
1. Remove DC offset (mean)
2. Apply window function (Hann)
3. Compute FFT
4. Find peaks in magnitude spectrum
"""
import numpy as np
# Remove DC
signal = np.array(signal) - np.mean(signal)
# Apply window
if self.window_type == 'hann':
window = np.hanning(len(signal))
else:
window = np.hamming(len(signal))
windowed = signal * window
# FFT
fft = np.fft.fft(windowed)
freqs = np.fft.fftfreq(len(signal), 1/self.sample_rate)
# Get positive frequencies only
n = len(signal) // 2
magnitudes = np.abs(fft[:n]) / n
freqs = freqs[:n]
# Find peaks
peaks = self._find_peaks(magnitudes, freqs)
return {
'frequencies': freqs,
'magnitudes': magnitudes,
'peaks': peaks
}
Vibration Analysis Pipeline
# analyzers/vibration_analyzer.py
class VibrationAnalyzer:
"""Sliding window variance for motion intensity."""
def __init__(self, window_size=50):
self.window_size = window_size
def analyze(self, data):
"""
Pipeline:
1. Calculate acceleration magnitude
2. Compute variance in sliding window
3. Classify motion state by variance level
"""
magnitudes = [
math.sqrt(r['ax']**2 + r['ay']**2 + r['az']**2)
for r in data
]
variances = []
for i in range(len(magnitudes) - self.window_size):
window = magnitudes[i:i + self.window_size]
var = sum((x - sum(window)/len(window))**2
for x in window) / len(window)
variances.append(var)
# Classify by variance
states = []
for v in variances:
if v < 0.001:
states.append('STOPPED')
elif v < 0.01:
states.append('SLOW')
elif v < 0.1:
states.append('MEDIUM')
else:
states.append('FAST')
return states, variances
Putting It Together
Complete Workflow
# 1. Deploy robot code
ampy --port /dev/ttyACM0 put pico_collector.py main.py
# 2. Start host (robot must be on same WiFi)
python host_collector.py
# 3. Drive and record (press R)
# WASD to control, R to record, E to export
# 4. Analyze recorded data
python analyzers/bump_detector.py data/recording.csv
python analyzers/fft_analyzer.py data/recording.csv --bump
python analyzers/vibration_analyzer.py data/recording.csv
Real-Time vs Batch Analysis
| Analysis | Real-Time (on robot) | Batch (on host) |
|---|---|---|
| Bump counting | HP filter + threshold | Same, higher accuracy |
| Speed | Bump rate × calibration | FFT peak detection |
| Motion state | Simple thresholds | ML classifier |
| Vibration | Variance accumulator | Full windowed analysis |
Educational Approach
Robot does simple, reliable calculations in real-time. Host does complex analysis on recorded data.
This separation makes both sides easier to understand and debug.
Calibration
Bump-to-Distance Calibration
# Drive robot a known distance, count bumps
known_distance_cm = 100 # Measure with ruler
total_bumps = 523 # From analysis
bumps_per_cm = total_bumps / known_distance_cm
# = 5.23 bumps/cm
# Later: estimate distance from bumps
estimated_distance = new_bump_count / bumps_per_cm
Speed Calibration
# Correlate bump frequency with known speed
known_speed_cm_s = 20 # Measure with timer
bump_frequency_hz = 25 # From FFT analysis
calibration_factor = known_speed_cm_s / bump_frequency_hz
# = 0.8 cm/s per Hz
# Later: estimate speed from FFT
estimated_speed = new_bump_freq * calibration_factor
Common Issues and Solutions
Issue: No Data in Charts
Check:
1. Robot connected to WiFi?
2. Correct IP address/port?
3. Firewall blocking UDP?
4. Robot code running? (LED blinking?)
Issue: Noisy Bump Detection
Adjust threshold:
- Too many false positives: increase threshold (0.015 → 0.025)
- Missing real bumps: decrease threshold (0.015 → 0.010)
Or adjust filter:
- Too sensitive: increase hp_alpha (0.95 → 0.98)
- Too sluggish: decrease hp_alpha (0.95 → 0.90)
Issue: Wrong FFT Frequencies
Check sample rate:
- If FFT shows 50Hz but expect 25Hz, sample rate might be wrong
- Verify: count samples per second from recorded CSV
- Actual Fs = len(data) / (last_time - first_time)
Next Steps
- [[Reference/extras/signal-processing/basics|Signal Processing Basics]] - Mathematical foundations
- [[Reference/07-data-analysis/analyzers-reference|Analyzers Reference]] - CLI tool reference
- [[Reference/07-data-analysis/overview|Data Analysis Overview]] - Workflow overview