A real-time assistive technology solution combining computer vision and sensor fusion - with all its challenges and learnings
Presented by: VIVEKTEJA SAPAVATH
I wanted to create something impactful that could help many people, so I chose to build a solution that assists the visually impaired in navigating their environment safely.
Since almost everyone uses a smartphone today, I aimed to create an application that works solely on a smartphone with reasonable accuracy.
Wanted to combine:
Explored various computer vision approaches:
The system runs multiple components simultaneously:
Changed approaches multiple times:
Information moves through three parallel pipelines that merge at the fusion module:
# Assumed imports for this module
import torch
from ultralytics import YOLO

class YOLOv8Detector:
    def __init__(self, model_path="yolov8n.pt"):
        # Load model and move it to GPU if available
        self.model = YOLO(model_path)
        self.model.to('cuda' if torch.cuda.is_available() else 'cpu')
        self.classes = self.model.names

    def detect(self, frame):
        # Run inference
        results = self.model(frame, imgsz=640, conf=0.5)

        # Process results into a list of plain dictionaries
        detections = []
        for result in results:
            boxes = result.boxes.xyxy.cpu().numpy()
            scores = result.boxes.conf.cpu().numpy()
            class_ids = result.boxes.cls.cpu().numpy().astype(int)
            for box, score, cls_id in zip(boxes, scores, class_ids):
                detections.append({
                    'bbox': box,
                    'score': score,
                    'class_id': cls_id,
                    'class_name': self.classes[cls_id]
                })
        return detections
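To show how the detector is used, here is a minimal sketch of running it on a single image; "street.jpg" is a placeholder path and not an asset from the project.

# Minimal usage sketch; "street.jpg" is a placeholder, not a project file.
import cv2

detector = YOLOv8Detector("yolov8n.pt")
frame = cv2.imread("street.jpg")

for det in detector.detect(frame):
    print(det['class_name'], round(float(det['score']), 2), det['bbox'])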
After researching alternatives (R-CNN, SSD, etc.):
# Assumed import; NearestNeighborDistanceMetric, Tracker and FeatureExtractor are
# provided by the project's DeepSORT integration and appearance-embedding model.
import numpy as np

class DeepSortTracker:
    def __init__(self):
        # Appearance feature extractor for re-identification
        self.encoder = FeatureExtractor()
        # Tracking parameters
        self.max_age = 50  # frames a track may survive without a match
        self.n_init = 3    # confirmations needed before a track is reported
        self.metric = NearestNeighborDistanceMetric("cosine", 0.2)
        self.tracker = Tracker(
            self.metric, max_age=self.max_age, n_init=self.n_init)

    def update(self, detections, frame):
        # Extract an appearance feature for each detection crop
        features = []
        for det in detections:
            x1, y1, x2, y2 = map(int, det['bbox'])
            crop = frame[y1:y2, x1:x2]
            features.append(self.encoder(crop) if crop.size > 0 else np.zeros(512))

        # Convert to numpy arrays
        bboxes = np.array([d['bbox'] for d in detections])
        confidences = np.array([d['score'] for d in detections])
        features = np.array(features)

        # Predict new track states, then update with the current detections
        self.tracker.predict()
        self.tracker.update(bboxes, confidences, features)

        # Return active, confirmed tracks
        return [
            {
                'track_id': track.track_id,
                'bbox': track.to_tlbr(),
                'class_id': getattr(track, 'class_id', -1)
            }
            for track in self.tracker.tracks
            if track.is_confirmed() and track.time_since_update <= 1
        ]
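A hedged sketch of chaining detection and tracking over a short clip, so the same object keeps the same ID from frame to frame; "walk.mp4" is a placeholder path, not a file from the project.

# Illustrative loop showing IDs persisting across frames; "walk.mp4" is a placeholder.
import cv2

detector = YOLOv8Detector()
tracker = DeepSortTracker()

cap = cv2.VideoCapture("walk.mp4")
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    tracks = tracker.update(detector.detect(frame), frame)
    print([t['track_id'] for t in tracks])  # a tracked object keeps its ID

cap.release()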
Needed persistent object IDs across frames:
# Assumed imports for this module
import cv2
import numpy as np
import torch

class DepthEstimator:
    def __init__(self, model_type="DPT_Large"):
        # Load MiDaS model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = torch.hub.load("intel-isl/MiDaS", model_type)
        self.model.to(self.device)
        self.model.eval()

        # Load the matching input transform
        self.transform = torch.hub.load("intel-isl/MiDaS", "transforms").dpt_transform

    def estimate(self, frame):
        # Convert and preprocess (the DPT transform already adds the batch dimension)
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        input_tensor = self.transform(img).to(self.device)

        # Predict depth and resize back to the input resolution
        with torch.no_grad():
            prediction = self.model(input_tensor)
            prediction = torch.nn.functional.interpolate(
                prediction.unsqueeze(1),
                size=img.shape[:2],
                mode="bicubic",
                align_corners=False).squeeze()

        # Convert to numpy and normalize to 0-255
        depth_map = prediction.cpu().numpy()
        return cv2.normalize(depth_map, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)

    def get_object_depth(self, depth_map, bbox):
        x1, y1, x2, y2 = map(int, bbox)
        obj_region = depth_map[y1:y2, x1:x2]
        if obj_region.size == 0:
            return 0
        median_depth = np.median(obj_region)

        # Map relative (inverse) depth to coarse distance bands
        if median_depth > 200:
            return "very close"
        elif median_depth > 150:
            return "close"
        elif median_depth > 100:
            return "medium"
        else:
            return "far"
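To illustrate how a detection's bounding box is turned into a coarse distance label, here is a hedged sketch; the detector instance and "street.jpg" path are assumptions carried over from the earlier examples.

# Illustrative: label each detection with a coarse relative distance.
import cv2

detector = YOLOv8Detector()
depth_estimator = DepthEstimator()

frame = cv2.imread("street.jpg")           # placeholder image path
depth_map = depth_estimator.estimate(frame)  # per-pixel relative depth
for det in detector.detect(frame):
    distance = depth_estimator.get_object_depth(depth_map, det['bbox'])
    print(f"{det['class_name']}: {distance}")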
Major hurdles faced:
class NetworkServer:
    initialize TCP and UDP ports
    initialize server state and IMU data queue
    initialize sockets and client list

    function start_servers():
        set running = True
        start TCP server in separate thread
        start UDP server in separate thread
        print server start info

    function _tcp_server():
        create and bind TCP socket
        listen for incoming connections
        while running:
            accept new client
            start thread to handle client
        on shutdown, close TCP socket

    function _handle_tcp_client(client_socket, address):
        while running:
            receive data from client
            if no data, break
            try:
                parse data as JSON
                process sensor data
                send "OK"
            except parse error:
                send "ERROR"
        close client socket

    function _udp_server():
        create and bind UDP socket
        while running:
            receive data from client
            try:
                parse data as JSON
                process sensor data
            except parse error:
                continue
        on shutdown, close UDP socket

    function _process_sensor_data(data):
        extract timestamp, accel, gyro from JSON
        create IMUData object
        add IMUData to queue

    function get_imu_data():
        collect all IMUData from queue and return as list

    function stop_servers():
        set running = False
        close TCP and UDP sockets if open
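To make the UDP path concrete, here is a minimal Python sketch of the receive loop, under the assumption (based on the pseudocode above) that the phone sends one JSON object per datagram with timestamp, accel, and gyro fields; the port number and field names are assumptions, not the project's actual values.

# Minimal sketch of the UDP side of the server; packet format and port are assumed.
import json
import socket
from queue import Queue

imu_queue = Queue()

def udp_server(host="0.0.0.0", port=5555, running=lambda: True):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    sock.settimeout(1.0)  # so the loop can notice shutdown
    while running():
        try:
            data, _addr = sock.recvfrom(4096)
            packet = json.loads(data.decode("utf-8"))
            imu_queue.put((packet["timestamp"], packet["accel"], packet["gyro"]))
        except socket.timeout:
            continue
        except (ValueError, KeyError):
            continue  # malformed packet, skip it
    sock.close()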
class IMUProcessor:
    initialize motion state (position, velocity, orientation, etc.)
    initialize calibration manager and filters
    initialize bias and buffers

    function set_calibration_mode(enable):
        if enable: start collecting calibration data
        else: stop calibration

    function add_imu_data(imu_data):
        if in calibration mode:
            collect calibration data
            return
        if calibration complete:
            apply calibration to imu_data
        extract accel and gyro from imu_data
        if still collecting bias:
            update accel/gyro bias
            return
        subtract bias from accel and gyro
        store corrected accel and gyro in buffers
        if not enough data in buffer:
            return
        apply low-pass filter to accel
        update orientation using gyro
        rotate accel to world frame and remove gravity
        update motion state: acceleration → velocity → position
        apply damping to velocity
        update confidence based on variance in recent accel data

    function _apply_filter(buffer):
        apply low-pass filter to buffer and return latest value

    function _update_orientation(gyro):
        convert gyro to quaternion rotation
        apply to current orientation
        normalize orientation

    function _quaternion_multiply(q1, q2):
        return product of two quaternions

    function _rotate_vector(vector, quaternion):
        rotate vector using quaternion

    function _update_confidence():
        calculate variance in recent accel
        map variance to confidence score
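For clarity, here is a hedged numeric sketch of the single update step described above: integrate the gyro into the orientation quaternion, rotate the accelerometer reading into the world frame, remove gravity, and integrate to velocity and position. The constants (dt, gravity vector, damping factor) are illustrative assumptions, not the project's actual values.

# Hedged sketch of one IMU integration step; dt, gravity and damping are assumed.
import numpy as np

def quat_multiply(q1, q2):
    w1, x1, y1, z1 = q1
    w2, x2, y2, z2 = q2
    return np.array([
        w1*w2 - x1*x2 - y1*y2 - z1*z2,
        w1*x2 + x1*w2 + y1*z2 - z1*y2,
        w1*y2 - x1*z2 + y1*w2 + z1*x2,
        w1*z2 + x1*y2 - y1*x2 + z1*w2,
    ])

def rotate_vector(v, q):
    # Rotate v by unit quaternion q using q * (0, v) * q_conjugate
    qv = np.concatenate(([0.0], v))
    q_conj = q * np.array([1.0, -1.0, -1.0, -1.0])
    return quat_multiply(quat_multiply(q, qv), q_conj)[1:]

def integrate_step(state, accel, gyro, dt=0.01, damping=0.98):
    # Gyro -> small-angle rotation quaternion -> orientation update
    angle = np.linalg.norm(gyro) * dt
    if angle > 1e-8:
        axis = gyro / np.linalg.norm(gyro)
        dq = np.concatenate(([np.cos(angle / 2)], axis * np.sin(angle / 2)))
    else:
        dq = np.array([1.0, 0.0, 0.0, 0.0])
    state['orientation'] = quat_multiply(state['orientation'], dq)
    state['orientation'] /= np.linalg.norm(state['orientation'])

    # Accel -> world frame, remove gravity, then integrate twice with damping
    accel_world = rotate_vector(accel, state['orientation']) - np.array([0.0, 0.0, 9.81])
    state['velocity'] = (state['velocity'] + accel_world * dt) * damping
    state['position'] = state['position'] + state['velocity'] * dt
    return state

state = {'orientation': np.array([1.0, 0.0, 0.0, 0.0]),
         'velocity': np.zeros(3), 'position': np.zeros(3)}
state = integrate_step(state, accel=np.array([0.1, 0.0, 9.81]), gyro=np.zeros(3))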
Unexpected difficulties:
# Assumed import for this module
import time

class DataFusion:
    def __init__(self):
        self.tracks = {}
        self.max_age = 2.0  # seconds before an unseen track is dropped

    def update(self, visual_tracks, depth_map, imu_data):
        current_time = time.time()
        risk_items = []

        # Update tracks with new data
        # (_get_depth is not shown in this snippet; it looks up the object's
        # distance band from the depth map, e.g. via DepthEstimator.get_object_depth)
        for track in visual_tracks:
            track_id = track['track_id']
            if track_id in self.tracks:
                # Update existing track
                self.tracks[track_id].update({
                    'bbox': track['bbox'],
                    'class': track['class_name'],
                    'depth': self._get_depth(track['bbox'], depth_map),
                    'last_seen': current_time
                })
            else:
                # New track
                self.tracks[track_id] = {
                    'bbox': track['bbox'],
                    'class': track['class_name'],
                    'depth': self._get_depth(track['bbox'], depth_map),
                    'first_seen': current_time,
                    'last_seen': current_time
                }

        # Remove stale tracks
        self.tracks = {
            k: v for k, v in self.tracks.items()
            if current_time - v['last_seen'] < self.max_age
        }

        # Assess risk for each track
        for track_id, track in self.tracks.items():
            risk = self._assess_risk(track, imu_data)
            risk_items.append({
                'track_id': track_id,
                'class': track['class'],
                'bbox': track['bbox'],
                'depth': track['depth'],
                'risk': risk
            })
        return risk_items

    def _assess_risk(self, track, imu_data):
        # Simple risk assessment: distance band weighted by object class
        depth_priority = {"very close": 3, "close": 2, "medium": 1, "far": 0}
        class_priority = {"person": 3, "car": 3, "bicycle": 2,
                          "dog": 2, "chair": 1, "bench": 1}

        risk_score = (depth_priority.get(track['depth'], 0) *
                      class_priority.get(track['class'], 0))

        if risk_score >= 6:
            return "HIGH"
        elif risk_score >= 3:
            return "MEDIUM"
        else:
            return "LOW"
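To show how the three pipelines meet at the fusion module, here is a hedged, simplified main-loop sketch. The real system runs the components concurrently and feeds IMU data in from the network server; the sequential ordering, webcam index, and variable names here are illustrative assumptions.

# Simplified, sequential sketch of the fusion loop; the real system runs these
# components concurrently and pulls IMU data from the network server.
import cv2

detector = YOLOv8Detector()
tracker = DeepSortTracker()
depth_estimator = DepthEstimator()
fusion = DataFusion()

cap = cv2.VideoCapture(0)
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break

    detections = detector.detect(frame)          # vision pipeline
    tracks = tracker.update(detections, frame)   # persistent IDs
    depth_map = depth_estimator.estimate(frame)  # depth pipeline
    imu_data = []  # in the real system this comes from the IMU server/processor

    for item in fusion.update(tracks, depth_map, imu_data):
        if item['risk'] == "HIGH":
            print(f"Warning: {item['class']} {item['depth']}")

cap.release()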
Difficulties in combining data:
Smart alert features:
Explored computer vision models, depth estimation techniques, and sensor options. Settled on YOLOv8 + MiDaS + IMU approach.
Built basic YOLO detection pipeline. Attempted to integrate depth estimation. First attempts at Android sensor access.
Discovered Play Store restrictions on sensor access. Depth scaling not working as expected. IMU drift issues became apparent.
Switched from Streamlit to PyQt5. Changed from absolute to relative depth. Added DeepSORT for better tracking.
Combined all components. Added calibration routines. Implemented basic guidance system.
Theoretical concepts often don't translate directly to practice. Real systems have limitations and edge cases that aren't apparent until implementation.
Spending more time upfront understanding platform limitations and model capabilities would have saved weeks of rework.
Building a minimal viable product first, then enhancing it, leads to better outcomes than trying to implement everything at once.
As a first-year student, I found this project extremely ambitious. While I learned a lot, a narrower focus might have yielded more polished results.
Despite the challenges and limitations, this project provided invaluable hands-on experience with real-world computer vision and sensor systems. The lessons learned will inform all my future projects.