Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 51 additions & 51 deletions system/hardware/tici/hardware.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import configparser
import json
import os
import subprocess
import time
from enum import IntEnum
from functools import cached_property, lru_cache
from pathlib import Path

Expand All @@ -15,22 +15,8 @@
from openpilot.system.hardware.tici.pins import GPIO
from openpilot.system.hardware.tici.amplifier import Amplifier

NM = 'org.freedesktop.NetworkManager'
NM_CON_ACT = NM + '.Connection.Active'
NM_DEV = NM + '.Device'
NM_DEV_WL = NM + '.Device.Wireless'
NM_AP = NM + '.AccessPoint'
DBUS_PROPS = 'org.freedesktop.DBus.Properties'

class NMMetered(IntEnum):
NM_METERED_UNKNOWN = 0
NM_METERED_YES = 1
NM_METERED_NO = 2
NM_METERED_GUESS_YES = 3
NM_METERED_GUESS_NO = 4

NM_CONNECTIONS_DIRS = ("/run/NetworkManager/system-connections", "/data/etc/NetworkManager/system-connections")
MODEM_STATE_PATH = "/dev/shm/modem"
TIMEOUT = 0.1

NetworkType = log.DeviceState.NetworkType
NetworkStrength = log.DeviceState.NetworkStrength
Expand All @@ -52,16 +38,16 @@ def get_device_type():
model = f.read().strip('\x00')
return model.split('comma ')[-1]

class Tici(HardwareBase):
@cached_property
def bus(self):
import dbus
return dbus.SystemBus()
def wpa_cli(cmd):
out = subprocess.check_output(["wpa_cli", "-i", "wlan0", cmd], text=True, timeout=2)
return dict(l.split("=", 1) for l in out.splitlines() if "=" in l)

@cached_property
def nm(self):
return self.bus.get_object(NM, '/org/freedesktop/NetworkManager')
def get_default_route_iface():
with open("/proc/net/route") as f:
routes = [(int(route[6]), route[0]) for line in f.readlines()[1:] if (route := line.split())[1] == "00000000" and int(route[3], 16) & 0x1]
return min(routes)[1] if routes else None

class Tici(HardwareBase):
@cached_property
def amplifier(self):
if self.get_device_type() == "mici":
Expand Down Expand Up @@ -114,13 +100,11 @@ def set_ir_power(self, percent: int):
def get_network_type(self):
ms = self.get_modem_state()
try:
primary_connection = self.nm.Get(NM, 'PrimaryConnection', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)
primary_connection = self.bus.get_object(NM, primary_connection)
primary_type = primary_connection.Get(NM_CON_ACT, 'Type', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)
if primary_type == '802-3-ethernet':
return NetworkType.ethernet
elif primary_type == '802-11-wireless':
iface = get_default_route_iface()
if iface == "wlan0":
return NetworkType.wifi
if iface in ("eth0", "usb0"):
return NetworkType.ethernet
except Exception:
pass

Expand All @@ -136,10 +120,6 @@ def get_network_type(self):
return NetworkType.cell2G
return NetworkType.none

def get_wlan(self):
wlan_path = self.nm.GetDeviceByIpIface('wlan0', dbus_interface=NM, timeout=TIMEOUT)
return self.bus.get_object(NM, wlan_path)

def get_sim_info(self):
ms = self.get_modem_state()
sim_id = ms.get('iccid', '')
Expand Down Expand Up @@ -189,13 +169,14 @@ def get_network_strength(self, network_type):
try:
if network_type == NetworkType.none:
pass
elif network_type == NetworkType.ethernet:
network_strength = NetworkStrength.great
elif network_type == NetworkType.wifi:
wlan = self.get_wlan()
active_ap_path = wlan.Get(NM_DEV_WL, 'ActiveAccessPoint', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)
if active_ap_path != "/":
active_ap = self.bus.get_object(NM, active_ap_path)
strength = int(active_ap.Get(NM_AP, 'Strength', dbus_interface=DBUS_PROPS, timeout=TIMEOUT))
network_strength = self.parse_strength(strength)
rssi = wpa_cli("signal_poll").get("RSSI")
if rssi is not None:
dbm = int(rssi)
if -100 < dbm <= 0:
network_strength = self.parse_strength(120 + max(-90, min(-20, dbm)))
else: # Cellular
network_strength = self.parse_strength(self.get_modem_state().get('signal_quality', 0))
except Exception:
Expand All @@ -208,17 +189,36 @@ def get_network_metered(self, network_type) -> bool:
from openpilot.common.params import Params
return Params().get_bool("GsmMetered")
try:
primary_connection = self.nm.Get(NM, 'PrimaryConnection', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)
primary_connection = self.bus.get_object(NM, primary_connection)
primary_devices = primary_connection.Get(NM_CON_ACT, 'Devices', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)

for dev in primary_devices:
dev_obj = self.bus.get_object(NM, str(dev))
metered_prop = dev_obj.Get(NM_DEV, 'Metered', dbus_interface=DBUS_PROPS, timeout=TIMEOUT)

if network_type == NetworkType.wifi:
if metered_prop in [NMMetered.NM_METERED_YES, NMMetered.NM_METERED_GUESS_YES]:
return True
if network_type == NetworkType.wifi:
ssid = wpa_cli("status").get("ssid", "")
if ssid:
# wpa_cli escapes non-printable bytes as \xNN; NM keyfile stores ASCII SSIDs as a literal and others as a byte;byte; list
ssid_bytes = ssid.encode().decode('unicode_escape').encode('latin-1')
ssid_keyfile_list = ';'.join(str(b) for b in ssid_bytes) + ';'

for fpath in (p for d in NM_CONNECTIONS_DIRS for p in Path(d).glob("*.nmconnection")):
raw = sudo_read(str(fpath))
if not raw:
continue
cp = configparser.ConfigParser(interpolation=None)
try:
cp.read_string(raw)
keyfile_ssid = cp.get("wifi", "ssid", fallback="")
if keyfile_ssid != ssid and keyfile_ssid != ssid_keyfile_list:
continue
metered = cp.getint("connection", "metered", fallback=0)
except (configparser.Error, ValueError):
continue
if metered == 1:
return True
if metered == 2:
return False
break

# TODO: remove when openpilot owns dhcp and can detect hotspots itself
out = subprocess.check_output(["nmcli", "-t", "-f", "GENERAL.METERED", "dev", "show", "wlan0"], text=True, timeout=2)
if out.strip().split(":", 1)[-1].startswith("yes"):
return True
except Exception:
pass

Expand Down
Loading