#!/usr/bin/env python3 import json import os import socket import sys import time import yaml import platform import paho.mqtt.client as mqtt from typing import Dict, Any import importlib.util import glob class System2MQTT: def __init__(self, config_path: str = "config.yaml"): self.config = self._load_config(config_path) self.hostname = socket.gethostname() self.client = self._setup_mqtt_client() self.connected = False self.device_info = self._get_device_info() self.collectors = self._load_collectors() def _load_config(self, config_path: str) -> Dict[str, Any]: """Load configuration from YAML file.""" try: with open(config_path, 'r') as f: return yaml.safe_load(f) except Exception as e: print(f"Error loading config: {e}") sys.exit(1) def _setup_mqtt_client(self) -> mqtt.Client: """Setup MQTT client with configuration.""" client = mqtt.Client( client_id=self.config['mqtt']['client_id'].format(hostname=self.hostname) ) client.username_pw_set( self.config['mqtt']['username'], self.config['mqtt']['password'] ) client.on_connect = self._on_connect client.on_disconnect = self._on_disconnect return client def _on_connect(self, client, userdata, flags, rc): """Callback for when the client connects to the broker.""" if rc == 0: print("Connected to MQTT broker") self.connected = True else: print(f"Failed to connect to MQTT broker with code: {rc}") self.connected = False def _on_disconnect(self, client, userdata, rc): """Callback for when the client disconnects from the broker.""" print(f"Disconnected from MQTT broker with code: {rc}") self.connected = False def _get_device_info(self) -> Dict[str, Any]: """Get device information for Home Assistant.""" return { "identifiers": [f"system2mqtt_{self.hostname}"], "name": f"System {self.hostname}", "model": platform.machine(), "manufacturer": platform.system() } def _get_unique_id(self, sensor_id: str) -> str: """Generate unique_id from sensor_id.""" return f"system2mqtt_{self.hostname}_{sensor_id}" def _get_state_topic(self, sensor_id: str) -> str: """Generate state topic from sensor_id.""" return f"system2mqtt/{self.hostname}/{sensor_id}/state" def _load_collectors(self) -> list: """Load all collector modules from the collectors directory.""" collectors = [] collector_dir = os.path.join(os.path.dirname(__file__), 'collectors') # Find all Python files in the collectors directory collector_files = glob.glob(os.path.join(collector_dir, '*.py')) for collector_file in collector_files: if collector_file.endswith('__init__.py'): continue module_name = os.path.splitext(os.path.basename(collector_file))[0] spec = importlib.util.spec_from_file_location(module_name, collector_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) if hasattr(module, 'collect_metrics'): collectors.append(module) print(f"Loaded collector: {module_name}") return collectors def process_collector_data(self, data: Dict[str, Any]): """Process data from collectors and publish to MQTT.""" if not self.connected: print("Not connected to MQTT broker") return # Publish discovery messages for each entity for entity in data['entities']: sensor_id = entity['sensor_id'] unique_id = self._get_unique_id(sensor_id) state_topic = self._get_state_topic(sensor_id) discovery_topic = f"{self.config['mqtt']['discovery_prefix']}/sensor/{unique_id}/config" # Prepare discovery message discovery_msg = { "name": entity['name'], "unique_id": unique_id, "state_topic": state_topic, "state_class": entity['state_class'], "unit_of_measurement": entity['unit_of_measurement'], "device_class": entity['device_class'], "device": self.device_info } # Publish discovery message self.client.publish( discovery_topic, json.dumps(discovery_msg), retain=True ) # Publish state self.client.publish( state_topic, entity['value'], retain=True ) # Publish attributes if present if 'attributes' in entity: attributes_topic = f"{state_topic}/attributes" self.client.publish( attributes_topic, json.dumps(entity['attributes']), retain=True ) def collect_and_publish(self): """Collect metrics from all collectors and publish them.""" for collector in self.collectors: try: data = collector.collect_metrics() self.process_collector_data(data) except Exception as e: print(f"Error collecting metrics from {collector.__name__}: {e}") def connect(self): """Connect to MQTT broker.""" try: self.client.connect( self.config['mqtt']['broker'], self.config['mqtt']['port'] ) self.client.loop_start() except Exception as e: print(f"Error connecting to MQTT broker: {e}") sys.exit(1) def disconnect(self): """Disconnect from MQTT broker.""" self.client.loop_stop() self.client.disconnect() def main(): """Main function.""" system2mqtt = System2MQTT() system2mqtt.connect() try: # Initial collection system2mqtt.collect_and_publish() # Keep collecting metrics every 60 seconds while True: time.sleep(60) system2mqtt.collect_and_publish() except KeyboardInterrupt: print("\nShutting down...") finally: system2mqtt.disconnect() if __name__ == "__main__": main()