131 lines
5.5 KiB
Python
131 lines
5.5 KiB
Python
# Built In Dependencies
|
|
import time
|
|
import psutil
|
|
import os
|
|
|
|
if os.name == 'nt':
|
|
import wmi
|
|
else:
|
|
raise Exception("This script is not supported on this OS")
|
|
|
|
class DiskMonitor:
|
|
def __init__(self, hysterisis_time = 20):
|
|
self.read_usage_history = []
|
|
self.write_usage_history = []
|
|
self.history_times = []
|
|
self.highest_read_rate = 0.00001
|
|
self.highest_write_rate = 0.00001
|
|
self.max_history_size = hysterisis_time
|
|
|
|
def get(self):
|
|
try:
|
|
disk_io = psutil.disk_io_counters()
|
|
read_usage = disk_io.read_bytes
|
|
write_usage = disk_io.write_bytes
|
|
self.read_usage_history.append(read_usage)
|
|
self.write_usage_history.append(write_usage)
|
|
self.history_times.append(time.time())
|
|
if len(self.read_usage_history) > self.max_history_size:
|
|
self.read_usage_history = self.read_usage_history[-self.max_history_size:]
|
|
self.write_usage_history = self.write_usage_history[-self.max_history_size:]
|
|
self.history_times = self.history_times[-self.max_history_size:]
|
|
|
|
read_diff = self.read_usage_history[-1] - self.read_usage_history[0]
|
|
write_diff = self.write_usage_history[-1] - self.write_usage_history[0]
|
|
time_diff = self.history_times[-1] - self.history_times[0]
|
|
read_rate = read_diff / time_diff
|
|
write_rate = write_diff / time_diff
|
|
self.highest_read_rate = max(self.highest_read_rate, read_rate)
|
|
self.highest_write_rate = max(self.highest_write_rate, write_rate)
|
|
read_percent = min(1.0, read_rate / self.highest_read_rate)
|
|
write_percent = min(1.0, write_rate / self.highest_write_rate)
|
|
return read_percent, write_percent
|
|
except Exception as e:
|
|
print(f"Error in DiskMonitor.get(): {e}")
|
|
return 0, 0
|
|
|
|
class NetworkMonitor:
|
|
def __init__(self, hysterisis_time = 20):
|
|
self.sent_usage_history = []
|
|
self.recv_usage_history = []
|
|
self.history_times = []
|
|
self.highest_sent_rate = 0.00001
|
|
self.highest_recv_rate = 0.00001
|
|
self.max_history_size = hysterisis_time
|
|
|
|
def get(self):
|
|
try:
|
|
net_io = psutil.net_io_counters()
|
|
sent_usage = net_io.bytes_sent
|
|
recv_usage = net_io.bytes_recv
|
|
self.sent_usage_history.append(sent_usage)
|
|
self.recv_usage_history.append(recv_usage)
|
|
self.history_times.append(time.time())
|
|
if len(self.sent_usage_history) > self.max_history_size:
|
|
self.sent_usage_history = self.sent_usage_history[-self.max_history_size:]
|
|
self.recv_usage_history = self.recv_usage_history[-self.max_history_size:]
|
|
self.history_times = self.history_times[-self.max_history_size:]
|
|
|
|
sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0]
|
|
recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0]
|
|
time_diff = self.history_times[-1] - self.history_times[0]
|
|
sent_rate = sent_diff / time_diff
|
|
recv_rate = recv_diff / time_diff
|
|
self.highest_sent_rate = max(self.highest_sent_rate, sent_rate)
|
|
self.highest_recv_rate = max(self.highest_recv_rate, recv_rate)
|
|
sent_percent = min(1.0, sent_rate / self.highest_sent_rate)
|
|
recv_percent = min(1.0, recv_rate / self.highest_recv_rate)
|
|
return sent_percent, recv_percent
|
|
except Exception as e:
|
|
print(f"Error in NetworkMonitor.get(): {e}")
|
|
return 0, 0
|
|
|
|
class CPUMonitor:
|
|
def __init__(self, hysterisis_time = 10):
|
|
self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core
|
|
self.cpu_usage_history = [[] for _ in range(self.cpu_count)]
|
|
self.history_times = []
|
|
self.max_history_size = hysterisis_time
|
|
|
|
def get(self):
|
|
try:
|
|
cpu_usage = psutil.cpu_percent(percpu=True)
|
|
for i in range(self.cpu_count):
|
|
useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores
|
|
if useage > 100:
|
|
useage = 100
|
|
self.cpu_usage_history[i].append(useage / 100.0)
|
|
self.history_times.append(time.time())
|
|
if len(self.cpu_usage_history[0]) > self.max_history_size:
|
|
for i in range(self.cpu_count):
|
|
self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:]
|
|
self.history_times = self.history_times[-self.max_history_size:]
|
|
cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history]
|
|
# Somehow cpu_percentages can have values greater than 1 so we clamp them
|
|
return cpu_percentages
|
|
except Exception as e:
|
|
print(f"Error in CPUMonitor.get(): {e}")
|
|
return [0] * self.cpu_count
|
|
|
|
class MemoryMonitor:
|
|
@staticmethod
|
|
def get():
|
|
return psutil.virtual_memory().percent / 100.0
|
|
|
|
|
|
class BatteryMonitor:
|
|
@staticmethod
|
|
def get():
|
|
battery = psutil.sensors_battery()
|
|
if battery is not None:
|
|
battery_percentage = battery.percent / 100.0
|
|
battery_plugged = battery.power_plugged
|
|
return battery_percentage, battery_plugged
|
|
|
|
|
|
def get_monitor_brightness():
|
|
try:
|
|
return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness
|
|
except:
|
|
return 50
|