system2mqtt/collectors/cpu_temperature.py

124 lines
4.4 KiB
Python

#!/usr/bin/env python3
import os
import platform
import subprocess
import glob
from typing import Dict, Any, Optional, List, Tuple
import sys
# Default update interval for this collector, in seconds.
# NOTE(review): not referenced anywhere in the visible code; presumably read
# by whatever loads this module -- confirm against the caller.
DEFAULT_INTERVAL = 30  # seconds
def get_temperature_linux_coretemp(
    hwmon_root: str = '/sys/class/hwmon',
) -> List[Tuple[float, str]]:
    """Read CPU temperatures exposed by the Linux ``coretemp`` hwmon driver.

    Every ``hwmon*`` directory under *hwmon_root* whose ``name`` file reads
    ``coretemp`` is scanned, and each ``temp*_input`` file in it is read.
    Raw values are divided by 1000 (hwmon reports millidegrees Celsius).

    The scan is best-effort: a directory or sensor that cannot be read or
    parsed is skipped, and the remaining ones are still collected.

    Args:
        hwmon_root: Directory that holds the ``hwmon*`` entries. Defaults to
            the standard sysfs location; override it when sysfs is mounted
            somewhere else.

    Returns:
        A list of ``(temperature_celsius, label)`` tuples in sorted path
        order. The label is the content of the matching ``temp*_label``
        file, or ``"Package"`` when no such file exists. The list is empty
        when nothing could be read.
    """
    input_suffix = '_input'
    temps: List[Tuple[float, str]] = []

    for hwmon_dir in sorted(glob.glob(os.path.join(hwmon_root, 'hwmon*'))):
        try:
            with open(os.path.join(hwmon_dir, 'name'), 'r') as f:
                driver = f.read().strip()
        except (OSError, ValueError):
            # Unreadable or undecodable name file: not a usable hwmon entry.
            continue
        if driver != 'coretemp':
            continue

        pattern = os.path.join(hwmon_dir, 'temp*' + input_suffix)
        for temp_file in sorted(glob.glob(pattern)):
            try:
                with open(temp_file, 'r') as tf:
                    temp = float(tf.read().strip()) / 1000.0

                # Swap only the trailing suffix; str.replace() would also
                # rewrite an '_input' that happens to occur in the directory.
                label = "Package"
                label_file = temp_file[:-len(input_suffix)] + '_label'
                if os.path.exists(label_file):
                    with open(label_file, 'r') as lf:
                        label = lf.read().strip()
            except (OSError, ValueError):
                # Catch OSError (not just FileNotFoundError) so that one
                # failing sensor does not abort the rest of the scan.
                continue
            temps.append((temp, label))

    return temps
def get_temperature_linux_thermal(
    thermal_root: str = '/sys/class/thermal',
) -> List[Tuple[float, str]]:
    """Read CPU temperatures from Linux thermal zones.

    Every ``thermal_zone*`` directory under *thermal_root* is inspected. A
    zone is used when its ``type`` file contains ``cpu`` (case-insensitive);
    its ``temp`` file is then read and divided by 1000 (thermal zones report
    millidegrees Celsius).

    The scan is best-effort: a zone that cannot be read or parsed is
    skipped, and the remaining zones are still collected.

    Args:
        thermal_root: Directory that holds the ``thermal_zone*`` entries.
            Defaults to the standard sysfs location; override it when sysfs
            is mounted somewhere else.

    Returns:
        A list of ``(temperature_celsius, zone_type)`` tuples in sorted path
        order; empty when no matching zone could be read.
    """
    temps: List[Tuple[float, str]] = []

    pattern = os.path.join(thermal_root, 'thermal_zone*')
    for thermal_dir in sorted(glob.glob(pattern)):
        try:
            with open(os.path.join(thermal_dir, 'type'), 'r') as f:
                zone_type = f.read().strip()
            if 'cpu' not in zone_type.lower():
                continue
            with open(os.path.join(thermal_dir, 'temp'), 'r') as tf:
                temp = float(tf.read().strip()) / 1000.0
        except (OSError, ValueError):
            # Catch OSError (not just FileNotFoundError) so that one
            # failing zone does not abort the rest of the scan.
            continue
        temps.append((temp, zone_type))

    return temps
