diagnose.py (Source)

import asyncio
import json
import os
import platform
import re
import sys
from importlib_metadata import PackageNotFoundError, version
_conf_path = os.path.expanduser("~") + "/theengsgw.conf"
_ADDR_RE = re.compile(r"^(([0-9A-F]{2}:){3})([0-9A-F]{2}:){2}[0-9A-F]{2}$")
def _anonymize_strings(fields, config) -> None:
    for field in fields:
        if field in config:
            config[field] = "***"
def _anonymize_address(address) -> str:
    addr_parts = _ADDR_RE.match(address)
    if addr_parts:
        return f"{addr_parts.group(1)}XX:XX:XX"
    else:
        return "INVALID ADDRESS"
def _anonymize_addresses(field, config) -> None:
    try:
        config[field] = [
            _anonymize_address(address) for address in config[field]
        ]
    except KeyError:
        pass
# This function is taken from Textual
def _section(title, values) -> None:
    """Print a collection of named values within a titled section.
    Args:
        title: The title for the section.
        values: The values to print out.
    """
    max_name = max(map(len, values.keys()))
    max_value = max(map(len, [str(value) for value in values.values()]))
    print(f"## {title}")
    print()
    print(f"| {'Name':{max_name}} | {'Value':{max_value}} |")
    print(f"|-{'-' * max_name}-|-{'-'*max_value}-|")
    for name, value in values.items():
        print(f"| {name:{max_name}} | {str(value):{max_value}} |")
    print()
def _versions() -> None:
    """Print useful version numbers."""
    try:
        packages = {
            "Theengs Gateway": version("TheengsGateway"),
            "Theengs Decoder": version("TheengsDecoder"),
            "Bleak": version("bleak"),
            "Bluetooth Clocks": version("bluetooth-clocks"),
            "Bluetooth Numbers": version("bluetooth-numbers"),
            "Paho MQTT": version("paho-mqtt"),
        }
    except PackageNotFoundError as e:
        print(f"Package {e.name} not found. Please install it with:")
        print()
        print(f"    pip install {e.name}")
        print()
    if sys.version_info[:2] >= (3, 9):
        try:
            packages["Bluetooth Adapters"] = version("bluetooth-adapters")
        except PackageNotFoundError as e:
            print(f"Package {e.name} not found. Please install it with:")
            print()
            print(f"    pip install {e.name}")
            print()
    _section("Package Versions", packages)
def _python() -> None:
    """Print information about Python."""
    _section(
        "Python",
        {
            "Version": platform.python_version(),
            "Implementation": platform.python_implementation(),
            "Compiler": platform.python_compiler(),
            "Executable": sys.executable,
        },
    )
def _os() -> None:
    os_parameters = {
        "System": platform.system(),
        "Release": platform.release(),
        "Version": platform.version(),
        "Machine type": platform.machine(),
    }
    if platform.system() == "Linux" and sys.version_info[:2] >= (3, 10):
        os_parameters["Distribution"] = platform.freedesktop_os_release()[
            "PRETTY_NAME"
        ]
    _section("Operating System", os_parameters)
def _config() -> None:
    print("## Configuration")
    print()
    try:
        with open(_conf_path, encoding="utf-8") as config_file:
            config = json.load(config_file)
            _anonymize_strings(["user", "pass"], config)
            _anonymize_addresses("time_sync", config)
        print("```")
        print(json.dumps(config, sort_keys=True, indent=4))
        print("```")
        print()
    except FileNotFoundError:
        print(f"Configuration file not found: {_conf_path}")
        print()
async def _adapters() -> None:
    if sys.version_info[:2] >= (3, 9):
        from bluetooth_adapters import get_adapters
        print("## Bluetooth adapters")
        print()
        bluetooth_adapters = get_adapters()
        await bluetooth_adapters.refresh()
        print(f"Default adapter: {bluetooth_adapters.default_adapter}")
        print()
        for adapter, properties in sorted(bluetooth_adapters.adapters.items()):
            properties["address"] = _anonymize_address(properties["address"])
            print("#", end="")
            _section(adapter, properties)
async def diagnostics():
    print("# Theengs Gateway Diagnostics")
    print()
    _versions()
    _python()
    _os()
    _config()
    await _adapters()
if __name__ == "__main__":
    asyncio.run(diagnostics())