192 lines
6.5 KiB
Python
192 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
import yaml
|
|
import platform
|
|
import paho.mqtt.client as mqtt
|
|
from typing import Dict, Any
|
|
import importlib.util
|
|
import glob
|
|
|
|
class System2MQTT:
    """Publish metrics from local collector modules to an MQTT broker.

    On construction the instance loads its YAML configuration, prepares a
    paho-mqtt client and imports every collector module found in the
    ``collectors`` directory next to this file.  A collector is any module
    exposing ``collect_metrics()``; it must return a mapping with an
    ``"entities"`` list.  Each entity is announced through a Home Assistant
    MQTT discovery message and its value is published to a per-sensor
    state topic.

    Entity keys read by this class:
        sensor_id, name, value        -- required
        state_class, unit_of_measurement, device_class -- optional
        attributes                    -- optional, JSON-serialisable mapping
    """

    # Standard unencrypted MQTT port, used when the config omits ``port``.
    DEFAULT_MQTT_PORT = 1883
    # Discovery prefix Home Assistant listens on unless reconfigured.
    DEFAULT_DISCOVERY_PREFIX = "homeassistant"

    def __init__(self, config_path: str = "config.yaml"):
        """Load configuration, build the MQTT client and load collectors.

        Args:
            config_path: Path of the YAML configuration file.

        Exits the process with status 1 if the configuration cannot be
        loaded (see ``_load_config``).
        """
        self.config = self._load_config(config_path)
        self.hostname = socket.gethostname()
        self.client = self._setup_mqtt_client()
        # Flipped by the _on_connect/_on_disconnect callbacks.
        self.connected = False
        self.device_info = self._get_device_info()
        self.collectors = self._load_collectors()

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from YAML file.

        Args:
            config_path: Path of the YAML file to read.

        Returns:
            The parsed configuration mapping; it is guaranteed to contain
            an ``mqtt`` mapping.

        Exits the process with status 1 when the file cannot be read or
        parsed, or when it does not contain an ``mqtt`` mapping.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except Exception as e:
            print(f"Error loading config: {e}")
            sys.exit(1)

        # An empty file parses to None and a bare scalar/list to a
        # non-mapping; both would otherwise only fail later with an
        # obscure "object is not subscriptable" TypeError.
        if not isinstance(config, dict) or not isinstance(config.get('mqtt'), dict):
            print(f"Error loading config: {config_path} must contain an 'mqtt' mapping")
            sys.exit(1)
        return config

    def _setup_mqtt_client(self) -> "mqtt.Client":
        """Setup MQTT client with configuration.

        Reads ``mqtt.client_id`` (a format string that may contain
        ``{hostname}``) and the optional ``mqtt.username`` /
        ``mqtt.password`` keys.

        Returns:
            A client with credentials and connection callbacks installed;
            it is not connected yet.
        """
        mqtt_config = self.config['mqtt']
        # NOTE(review): paho-mqtt 2.x adds a callback_api_version argument
        # to Client(); confirm which paho-mqtt version is pinned.
        client = mqtt.Client(
            client_id=mqtt_config['client_id'].format(hostname=self.hostname)
        )

        # Credentials are optional so brokers that allow anonymous access
        # need neither key in the configuration.
        username = mqtt_config.get('username')
        if username is not None:
            client.username_pw_set(username, mqtt_config.get('password'))

        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        return client

    def _on_connect(self, client, userdata, flags, rc):
        """Callback for when the client connects to the broker.

        Sets ``self.connected`` to True when ``rc`` is 0 and to False for
        any other result code.
        """
        if rc == 0:
            print("Connected to MQTT broker")
            self.connected = True
        else:
            print(f"Failed to connect to MQTT broker with code: {rc}")
            self.connected = False

    def _on_disconnect(self, client, userdata, rc):
        """Callback for when the client disconnects from the broker."""
        print(f"Disconnected from MQTT broker with code: {rc}")
        self.connected = False

    def _get_device_info(self) -> Dict[str, Any]:
        """Get device information for Home Assistant.

        Returns:
            The ``device`` mapping embedded in every discovery message,
            built from the hostname and the ``platform`` module.
        """
        return {
            "identifiers": [f"system2mqtt_{self.hostname}"],
            "name": f"System {self.hostname}",
            "model": platform.machine(),
            "manufacturer": platform.system()
        }

    def _get_unique_id(self, sensor_id: str) -> str:
        """Generate unique_id from sensor_id."""
        return f"system2mqtt_{self.hostname}_{sensor_id}"

    def _get_state_topic(self, sensor_id: str) -> str:
        """Generate state topic from sensor_id."""
        return f"system2mqtt/{self.hostname}/{sensor_id}/state"

    def _load_collectors(self) -> list:
        """Load all collector modules from the collectors directory.

        Every ``*.py`` file except ``__init__.py`` in the ``collectors``
        directory next to this file is imported.  Modules that define
        ``collect_metrics`` are kept; a module that fails to import is
        reported and skipped so it cannot take the other collectors down.

        Returns:
            The loaded collector modules, in file-name order.
        """
        collectors = []
        collector_dir = os.path.join(os.path.dirname(__file__), 'collectors')

        # Sorted so the load (and therefore publish) order is deterministic.
        collector_files = sorted(glob.glob(os.path.join(collector_dir, '*.py')))

        for collector_file in collector_files:
            file_name = os.path.basename(collector_file)
            # Compare the whole file name: a suffix test would also skip
            # collectors whose name merely ends in "__init__.py".
            if file_name == '__init__.py':
                continue

            module_name = os.path.splitext(file_name)[0]
            try:
                spec = importlib.util.spec_from_file_location(module_name, collector_file)
                if spec is None or spec.loader is None:
                    raise ImportError(f"cannot create import spec for {collector_file}")
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
            except Exception as e:
                # Top-level boundary for third-party collector code: report
                # and keep going with the remaining collectors.
                print(f"Error loading collector {module_name}: {e}")
                continue

            if hasattr(module, 'collect_metrics'):
                collectors.append(module)
                print(f"Loaded collector: {module_name}")

        return collectors

    def _build_discovery_message(self, entity: Dict[str, Any]) -> Dict[str, Any]:
        """Build the Home Assistant discovery payload for one entity.

        Args:
            entity: Entity mapping produced by a collector.

        Returns:
            The discovery mapping.  Optional entity keys are copied only
            when the collector supplied them, and ``json_attributes_topic``
            is set only when the entity carries ``attributes``.

        Raises:
            KeyError: If ``sensor_id`` or ``name`` is missing.
        """
        sensor_id = entity['sensor_id']
        state_topic = self._get_state_topic(sensor_id)
        message = {
            "name": entity['name'],
            "unique_id": self._get_unique_id(sensor_id),
            "state_topic": state_topic,
        }
        for key in ('state_class', 'unit_of_measurement', 'device_class'):
            if key in entity:
                message[key] = entity[key]
        if 'attributes' in entity:
            # Without this pointer the attributes topic published below is
            # never read by Home Assistant.
            message["json_attributes_topic"] = f"{state_topic}/attributes"
        message["device"] = self.device_info
        return message

    def process_collector_data(self, data: Dict[str, Any]):
        """Process data from collectors and publish to MQTT.

        For every entity in ``data['entities']`` this publishes, retained:
        the discovery message, the state value and, when present, the
        JSON-encoded attributes.

        Args:
            data: Mapping returned by a collector's ``collect_metrics()``.

        Does nothing except print a notice when the client is not
        connected.

        Raises:
            KeyError: If ``entities`` or a required entity key is missing.
        """
        if not self.connected:
            print("Not connected to MQTT broker")
            return

        discovery_prefix = self.config['mqtt'].get(
            'discovery_prefix', self.DEFAULT_DISCOVERY_PREFIX
        )

        for entity in data['entities']:
            sensor_id = entity['sensor_id']
            unique_id = self._get_unique_id(sensor_id)
            state_topic = self._get_state_topic(sensor_id)
            discovery_topic = f"{discovery_prefix}/sensor/{unique_id}/config"

            # Publish discovery message
            self.client.publish(
                discovery_topic,
                json.dumps(self._build_discovery_message(entity)),
                retain=True
            )

            # Publish state
            self.client.publish(
                state_topic,
                entity['value'],
                retain=True
            )

            # Publish attributes if present
            if 'attributes' in entity:
                self.client.publish(
                    f"{state_topic}/attributes",
                    json.dumps(entity['attributes']),
                    retain=True
                )

    def collect_and_publish(self):
        """Collect metrics from all collectors and publish them.

        A failure in one collector is printed and does not stop the
        remaining collectors from running.
        """
        for collector in self.collectors:
            try:
                data = collector.collect_metrics()
                self.process_collector_data(data)
            except Exception as e:
                print(f"Error collecting metrics from {collector.__name__}: {e}")

    def connect(self, wait_timeout: float = 10.0):
        """Connect to MQTT broker.

        Starts the client's background network loop and then waits until
        the broker has acknowledged the connection.

        Args:
            wait_timeout: Maximum number of seconds to wait for the
                ``_on_connect`` callback.  When it elapses a notice is
                printed and the method returns; the network loop keeps
                running in the background.

        Exits the process with status 1 if the connection attempt itself
        raises.
        """
        mqtt_config = self.config['mqtt']
        try:
            self.client.connect(
                mqtt_config['broker'],
                mqtt_config.get('port', self.DEFAULT_MQTT_PORT)
            )
            self.client.loop_start()
        except Exception as e:
            print(f"Error connecting to MQTT broker: {e}")
            sys.exit(1)

        # self.connected is only set by _on_connect, which runs on the
        # network thread.  Without this wait a publish attempted right
        # after connect() is rejected as "Not connected to MQTT broker".
        deadline = time.monotonic() + wait_timeout
        while not self.connected and time.monotonic() < deadline:
            time.sleep(0.05)
        if not self.connected:
            print(f"Still waiting for MQTT broker after {wait_timeout} seconds")

    def disconnect(self):
        """Disconnect from MQTT broker."""
        self.client.loop_stop()
        self.client.disconnect()
|
|
|
|
def main():
    """Run the publisher until it is interrupted from the keyboard.

    Builds a ``System2MQTT`` with its default configuration path, connects
    it, then publishes one round of metrics immediately and a further
    round after every 60-second pause.  The client is always disconnected
    on the way out.
    """
    publisher = System2MQTT()
    publisher.connect()

    try:
        # Publish first, then pause: the first round goes out right away
        # and every later round follows a 60-second sleep.
        while True:
            publisher.collect_and_publish()
            time.sleep(60)
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        publisher.disconnect()
|
|
|
|
# Start the publisher only when this file is executed as a script, so
# importing the module has no side effects.
if __name__ == "__main__":
    main()