def get_temperature_freebsd() -> List[Tuple[float, str]]:
    """Read per-CPU temperatures on FreeBSD through ``sysctl``.

    The CPU count is taken from ``hw.ncpu``; for each CPU index the
    ``dev.cpu.<n>.temperature`` node is queried. FreeBSD prints that value
    with a trailing unit letter (for example ``45.0C``), which is stripped
    before the number is parsed.

    The query is best-effort: a CPU whose node is missing or unparsable is
    skipped, and a missing or failing ``sysctl`` binary yields an empty
    list instead of an exception.

    Returns:
        A list of ``(temperature_celsius, "CPU <n>")`` tuples, empty when
        nothing could be read.
    """
    temps: List[Tuple[float, str]] = []

    # OSError covers a missing or non-executable sysctl binary, which
    # subprocess reports as FileNotFoundError/PermissionError rather than
    # as a SubprocessError.
    failures = (subprocess.SubprocessError, OSError, ValueError)

    try:
        raw_count = subprocess.check_output(['sysctl', '-n', 'hw.ncpu'])
        cpu_count = int(raw_count.decode().strip())
    except failures:
        return temps

    for cpu in range(cpu_count):
        try:
            raw = subprocess.check_output(
                ['sysctl', '-n', f'dev.cpu.{cpu}.temperature']
            ).decode().strip()
            # Drop the trailing Celsius unit letter ("45.0C" -> "45.0");
            # float() rejects the suffixed form.
            temp_value = float(raw.rstrip('C').strip())
        except failures:
            continue
        temps.append((temp_value, f'CPU {cpu}'))

    return temps
def collect_metrics() -> Dict[str, Any]:
    """Collect the CPU temperature as a single sensor entity.

    Source selection depends on ``sys.platform``:

    * Linux: the ``coretemp`` hwmon driver first, then CPU thermal zones
      when coretemp yields nothing.
    * FreeBSD: the per-CPU ``sysctl`` temperature nodes.
    * Anything else: no readings.

    Readings whose label contains ``Package`` are preferred. When there are
    none (thermal zones and FreeBSD never use that label), all readings are
    considered instead of being discarded. The hottest candidate is
    reported, so exactly one ``cpu_temperature`` entity is emitted rather
    than several entities sharing the same ``sensor_id``.

    Returns:
        ``{"entities": [...]}`` where the list holds one entity dict, or is
        empty when no temperature could be read. The entity's ``value`` is
        the temperature rounded to one decimal, as a string; its
        ``attributes`` record the source used (``coretemp``,
        ``thermal_zone`` or ``sysctl``) and the label of the chosen reading.
    """
    metrics: Dict[str, Any] = {
        "entities": []
    }

    temps: List[Tuple[float, str]] = []
    source = ''
    if sys.platform.startswith('linux'):
        # Try coretemp first, then fall back to thermal zones.
        temps = get_temperature_linux_coretemp()
        source = 'coretemp'
        if not temps:
            temps = get_temperature_linux_thermal()
            source = 'thermal_zone'
    elif sys.platform.startswith('freebsd'):
        temps = get_temperature_freebsd()
        source = 'sysctl'

    if not temps:
        return metrics

    # Prefer package-level readings, but keep the fallback sources usable:
    # their labels never contain 'Package'.
    package_temps = [(t, label) for t, label in temps if 'Package' in label]
    candidates = package_temps or temps
    temp, label = max(candidates, key=lambda reading: reading[0])

    metrics['entities'].append({
        'sensor_id': 'cpu_temperature',
        'name': 'CPU Temperature',
        'value': str(round(temp, 1)),
        'state_class': 'measurement',
        'unit_of_measurement': '°C',
        'device_class': 'temperature',
        'icon': 'mdi:thermometer',
        'attributes': {
            'friendly_name': 'CPU Temperature',
            'source': source,
            'label': label,
        },
    })
    return metrics
if __name__ == "__main__":
    # Manual smoke test: run one collection pass and dump the raw result.
    print(collect_metrics())