326 lines
13 KiB
Python
326 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
import yaml
|
|
import platform
|
|
import asyncio
|
|
import paho.mqtt.client as mqtt
|
|
from typing import Dict, Any, List
|
|
import importlib.util
|
|
import glob
|
|
from datetime import datetime, timedelta
|
|
|
|
# Baseline settings. Values from the config file and then from environment
# variables are layered on top of these when the configuration is loaded.
CONFIG_DEFAULTS = {
    'mqtt': dict(
        host='localhost',
        port=1883,
        username=None,
        password=None,
        # "{hostname}" is substituted with the local hostname when the client is created.
        client_id='system2mqtt_{hostname}',
        discovery_prefix='homeassistant',
    ),
    'collectors': dict(
        default_interval=60,  # seconds
        intervals={},         # per-collector overrides, keyed by collector name
    ),
}
|
|
|
|
class System2MQTT:
    """Run pluggable metric collectors and publish their entities over MQTT.

    Configuration is resolved once, at construction time, from the built-in
    defaults, an optional YAML file and environment variables.  Collector
    modules are discovered in the ``collectors`` directory that sits next to
    this file; every module exposing ``collect_metrics()`` is run on its own
    interval, and each entity it returns is published together with a Home
    Assistant MQTT discovery message.
    """

    def __init__(self, config_path: str = "config.yaml"):
        """Load the configuration and the collector modules.

        No network activity happens here; call :meth:`connect` afterwards.

        Args:
            config_path: Path of the YAML config file.  The environment
                variable ``SYSTEM2MQTT_CONFIG`` takes precedence when set.
        """
        self.config = self._load_config(config_path)
        self.hostname = socket.gethostname()
        self.client = None  # paho MQTT client, created in connect()
        self.connected = False  # maintained by the paho connect/disconnect callbacks
        self.device_info = self._get_device_info()
        self.collectors = self._load_collectors()
        # Collector name -> time.monotonic() reading taken after its last run.
        self.last_run = {}

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Build the effective configuration.

        Precedence: CONFIG_DEFAULTS < config file < environment variables.

        Args:
            config_path: Path of the YAML file; overridden by the
                ``SYSTEM2MQTT_CONFIG`` environment variable when that is set.

        Returns:
            A dict that always contains the ``mqtt`` and ``collectors``
            sections, with ``collectors['intervals']`` guaranteed to be a dict.
            The dict is independent of CONFIG_DEFAULTS.
        """
        # Local import: keeps this change self-contained without touching the
        # module-level import block.
        import copy

        env_path = os.environ.get('SYSTEM2MQTT_CONFIG')
        if env_path:
            config_path = env_path

        # Deep copy: the merge below and the environment overrides mutate the
        # nested dicts in place, which must never leak into CONFIG_DEFAULTS.
        config = copy.deepcopy(CONFIG_DEFAULTS)

        if os.path.exists(config_path):
            try:
                with open(config_path, 'r') as f:
                    loaded = yaml.safe_load(f) or {}
                # One level of merging is enough for the config's shape:
                # section dicts are merged key by key, anything else replaces.
                for k, v in loaded.items():
                    if isinstance(v, dict) and isinstance(config.get(k), dict):
                        config[k].update(v)
                    else:
                        config[k] = v
            except Exception as e:
                # Best effort by design: a broken config file must not prevent
                # start-up with defaults and environment overrides.
                print(f"Warning: failed to load config file {config_path}: {e}")
                print("Proceeding with defaults and environment overrides.")
        else:
            print(f"Config file '{config_path}' not found; using defaults and environment variables if set.")

        # A section given as an empty key ("mqtt:") loads as None; fall back to
        # the defaults instead of failing on the first lookup later on.
        for section in ('mqtt', 'collectors'):
            if not isinstance(config.get(section), dict):
                if section in config:
                    print(f"Warning: config section '{section}' is not a mapping; using defaults")
                config[section] = copy.deepcopy(CONFIG_DEFAULTS[section])
        if not isinstance(config['collectors'].get('intervals'), dict):
            config['collectors']['intervals'] = {}

        self._merge_env_overrides(config)
        return config

    def _merge_env_overrides(self, config: Dict[str, Any]):
        """Apply environment variable overrides to ``config`` in place.

        Recognized variables: MQTT_HOST, MQTT_PORT, MQTT_USERNAME,
        MQTT_PASSWORD, MQTT_CLIENT_ID, MQTT_DISCOVERY_PREFIX,
        COLLECTORS_DEFAULT_INTERVAL and COLLECTOR_<NAME>_INTERVAL.
        Integer-valued variables that do not parse are ignored with a warning.

        Args:
            config: Configuration dict containing the ``mqtt`` and
                ``collectors`` sections (including ``collectors['intervals']``).
        """
        env = os.environ
        mqtt_cfg = config['mqtt']
        collectors_cfg = config['collectors']

        # Plain string settings: copied verbatim.
        string_overrides = (
            ('MQTT_HOST', 'host'),
            ('MQTT_USERNAME', 'username'),
            ('MQTT_PASSWORD', 'password'),
            ('MQTT_CLIENT_ID', 'client_id'),
            ('MQTT_DISCOVERY_PREFIX', 'discovery_prefix'),
        )
        for var, option in string_overrides:
            if var in env:
                mqtt_cfg[option] = env[var]

        if 'MQTT_PORT' in env:
            try:
                mqtt_cfg['port'] = int(env['MQTT_PORT'])
            except ValueError:
                print("Warning: MQTT_PORT is not an integer; ignoring env override")

        if 'COLLECTORS_DEFAULT_INTERVAL' in env:
            try:
                collectors_cfg['default_interval'] = int(env['COLLECTORS_DEFAULT_INTERVAL'])
            except ValueError:
                print("Warning: COLLECTORS_DEFAULT_INTERVAL is not an integer; ignoring env override")

        # Per-collector overrides, e.g. COLLECTOR_system_metrics_INTERVAL=30.
        for key, val in env.items():
            if not (key.startswith('COLLECTOR_') and key.endswith('_INTERVAL')):
                continue
            parts = key.split('_')
            if len(parts) < 3:
                continue  # "COLLECTOR_INTERVAL" carries no collector name
            # The collector name itself may contain underscores.
            name = '_'.join(parts[1:-1])
            if not name:
                continue  # "COLLECTOR__INTERVAL": empty name, nothing to override
            try:
                collectors_cfg['intervals'][name] = int(val)
            except ValueError:
                print(f"Warning: {key} must be an integer; ignoring")

    def _setup_mqtt_client(self) -> "mqtt.Client":
        """Create and configure the paho-mqtt client (without connecting).

        Callback API v2 is requested when the installed paho-mqtt offers it;
        otherwise the client is created the pre-2.0 way.

        Returns:
            The client with credentials (if configured) and the connect /
            disconnect callbacks installed.
        """
        mqtt_cfg = self.config['mqtt']
        # "or" also covers a client_id that was explicitly set to null.
        template = mqtt_cfg.get('client_id') or 'system2mqtt_{hostname}'
        client_id = template.format(hostname=self.hostname)

        client = None
        # paho-mqtt < 2.0 has no CallbackAPIVersion at all, so the attribute
        # has to be probed; merely catching TypeError from the constructor
        # would never reach the fallback on those versions.
        api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_versions is not None:
            try:
                client = mqtt.Client(callback_api_version=api_versions.VERSION2, client_id=client_id)
            except TypeError:
                client = None
        if client is None:
            client = mqtt.Client(client_id=client_id)

        username = mqtt_cfg.get('username')
        password = mqtt_cfg.get('password')
        if username or password:
            client.username_pw_set(username, password)

        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        return client

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """paho callback: record whether the broker accepted the connection.

        Args:
            rc: Result of the connection attempt.  Either an integer (0 means
                success) or an object exposing ``is_failure``, as passed by
                paho-mqtt's callback API v2.
        """
        # Reason-code objects are not necessarily convertible with int();
        # treating a failed conversion as 0 would report every refused
        # connection as a success, so ask the object itself first.
        failed = getattr(rc, 'is_failure', None)
        if failed is None:
            try:
                failed = int(rc) != 0
            except (TypeError, ValueError):
                # Unrecognized code type: keep the previous behaviour and
                # assume the connection succeeded.
                failed = False

        if failed:
            print(f"Failed to connect to MQTT broker with code: {rc}")
            self.connected = False
        else:
            print("Connected to MQTT broker")
            self.connected = True

    def _on_disconnect(self, client, userdata, rc, reason_code=None, properties=None):
        """paho callback: mark the client as disconnected.

        The trailing optional parameters let the same method serve the
        3-, 4- and 5-argument forms in which paho invokes this callback; none
        of the arguments is inspected.
        """
        print("Disconnected from MQTT broker")
        self.connected = False

    def _get_device_info(self) -> Dict[str, Any]:
        """Return the ``device`` section used in Home Assistant discovery messages."""
        return {
            "identifiers": [f"system2mqtt_{self.hostname}"],
            "name": f"System {self.hostname}",
            "model": platform.machine(),
            "manufacturer": platform.system()
        }

    def _get_unique_id(self, sensor_id: str) -> str:
        """Return the host-qualified unique_id for ``sensor_id``."""
        return f"system2mqtt_{self.hostname}_{sensor_id}"

    def _get_state_topic(self, sensor_id: str) -> str:
        """Return the MQTT state topic for ``sensor_id``."""
        return f"system2mqtt/{self.hostname}/{sensor_id}/state"

    def _load_collectors(self) -> List[Dict[str, Any]]:
        """Import every collector module found in the ``collectors`` directory.

        A module counts as a collector when it defines ``collect_metrics``.
        Its interval is, in order of precedence: the per-collector setting in
        the config, the module's ``DEFAULT_INTERVAL``, the configured default
        interval.  A module that fails to import is skipped with a warning so
        that one broken collector does not take the whole service down.

        Returns:
            One dict per collector with the keys ``module``, ``name`` and
            ``interval`` (seconds).
        """
        collectors = []
        collector_dir = os.path.join(os.path.dirname(__file__), 'collectors')
        collectors_cfg = self.config['collectors']

        # glob returns files in arbitrary order; sort for a stable load order.
        for collector_file in sorted(glob.glob(os.path.join(collector_dir, '*.py'))):
            file_name = os.path.basename(collector_file)
            if file_name == '__init__.py':
                continue

            module_name = os.path.splitext(file_name)[0]
            try:
                spec = importlib.util.spec_from_file_location(module_name, collector_file)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
            except Exception as e:
                # Collector modules are arbitrary plug-in code; any failure
                # while importing one is reported and the module is skipped.
                print(f"Warning: failed to load collector {module_name}: {e}")
                continue

            if not hasattr(module, 'collect_metrics'):
                continue

            default_interval = getattr(module, 'DEFAULT_INTERVAL', collectors_cfg['default_interval'])
            interval = collectors_cfg['intervals'].get(module_name, default_interval)

            collectors.append({
                'module': module,
                'name': module_name,
                'interval': interval
            })
            print(f"Loaded collector: {module_name} (interval: {interval}s)")

        return collectors

    async def process_collector_data(self, data: Dict[str, Any]) -> bool:
        """Publish discovery, availability, state and attributes for each entity.

        Args:
            data: Collector result; ``data['entities']`` is a list of dicts.
                Each entity must provide ``sensor_id``, ``name`` and
                ``value``.  ``state_class``, ``unit_of_measurement``,
                ``device_class``, ``icon`` and ``attributes`` are optional.

        Returns:
            True when the entities were handed to the MQTT client, False when
            nothing was published because the client is not connected.

        Raises:
            KeyError: If ``entities`` or a required entity field is missing.
        """
        if not self.connected:
            print("Not connected to MQTT broker")
            return False

        entities = data['entities']
        discovery_prefix = self.config['mqtt']['discovery_prefix']
        availability_topic = f"system2mqtt/{self.hostname}/status"

        # Availability is shared by all entities of this host, so one retained
        # message per batch is enough.
        if entities:
            self.client.publish(availability_topic, "online", qos=0, retain=True)

        for entity in entities:
            sensor_id = entity['sensor_id']
            unique_id = self._get_unique_id(sensor_id)
            state_topic = self._get_state_topic(sensor_id)
            discovery_topic = f"{discovery_prefix}/sensor/{unique_id}/config"
            attributes_topic = f"{state_topic}/attributes"

            discovery_msg = {
                "name": entity['name'],
                "unique_id": unique_id,
                "state_topic": state_topic,
            }
            # Optional descriptors: forwarded exactly as given (including an
            # explicit None), omitted when the collector does not supply them.
            for field in ('state_class', 'unit_of_measurement', 'device_class'):
                if field in entity:
                    discovery_msg[field] = entity[field]
            discovery_msg.update({
                "device": self.device_info,
                "json_attributes_topic": attributes_topic,
                "availability_topic": availability_topic,
                "payload_available": "online",
                "payload_not_available": "offline"
            })
            if entity.get('icon'):
                discovery_msg["icon"] = entity['icon']

            self.client.publish(discovery_topic, json.dumps(discovery_msg), qos=0, retain=True)
            self.client.publish(state_topic, str(entity['value']), qos=0, retain=True)
            if 'attributes' in entity:
                self.client.publish(attributes_topic, json.dumps(entity['attributes']), qos=0, retain=True)

        return True

    def should_run_collector(self, collector: Dict[str, Any]) -> bool:
        """Return True when ``collector`` has never run or its interval has elapsed.

        Args:
            collector: Collector entry with the keys ``name`` and ``interval``
                (seconds).
        """
        last_run = self.last_run.get(collector['name'])
        if last_run is None:
            return True
        # Monotonic clock: unaffected by DST switches and wall-clock adjustments.
        return (time.monotonic() - last_run) >= collector['interval']

    async def collect_and_publish(self):
        """Run every collector that is due and publish its entities.

        A failure inside one collector is reported and does not stop the
        others; the failed collector is retried on the next call.
        """
        for collector in self.collectors:
            if not self.should_run_collector(collector):
                continue

            name = collector['name']
            try:
                data = collector['module'].collect_metrics()
                published = await self.process_collector_data(data)
            except Exception as e:
                # Boundary around plug-in code: report and move on.
                print(f"Error collecting metrics from {name}: {e}")
                continue

            # Recorded even when nothing could be published, so a disconnected
            # client is retried once per interval rather than on every call.
            self.last_run[name] = time.monotonic()
            if published:
                print(f"Updated {name} metrics")

    async def connect(self):
        """Connect to the MQTT broker and wait up to 5 seconds for confirmation.

        Exits the process with status 1 when the connection attempt itself
        raises.  When the broker does not confirm in time a warning is printed
        and the method returns anyway.
        """
        try:
            self.client = self._setup_mqtt_client()
            self.client.connect(self.config['mqtt']['host'], self.config['mqtt']['port'])
            self.client.loop_start()
        except Exception as e:
            print(f"Error connecting to MQTT broker: {e}")
            sys.exit(1)

        # self.connected is flipped by _on_connect; poll it in 0.1 s steps.
        for _ in range(50):
            if self.connected:
                return
            await asyncio.sleep(0.1)
        print("Warning: no connection confirmation from MQTT broker after 5 seconds; continuing")

    async def disconnect(self):
        """Stop the network loop and disconnect from the broker, if a client exists."""
        # NOTE(review): nothing ever publishes "offline" to the availability
        # topic, although discovery declares it as payload_not_available -
        # consider a last-will message or an explicit publish on shutdown.
        if self.client:
            self.client.loop_stop()
            self.client.disconnect()
        # Do not rely on the disconnect callback for this: the network loop
        # has already been stopped above.
        self.connected = False
|
|
|
|
async def async_main():
    """Run the service: connect, then poll the collectors once per second.

    The loop runs until the task is interrupted; the MQTT client is
    disconnected on the way out in every case.
    """
    system2mqtt = System2MQTT()
    await system2mqtt.connect()

    try:
        # Each pass hands scheduling to collect_and_publish(); no separate
        # initial collection is needed because the first pass runs immediately.
        while True:
            await system2mqtt.collect_and_publish()
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
    except asyncio.CancelledError:
        # Under asyncio.run() on Python 3.11+, Ctrl-C arrives here as a
        # cancellation rather than as KeyboardInterrupt.  Report it, then
        # re-raise so the cancellation is not swallowed.
        print("\nShutting down...")
        raise
    finally:
        await system2mqtt.disconnect()
|
|
if __name__ == "__main__":
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the service: asyncio.run() re-raises
        # KeyboardInterrupt once the main task has been cancelled, so exit
        # without a traceback.
        pass