Files
system2mqtt/system2mqtt.py

368 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import json
import os
import socket
import sys
import time
import yaml
import platform
import asyncio
import paho.mqtt.client as mqtt
from typing import Dict, Any, List
import importlib.util
import glob
from datetime import datetime, timedelta
from pathlib import Path
# Baseline settings. They are used for every key that neither the YAML config
# file nor an environment variable provides.
CONFIG_DEFAULTS = dict(
    # Broker connection and Home Assistant discovery settings.
    mqtt=dict(
        host='localhost',
        port=1883,
        username=None,
        password=None,
        # '{hostname}' is substituted with the local hostname at client setup.
        client_id='system2mqtt_{hostname}',
        discovery_prefix='homeassistant',
    ),
    # Collector scheduling: a global interval (seconds) plus per-collector
    # overrides keyed by collector module name.
    collectors=dict(
        default_interval=60,
        intervals={},
    ),
)
class System2MQTT:
    """Run collector modules periodically and publish their metrics over MQTT.

    On construction the configuration is loaded (defaults < YAML file <
    environment variables) and every ``collectors/*.py`` module exposing a
    ``collect_metrics`` callable is imported. ``connect()`` creates the paho
    client; ``collect_and_publish()`` runs the collectors that are due and
    publishes Home Assistant discovery, state and attribute messages.
    """

    @staticmethod
    def _timestamp() -> str:
        """Get formatted timestamp for logging."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def __init__(self, config_path: str = None):
        """Load configuration and collectors; the MQTT client is created later.

        Args:
            config_path: Optional path to the YAML config file. The
                SYSTEM2MQTT_CONFIG environment variable takes precedence.
        """
        self.config = self._load_config(config_path)
        self.hostname = socket.gethostname()
        self.client = None  # paho MQTT client initialized in connect()
        self.connected = False
        self.device_info = self._get_device_info()
        self.collectors = self._load_collectors()
        self.last_run = {}  # Stores the time of the last run for each collector

    def _get_default_config_path(self) -> str:
        """Get default config path following XDG Base Directory Specification.

        Returns path in order of preference:
        1. $XDG_CONFIG_HOME/system2mqtt/config.yaml
        2. ~/.config/system2mqtt/config.yaml (if XDG_CONFIG_HOME not set)
        """
        xdg_config_home = os.environ.get('XDG_CONFIG_HOME')
        if xdg_config_home:
            config_dir = Path(xdg_config_home) / 'system2mqtt'
        else:
            config_dir = Path.home() / '.config' / 'system2mqtt'
        return str(config_dir / 'config.yaml')

    def _load_config(self, config_path: str = None) -> Dict[str, Any]:
        """Load configuration from YAML file, apply defaults and environment overrides.

        Precedence: CONFIG_DEFAULTS < config file < environment variables
        Config path precedence: SYSTEM2MQTT_CONFIG env var > parameter > XDG location

        A missing or unreadable config file is not fatal: a message is printed
        and the defaults plus environment overrides are used.

        Returns:
            A fresh config dict with at least the 'mqtt' and 'collectors'
            sections; 'collectors' always holds an 'intervals' dict.
        """
        # Function-scope import: the module is only needed here.
        import copy

        env_path = os.environ.get('SYSTEM2MQTT_CONFIG')
        if env_path:
            config_path = env_path
        elif config_path is None:
            config_path = self._get_default_config_path()

        # Deep copy so that neither the file merge nor the environment
        # overrides below write into the shared module-level defaults.
        config = copy.deepcopy(CONFIG_DEFAULTS)
        loaded_from_file = False

        if os.path.exists(config_path):
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    loaded = yaml.safe_load(f) or {}
                if not isinstance(loaded, dict):
                    raise ValueError("top-level YAML value must be a mapping")
                # One level of merging is enough for the config shape.
                for k, v in loaded.items():
                    if isinstance(config.get(k), dict):
                        if isinstance(v, dict):
                            config[k].update(v)
                        elif v is not None:
                            # A scalar/list would replace a whole section and
                            # break later lookups; keep the defaults instead.
                            print(f"⚠️ [{self._timestamp()}] WARNING: Config section '{k}' must be a mapping; keeping defaults")
                        # v is None (empty section): keep the defaults silently.
                    else:
                        config[k] = v
                loaded_from_file = True
            except Exception as e:
                print(f"⚠️ [{self._timestamp()}] WARNING: Failed to load config file '{config_path}': {e}")
                print(f" [{self._timestamp()}] INFO: Proceeding with defaults and environment overrides")
        else:
            print(f" [{self._timestamp()}] INFO: Config file '{config_path}' not found")
            print(f" [{self._timestamp()}] INFO: Using defaults and environment variables")

        # Ensure necessary sub-keys exist (the file may have cleared them).
        config.setdefault('mqtt', copy.deepcopy(CONFIG_DEFAULTS['mqtt']))
        config.setdefault('collectors', copy.deepcopy(CONFIG_DEFAULTS['collectors']))
        if not isinstance(config['collectors'].get('intervals'), dict):
            config['collectors']['intervals'] = {}

        self._merge_env_overrides(config)

        # Only claim the file was loaded when it actually was.
        if loaded_from_file:
            print(f" [{self._timestamp()}] INFO: Loaded configuration from '{config_path}'")
        return config

    def _merge_env_overrides(self, config: Dict[str, Any]):
        """Merge environment variable overrides into the config dict (in place).

        Recognized env vars (examples): MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD,
        MQTT_CLIENT_ID, MQTT_DISCOVERY_PREFIX, COLLECTORS_DEFAULT_INTERVAL, COLLECTOR_<NAME>_INTERVAL

        Integer-valued variables that do not parse are ignored with a warning.
        """
        # Plain string overrides for the MQTT section.
        string_overrides = (
            ('MQTT_HOST', 'host'),
            ('MQTT_USERNAME', 'username'),
            ('MQTT_PASSWORD', 'password'),
            ('MQTT_CLIENT_ID', 'client_id'),
            ('MQTT_DISCOVERY_PREFIX', 'discovery_prefix'),
        )
        for env_name, key in string_overrides:
            if env_name in os.environ:
                config['mqtt'][key] = os.environ[env_name]

        if 'MQTT_PORT' in os.environ:
            try:
                config['mqtt']['port'] = int(os.environ['MQTT_PORT'])
            except ValueError:
                print(f"⚠️ [{self._timestamp()}] WARNING: MQTT_PORT environment variable is not an integer; ignoring override")

        # Collectors default interval
        if 'COLLECTORS_DEFAULT_INTERVAL' in os.environ:
            try:
                config['collectors']['default_interval'] = int(os.environ['COLLECTORS_DEFAULT_INTERVAL'])
            except ValueError:
                print(f"⚠️ [{self._timestamp()}] WARNING: COLLECTORS_DEFAULT_INTERVAL is not an integer; ignoring override")

        # Per-collector overrides, e.g. COLLECTOR_system_metrics_INTERVAL
        prefix, suffix = 'COLLECTOR_', '_INTERVAL'
        for key, val in os.environ.items():
            if not (key.startswith(prefix) and key.endswith(suffix)):
                continue
            # The length check rejects 'COLLECTOR_INTERVAL', where prefix and
            # suffix overlap; an empty collector name is skipped as well.
            if len(key) < len(prefix) + len(suffix):
                continue
            name = key[len(prefix):-len(suffix)]
            if not name:
                continue
            try:
                config['collectors']['intervals'][name] = int(val)
            except ValueError:
                print(f"⚠️ [{self._timestamp()}] WARNING: Collector interval '{key}' must be an integer; ignoring")

    def _get_availability_topic(self) -> str:
        """Return the topic carrying this host's online/offline status."""
        return f"system2mqtt/{self.hostname}/status"

    def _setup_mqtt_client(self) -> "mqtt.Client":
        """Setup paho-mqtt client with configuration (callback API v2 when available).

        Also registers a retained "offline" last will on the availability
        topic, to be published by the broker if the connection is lost
        without a clean disconnect.
        """
        client_id_template = self.config['mqtt'].get('client_id') or 'system2mqtt_{hostname}'
        client_id = client_id_template.format(hostname=self.hostname)

        # Prefer callback API v2 to avoid deprecation warnings. Older paho
        # releases have no CallbackAPIVersion attribute (AttributeError) or do
        # not accept the keyword (TypeError); fall back in both cases.
        try:
            client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
        except (AttributeError, TypeError):
            client = mqtt.Client(client_id=client_id)

        username = self.config['mqtt'].get('username')
        password = self.config['mqtt'].get('password')
        if username or password:
            client.username_pw_set(username, password)

        # Must be set before connect() so the broker learns about it.
        client.will_set(self._get_availability_topic(), "offline", qos=0, retain=True)

        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        return client

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """Callback when connected to broker (paho).

        ``rc`` is a plain int with the v1 callback API and a reason-code
        object with v2. Reason-code objects are evaluated through their
        ``is_failure`` attribute; an unrecognised result is treated as a
        failure rather than silently assumed to be success.
        """
        mqtt_host = self.config['mqtt']['host']
        mqtt_port = self.config['mqtt']['port']

        is_failure = getattr(rc, 'is_failure', None)
        if is_failure is None:
            try:
                failed = int(rc) != 0
            except (TypeError, ValueError):
                failed = True
        else:
            failed = bool(is_failure)
        # Numeric code for the log line; falls back to the object itself.
        code = getattr(rc, 'value', rc)

        if failed:
            print(f"✗ [{self._timestamp()}] ERROR: Failed to connect to MQTT broker at {mqtt_host}:{mqtt_port} (code: {code})")
            self.connected = False
        else:
            print(f"✓ [{self._timestamp()}] SUCCESS: Connected to MQTT broker at {mqtt_host}:{mqtt_port}")
            self.connected = True

    def _on_disconnect(self, client, userdata, rc, reason_code=None, properties=None):
        """Callback when disconnected (paho v2); marks the client as offline.

        The trailing parameters are optional so the same method also accepts
        the shorter three-argument call.
        """
        mqtt_host = self.config['mqtt']['host']
        mqtt_port = self.config['mqtt']['port']
        print(f"⚠️ [{self._timestamp()}] INFO: Disconnected from MQTT broker at {mqtt_host}:{mqtt_port}")
        self.connected = False

    def _get_device_info(self) -> Dict[str, Any]:
        """Get device information for Home Assistant."""
        return {
            "identifiers": [f"system2mqtt_{self.hostname}"],
            "name": f"System {self.hostname}",
            "model": platform.machine(),
            "manufacturer": platform.system()
        }

    def _get_unique_id(self, sensor_id: str) -> str:
        """Generate unique_id from sensor_id."""
        return f"system2mqtt_{self.hostname}_{sensor_id}"

    def _get_state_topic(self, sensor_id: str) -> str:
        """Generate state topic from sensor_id."""
        return f"system2mqtt/{self.hostname}/{sensor_id}/state"

    def _load_collectors(self) -> List[Dict[str, Any]]:
        """Load all collector modules from the collectors directory.

        Every ``*.py`` file (except ``__init__.py``) next to this module in
        ``collectors/`` is imported; modules exposing ``collect_metrics`` are
        registered. A module that fails to import is logged and skipped so
        one broken collector does not prevent the others from running.

        Returns:
            A list of dicts with keys 'module', 'name' and 'interval'.
        """
        collectors = []
        collector_dir = os.path.join(os.path.dirname(__file__), 'collectors')

        # Sorted for a deterministic load order (glob order is unspecified).
        for collector_file in sorted(glob.glob(os.path.join(collector_dir, '*.py'))):
            if os.path.basename(collector_file) == '__init__.py':
                continue
            module_name = os.path.splitext(os.path.basename(collector_file))[0]
            try:
                spec = importlib.util.spec_from_file_location(module_name, collector_file)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
            except Exception as e:
                print(f"✗ [{self._timestamp()}] ERROR: Failed to load collector '{module_name}': {e}")
                continue

            if not hasattr(module, 'collect_metrics'):
                continue

            # Precedence: per-collector config > module DEFAULT_INTERVAL > global default
            default_interval = getattr(module, 'DEFAULT_INTERVAL', self.config['collectors']['default_interval'])
            interval = self.config['collectors']['intervals'].get(module_name, default_interval)
            collectors.append({
                'module': module,
                'name': module_name,
                'interval': interval
            })
            print(f"✓ [{self._timestamp()}] SUCCESS: Loaded collector '{module_name}' (update interval: {interval}s)")
        return collectors

    async def process_collector_data(self, data: Dict[str, Any]):
        """Process data from collectors and publish to MQTT.

        For each entity in ``data['entities']`` a retained Home Assistant
        discovery message, the state and (if present) the attributes are
        published. The availability topic is set to "online" once per call.

        Returns:
            True if the data was handed to the MQTT client, False if it was
            dropped because the client is not connected.
        """
        if not self.connected:
            print(f"⚠️ [{self._timestamp()}] WARNING: Cannot process collector data - not connected to MQTT broker")
            return False

        availability_topic = self._get_availability_topic()
        discovery_prefix = self.config['mqtt']['discovery_prefix']

        # Same topic and payload for every entity, so publish it only once.
        self.client.publish(availability_topic, "online", qos=0, retain=True)

        for entity in data.get('entities', []):
            sensor_id = entity['sensor_id']
            unique_id = self._get_unique_id(sensor_id)
            state_topic = self._get_state_topic(sensor_id)
            discovery_topic = f"{discovery_prefix}/sensor/{unique_id}/config"
            attributes_topic = f"{state_topic}/attributes"

            discovery_msg = {
                "name": entity['name'],
                "unique_id": unique_id,
                "state_topic": state_topic,
                "state_class": entity['state_class'],
                "unit_of_measurement": entity['unit_of_measurement'],
                "device_class": entity['device_class'],
                "device": self.device_info,
                "json_attributes_topic": attributes_topic,
                "availability_topic": availability_topic,
                "payload_available": "online",
                "payload_not_available": "offline"
            }
            # Include icon if provided by the collector
            if entity.get('icon'):
                discovery_msg["icon"] = entity['icon']

            self.client.publish(discovery_topic, json.dumps(discovery_msg), qos=0, retain=True)
            self.client.publish(state_topic, str(entity['value']), qos=0, retain=True)
            if 'attributes' in entity:
                self.client.publish(attributes_topic, json.dumps(entity['attributes']), qos=0, retain=True)
        return True

    def should_run_collector(self, collector: Dict[str, Any]) -> bool:
        """Check if a collector should run based on its interval.

        A collector that has never run is always due.
        """
        last_run = self.last_run.get(collector['name'])
        if last_run is None:
            return True
        return (datetime.now() - last_run) >= timedelta(seconds=collector['interval'])

    async def collect_and_publish(self):
        """Collect metrics from all collectors that are due and publish them.

        A failure in one collector is logged and does not stop the others.
        """
        for collector in self.collectors:
            if not self.should_run_collector(collector):
                continue
            try:
                data = collector['module'].collect_metrics()
                entity_count = len(data.get('entities', []))
                published = await self.process_collector_data(data)
                # Recorded even when nothing was published, so a disconnected
                # client is retried once per interval instead of every tick.
                self.last_run[collector['name']] = datetime.now()
                if published:
                    print(f"✓ [{self._timestamp()}] SUCCESS: Updated {entity_count} metrics from '{collector['name']}'")
            except Exception as e:
                print(f"✗ [{self._timestamp()}] ERROR: Failed to collect metrics from '{collector['name']}': {e}")

    async def connect(self):
        """Connect to MQTT broker using paho-mqtt and wait briefly for on_connect.

        Exits the process with status 1 if client setup or the connection
        attempt raises. If no connect callback arrives within about five
        seconds a warning is printed and the method returns anyway.
        """
        mqtt_host = self.config['mqtt']['host']
        mqtt_port = self.config['mqtt']['port']
        try:
            self.client = self._setup_mqtt_client()
            self.client.connect(mqtt_host, mqtt_port)
            self.client.loop_start()
            # Wait up to 5 seconds for on_connect to fire
            for _ in range(50):
                if self.connected:
                    break
                await asyncio.sleep(0.1)
            else:
                print(f"⚠️ [{self._timestamp()}] WARNING: No connection confirmation from MQTT broker at {mqtt_host}:{mqtt_port} after 5 seconds")
        except Exception as e:
            print(f"✗ [{self._timestamp()}] FATAL: Cannot connect to MQTT broker at {mqtt_host}:{mqtt_port}: {e}")
            sys.exit(1)

    async def disconnect(self):
        """Disconnect from MQTT broker (paho).

        While still connected, a retained "offline" availability message is
        queued first, so the status topic does not stay "online" after a
        clean shutdown. Does nothing if connect() never created a client.
        """
        if not self.client:
            return
        if self.connected:
            self.client.publish(self._get_availability_topic(), "offline", qos=0, retain=True)
        self.client.loop_stop()
        self.client.disconnect()
async def async_main():
    """Async main function: connect, then run due collectors once per second.

    Runs until interrupted. When started through ``asyncio.run`` a Ctrl-C
    reaches this coroutine as a task cancellation, so ``CancelledError`` is
    handled alongside ``KeyboardInterrupt``. The broker connection is closed
    in every case before returning.
    """
    def now() -> str:
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    print(f"🚀 [{now()}] INFO: Starting system2mqtt...")
    system2mqtt = System2MQTT()
    await system2mqtt.connect()
    try:
        # Main loop - check every second if any collector needs to run.
        # The first iteration performs the initial collection, because a
        # collector that has never run is always due.
        while True:
            await system2mqtt.collect_and_publish()
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        print(f"\n⚠️ [{now()}] INFO: Received shutdown signal, stopping...")
    finally:
        await system2mqtt.disconnect()
        print(f"✓ [{now()}] INFO: Shutdown complete")
if __name__ == "__main__":
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        # asyncio.run may re-raise the interrupt after async_main has finished
        # its cleanup; exit quietly instead of printing a traceback.
        pass