From f4a8e05b90757eabd6cd9bd71532227d05384408 Mon Sep 17 00:00:00 2001 From: = <=> Date: Sat, 17 Aug 2024 16:49:21 -0400 Subject: [PATCH] Added linux support --- .vscode/launch.json | 30 +-- commands.py | 74 ++++---- drawing.py | 428 +++++++++++++++++++++--------------------- led_system_monitor.py | 170 ++++++++--------- monitors.py | 261 +++++++++++++------------- 5 files changed, 482 insertions(+), 481 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 306f58e..948a236 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,16 +1,16 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Python: Current File", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal", - "justMyCode": true - } - ] +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": true + } + ] } \ No newline at end of file diff --git a/commands.py b/commands.py index 955f145..8eba206 100644 --- a/commands.py +++ b/commands.py @@ -1,37 +1,37 @@ -from enum import Enum - -# https://github.com/FrameworkComputer/inputmodule-rs/blob/main/commands.md - -# Display is 9x34 wide x tall -class Commands(): - Brightness = 0x00 - Pattern = 0x01 - Bootloader = 0x02 - Sleep = 0x03 - GetSleep = 0x03 - Animate = 0x04 - GetAnimate = 0x04 - Panic = 0x05 - DrawBW = 0x06 - StageCol = 0x07 - FlushCols = 0x08 - SetText = 0x09 - StartGame = 0x10 - GameCtrl = 0x11 - GameStatus = 0x12 - SetColor = 0x13 - DisplayOn = 0x14 - InvertScreen = 0x15 - SetPxCol = 0x16 - FlushFB = 0x17 - Version = 0x20 - - -def send_command(s, command_id, parameters = None, with_response=False): - message = bytearray([0x32, 0xAC, command_id]) - if parameters: - message.extend(parameters) - s.write(message) - if with_response: - res = s.read(1) - return res +from enum import Enum + +# https://github.com/FrameworkComputer/inputmodule-rs/blob/main/commands.md + +# Display is 9x34 wide x tall +class Commands(): + Brightness = 0x00 + Pattern = 0x01 + Bootloader = 0x02 + Sleep = 0x03 + GetSleep = 0x03 + Animate = 0x04 + GetAnimate = 0x04 + Panic = 0x05 + DrawBW = 0x06 + StageCol = 0x07 + FlushCols = 0x08 + SetText = 0x09 + StartGame = 0x10 + GameCtrl = 0x11 + GameStatus = 0x12 + SetColor = 0x13 + DisplayOn = 0x14 + InvertScreen = 0x15 + SetPxCol = 0x16 + FlushFB = 0x17 + Version = 0x20 + + +def send_command(s, command_id, parameters = None, with_response=False): + message = bytearray([0x32, 0xAC, command_id]) + if parameters: + message.extend(parameters) + s.write(message) + if with_response: + res = s.read(1) + return res diff --git a/drawing.py b/drawing.py index 148efcc..c79b861 100644 --- a/drawing.py +++ b/drawing.py @@ -1,214 +1,214 @@ -# Built In Dependencies -import time -import math -import threading - -# Internal Dependencies -from commands import Commands, send_command - -# External Dependencies -import numpy as np -import serial # pyserial -from serial.tools import list_ports - - -# This table represents the 3x3 grid of LEDs to be drawn for each fill ratio -lookup_table = np.array( - [ - [ - [0, 0, 0], - [0, 0, 0], - [0, 0, 0] - ], - [ - [0, 0, 0], - [0, 1, 0], - [0, 0, 0] - ], - [ - [0, 1, 0], - [0, 1, 0], - [0, 0, 0] - ], - [ - [0, 1, 1], - [0, 1, 0], - [0, 0, 0] - ], - [ - [0, 1, 1], - [0, 1, 1], - [0, 0, 0] - ], - [ - [0, 1, 1], - [0, 1, 1], - [0, 0, 1] - ], - [ - [0, 1, 1], - [0, 1, 1], - [0, 1, 1] - ], - [ - [0, 1, 1], - [0, 1, 1], - [1, 1, 1] - ], - [ - [0, 1, 1], - [1, 1, 1], - [1, 1, 1] - ], - [ - [1, 1, 1], - [1, 1, 1], - [1, 1, 1] - ] - ] -) - -lightning_bolt = np.array( [[0,0,0,0,0,0,0], # 0 - [0,0,0,0,0,1,0], # 1 - [0,0,0,0,1,1,0], # 2 - [0,0,0,1,1,0,0], # 3 - [0,0,1,1,1,0,0], # 4 - [0,1,1,1,0,0,0], # 5 - [0,1,1,1,1,1,0], # 6 - [0,0,0,1,1,1,0], # 7 - [0,0,1,1,1,0,0], # 8 - [0,0,1,1,0,0,0], # 9 - [0,1,1,0,0,0,0], #10 - [0,1,0,0,0,0,0], #11 - [0,0,0,0,0,0,0]],#12 - dtype=bool).T - - -# Correct table orientation for visual orientation when drawn -for i in range(lookup_table.shape[0]): - lookup_table[i] = lookup_table[i].T - - -def spiral_index(fill_ratio): - return int(round(fill_ratio * 9.999999 - 0.5)) - -# Takes up 15 rows, 7 columns, starting at 1,1 -def draw_cpu(grid, cpu_values, fill_value): - for i, v in enumerate(cpu_values): - column_number = i % 2 - row_number = i // 2 - fill_grid = lookup_table[spiral_index(v)] - grid[1+column_number*4:4+column_number*4, 1+row_number*4:4+row_number*4] = fill_grid * fill_value - -# Takes up 2 rows, 7 columns, starting at 17,1 -def draw_memory(grid, memory_ratio, fill_value): - lit_pixels = 7 * 2 * memory_ratio - pixels_bottom = int(round(lit_pixels / 2)) - pixels_top = int(round((lit_pixels - 0.49) / 2)) - grid[1:1+pixels_top,17] = fill_value - grid[1:1+pixels_bottom,18] = fill_value - -# Takes up 13 rows, 7 columns, starting at 21,1 -def draw_battery(grid, battery_ratio, battery_plugged, fill_value, battery_low_thresh = 0.07, battery_low_flash_time = 2, charging_pulse_time = 3): - lit_pixels = int(round(13 * 7 * battery_ratio)) - pixels_base = lit_pixels // 7 - remainder = lit_pixels % 7 - if battery_ratio <= battery_low_thresh and not battery_plugged: - if time.time() % battery_low_flash_time * 2 < battery_low_flash_time: # This will flash the battery indicator if too low - return - for i in range(7): - pixels_col = pixels_base - if i < remainder: - pixels_col += 1 - grid[i+1,33-pixels_col:33] = fill_value - if battery_plugged: - pulse_amount = math.sin(time.time() / charging_pulse_time) - grid[1:8,20:33][lightning_bolt] -= np.rint(fill_value + 10 * pulse_amount).astype(int) - indices = grid[1:8,20:33] < 0 - grid[1:8,20:33][indices] = -grid[1:8,20:33][indices] - - -def draw_borders_left(grid, border_value): - # Fill in the borders - # Cpu vertical partitions - grid[4, :16] = border_value - # Cpu horizontal partitions - grid[:, 4] = border_value - grid[:, 8] = border_value - grid[:, 12] = border_value - grid[:, 16] = border_value - # Memory bottom partition - grid[:, 19] = border_value - # Outer Edge borders - grid[:, 0] = border_value # Top - grid[0, :] = border_value # Left - grid[8, :] = border_value # Right - grid[:, 33] = border_value # Bottom - - -def draw_borders_right(grid, border_value): - # Fill in the borders - # Middle Partition borders - grid[:, 16] = border_value - grid[4, :] = border_value - # Outer Edge borders - grid[:, 0] = border_value # Top - grid[0, :] = border_value # Left - grid[8, :] = border_value # Right - grid[:, 33] = border_value # Bottom - - -def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True): - bar_width = 3 - bar_height = 16 - lit_pixels = int(round(bar_height * bar_width * bar_ratio)) - pixels_base = lit_pixels // bar_width - remainder = lit_pixels % bar_width - for i in range(bar_width): - pixels_col = pixels_base - if i < remainder: - pixels_col += 1 - if draw_at_bottom: - grid[bar_x_offset+i,33-pixels_col:33] = bar_value - else: - grid[bar_x_offset+i,1:1+pixels_col] = bar_value - - -def draw_to_LEDs(s, grid): - for i in range(grid.shape[0]): - params = bytearray([i]) + bytearray(grid[i, :].tolist()) - send_command(s, Commands.StageCol, parameters=params) - send_command(s, Commands.FlushCols) - - -def init_device(location = "1-4.2"): - try: - # VID = 1234 - # PID = 5678 - device_list = list_ports.comports() - for device in device_list: - if device.location == location: - s = serial.Serial(device.device, 115200) - return s - except Exception as e: - print(e) - - -class DrawingThread(threading.Thread): - def __init__(self, serial_port, input_queue): - super().__init__() - self.daemon = True - self.serial_port = init_device(serial_port) - self.input_queue = input_queue - - def run(self): - while True: - try: - grid = self.input_queue.get() - draw_to_LEDs(self.serial_port, grid) - except Exception as e: - print(f"Error in DrawingThread: {e}") - del self.serial_port - time.sleep(1.0) - self.serial_port = init_device(self.serial_port) - +# Built In Dependencies +import time +import math +import threading + +# Internal Dependencies +from commands import Commands, send_command + +# External Dependencies +import numpy as np +import serial # pyserial +from serial.tools import list_ports + + +# This table represents the 3x3 grid of LEDs to be drawn for each fill ratio +lookup_table = np.array( + [ + [ + [0, 0, 0], + [0, 0, 0], + [0, 0, 0] + ], + [ + [0, 0, 0], + [0, 1, 0], + [0, 0, 0] + ], + [ + [0, 1, 0], + [0, 1, 0], + [0, 0, 0] + ], + [ + [0, 1, 1], + [0, 1, 0], + [0, 0, 0] + ], + [ + [0, 1, 1], + [0, 1, 1], + [0, 0, 0] + ], + [ + [0, 1, 1], + [0, 1, 1], + [0, 0, 1] + ], + [ + [0, 1, 1], + [0, 1, 1], + [0, 1, 1] + ], + [ + [0, 1, 1], + [0, 1, 1], + [1, 1, 1] + ], + [ + [0, 1, 1], + [1, 1, 1], + [1, 1, 1] + ], + [ + [1, 1, 1], + [1, 1, 1], + [1, 1, 1] + ] + ] +) + +lightning_bolt = np.array( [[0,0,0,0,0,0,0], # 0 + [0,0,0,0,0,1,0], # 1 + [0,0,0,0,1,1,0], # 2 + [0,0,0,1,1,0,0], # 3 + [0,0,1,1,1,0,0], # 4 + [0,1,1,1,0,0,0], # 5 + [0,1,1,1,1,1,0], # 6 + [0,0,0,1,1,1,0], # 7 + [0,0,1,1,1,0,0], # 8 + [0,0,1,1,0,0,0], # 9 + [0,1,1,0,0,0,0], #10 + [0,1,0,0,0,0,0], #11 + [0,0,0,0,0,0,0]],#12 + dtype=bool).T + + +# Correct table orientation for visual orientation when drawn +for i in range(lookup_table.shape[0]): + lookup_table[i] = lookup_table[i].T + + +def spiral_index(fill_ratio): + return int(round(fill_ratio * 9.999999 - 0.5)) + +# Takes up 15 rows, 7 columns, starting at 1,1 +def draw_cpu(grid, cpu_values, fill_value): + for i, v in enumerate(cpu_values): + column_number = i % 2 + row_number = i // 2 + fill_grid = lookup_table[spiral_index(v)] + grid[1+column_number*4:4+column_number*4, 1+row_number*4:4+row_number*4] = fill_grid * fill_value + +# Takes up 2 rows, 7 columns, starting at 17,1 +def draw_memory(grid, memory_ratio, fill_value): + lit_pixels = 7 * 2 * memory_ratio + pixels_bottom = int(round(lit_pixels / 2)) + pixels_top = int(round((lit_pixels - 0.49) / 2)) + grid[1:1+pixels_top,17] = fill_value + grid[1:1+pixels_bottom,18] = fill_value + +# Takes up 13 rows, 7 columns, starting at 21,1 +def draw_battery(grid, battery_ratio, battery_plugged, fill_value, battery_low_thresh = 0.07, battery_low_flash_time = 2, charging_pulse_time = 3): + lit_pixels = int(round(13 * 7 * battery_ratio)) + pixels_base = lit_pixels // 7 + remainder = lit_pixels % 7 + if battery_ratio <= battery_low_thresh and not battery_plugged: + if time.time() % battery_low_flash_time * 2 < battery_low_flash_time: # This will flash the battery indicator if too low + return + for i in range(7): + pixels_col = pixels_base + if i < remainder: + pixels_col += 1 + grid[i+1,33-pixels_col:33] = fill_value + if battery_plugged: + pulse_amount = math.sin(time.time() / charging_pulse_time) + grid[1:8,20:33][lightning_bolt] -= np.rint(fill_value + 10 * pulse_amount).astype(int) + indices = grid[1:8,20:33] < 0 + grid[1:8,20:33][indices] = -grid[1:8,20:33][indices] + + +def draw_borders_left(grid, border_value): + # Fill in the borders + # Cpu vertical partitions + grid[4, :16] = border_value + # Cpu horizontal partitions + grid[:, 4] = border_value + grid[:, 8] = border_value + grid[:, 12] = border_value + grid[:, 16] = border_value + # Memory bottom partition + grid[:, 19] = border_value + # Outer Edge borders + grid[:, 0] = border_value # Top + grid[0, :] = border_value # Left + grid[8, :] = border_value # Right + grid[:, 33] = border_value # Bottom + + +def draw_borders_right(grid, border_value): + # Fill in the borders + # Middle Partition borders + grid[:, 16] = border_value + grid[4, :] = border_value + # Outer Edge borders + grid[:, 0] = border_value # Top + grid[0, :] = border_value # Left + grid[8, :] = border_value # Right + grid[:, 33] = border_value # Bottom + + +def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True): + bar_width = 3 + bar_height = 16 + lit_pixels = int(round(bar_height * bar_width * bar_ratio)) + pixels_base = lit_pixels // bar_width + remainder = lit_pixels % bar_width + for i in range(bar_width): + pixels_col = pixels_base + if i < remainder: + pixels_col += 1 + if draw_at_bottom: + grid[bar_x_offset+i,33-pixels_col:33] = bar_value + else: + grid[bar_x_offset+i,1:1+pixels_col] = bar_value + + +def draw_to_LEDs(s, grid): + for i in range(grid.shape[0]): + params = bytearray([i]) + bytearray(grid[i, :].tolist()) + send_command(s, Commands.StageCol, parameters=params) + send_command(s, Commands.FlushCols) + + +def init_device(location = "1-4.2"): + try: + # VID = 1234 + # PID = 5678 + device_list = list_ports.comports() + for device in device_list: + if device.location and device.location.startswith(location): + s = serial.Serial(device.device, 115200) + return s + except Exception as e: + print(e) + + +class DrawingThread(threading.Thread): + def __init__(self, serial_port, input_queue): + super().__init__() + self.daemon = True + self.serial_port = init_device(serial_port) + self.input_queue = input_queue + + def run(self): + while True: + try: + grid = self.input_queue.get() + draw_to_LEDs(self.serial_port, grid) + except Exception as e: + print(f"Error in DrawingThread: {e}") + del self.serial_port + time.sleep(1.0) + self.serial_port = init_device(self.serial_port) + diff --git a/led_system_monitor.py b/led_system_monitor.py index 88ff7df..be34761 100644 --- a/led_system_monitor.py +++ b/led_system_monitor.py @@ -1,86 +1,86 @@ -# Built In Dependencies -import time -import queue - -# Internal Dependencies -from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_bar, draw_borders_right, DrawingThread -from monitors import CPUMonitor, MemoryMonitor, BatteryMonitor, DiskMonitor, NetworkMonitor, get_monitor_brightness - -# External Dependencies -try: - # These are used in later scripts, but imported here to test if missing - import serial # pyserial - from serial.tools import list_ports - import numpy as np -except ImportError: - import pip - for dependency in ["numpy", "pyserial"]: - pip.main(['install', '--user', dependency]) - import numpy as np - -# print(sbc.get_brightness()) - - -if __name__ == "__main__": - # Left LED Matrix location: "1-4.2" - # Right LED Matrix location: "1-3.3" - - # Set up monitors and serial for left LED Matrix - min_background_brightness = 8 - max_background_brightness = 35 - min_foreground_brightness = 30 - max_foreground_brightness = 160 - - cpu_monitor = CPUMonitor() - memory_monitor = MemoryMonitor() - battery_monitor = BatteryMonitor() - - left_drawing_queue = queue.Queue(2) - left_drawing_thread = DrawingThread("1-4.2", left_drawing_queue) - left_drawing_thread.start() - - - # Set up monitors and serial for right LED Matrix - disk_monitor = DiskMonitor() - network_monitor = NetworkMonitor() - - right_drawing_queue = queue.Queue(2) - right_drawing_thread = DrawingThread("1-3.3", right_drawing_queue) - right_drawing_thread.start() - - while True: - try: - screen_brightness = get_monitor_brightness() - background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness) - foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness) - - left_start_time = time.time() - # Draw to left LED Matrix - last_cpu_values = cpu_monitor.get() - last_memory_values = memory_monitor.get() - last_battery_values = battery_monitor.get() - - grid = np.zeros((9,34), dtype = int) - draw_cpu(grid, last_cpu_values, foreground_value) - draw_memory(grid, last_memory_values, foreground_value) - draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value) - draw_borders_left(grid, background_value) - left_drawing_queue.put(grid) - - # Draw to right LED Matrix - last_disk_read, last_disk_write = disk_monitor.get() - last_network_upload, last_network_download = network_monitor.get() - - grid = np.zeros((9,34), dtype = int) - draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read - draw_bar(grid, last_disk_write, foreground_value, bar_x_offset=1, draw_at_bottom=True) # Write - draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload - draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download - draw_borders_right(grid, background_value) - right_drawing_queue.put(grid) - except Exception as e: - import traceback - print(f"Error in main loop: {e}") - traceback.print_exc() - time.sleep(1.0) +# Built In Dependencies +import time +import queue + +# Internal Dependencies +from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_bar, draw_borders_right, DrawingThread +from monitors import CPUMonitor, MemoryMonitor, BatteryMonitor, DiskMonitor, NetworkMonitor, get_monitor_brightness + +# External Dependencies +try: + # These are used in later scripts, but imported here to test if missing + import serial # pyserial + from serial.tools import list_ports + import numpy as np +except ImportError: + import pip + for dependency in ["numpy", "pyserial"]: + pip.main(['install', '--user', dependency]) + import numpy as np + +# print(sbc.get_brightness()) + + +if __name__ == "__main__": + # Left LED Matrix location: "1-4.2" + # Right LED Matrix location: "1-3.3" + + # Set up monitors and serial for left LED Matrix + min_background_brightness = 8 + max_background_brightness = 35 + min_foreground_brightness = 30 + max_foreground_brightness = 160 + + cpu_monitor = CPUMonitor() + memory_monitor = MemoryMonitor() + battery_monitor = BatteryMonitor() + + left_drawing_queue = queue.Queue(2) + left_drawing_thread = DrawingThread("1-4.2", left_drawing_queue) + left_drawing_thread.start() + + + # Set up monitors and serial for right LED Matrix + disk_monitor = DiskMonitor() + network_monitor = NetworkMonitor() + + right_drawing_queue = queue.Queue(2) + right_drawing_thread = DrawingThread("1-3.3", right_drawing_queue) + right_drawing_thread.start() + + while True: + try: + screen_brightness = get_monitor_brightness() + background_value = int(screen_brightness * (max_background_brightness - min_background_brightness) + min_background_brightness) + foreground_value = int(screen_brightness * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness) + + left_start_time = time.time() + # Draw to left LED Matrix + last_cpu_values = cpu_monitor.get() + last_memory_values = memory_monitor.get() + last_battery_values = battery_monitor.get() + + grid = np.zeros((9,34), dtype = int) + draw_cpu(grid, last_cpu_values, foreground_value) + draw_memory(grid, last_memory_values, foreground_value) + draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value) + draw_borders_left(grid, background_value) + left_drawing_queue.put(grid) + + # Draw to right LED Matrix + last_disk_read, last_disk_write = disk_monitor.get() + last_network_upload, last_network_download = network_monitor.get() + + grid = np.zeros((9,34), dtype = int) + draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read + draw_bar(grid, last_disk_write, foreground_value, bar_x_offset=1, draw_at_bottom=True) # Write + draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload + draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download + draw_borders_right(grid, background_value) + right_drawing_queue.put(grid) + except Exception as e: + import traceback + print(f"Error in main loop: {e}") + traceback.print_exc() + time.sleep(1.0) time.sleep(0.1) \ No newline at end of file diff --git a/monitors.py b/monitors.py index f7e09be..ad24d9c 100644 --- a/monitors.py +++ b/monitors.py @@ -1,130 +1,131 @@ -# Built In Dependencies -import time -import psutil -import os - -if os.name == 'nt': - import wmi -else: - raise Exception("This script is not supported on this OS") - -class DiskMonitor: - def __init__(self, hysterisis_time = 20): - self.read_usage_history = [] - self.write_usage_history = [] - self.history_times = [] - self.highest_read_rate = 0.00001 - self.highest_write_rate = 0.00001 - self.max_history_size = hysterisis_time - - def get(self): - try: - disk_io = psutil.disk_io_counters() - read_usage = disk_io.read_bytes - write_usage = disk_io.write_bytes - self.read_usage_history.append(read_usage) - self.write_usage_history.append(write_usage) - self.history_times.append(time.time()) - if len(self.read_usage_history) > self.max_history_size: - self.read_usage_history = self.read_usage_history[-self.max_history_size:] - self.write_usage_history = self.write_usage_history[-self.max_history_size:] - self.history_times = self.history_times[-self.max_history_size:] - - read_diff = self.read_usage_history[-1] - self.read_usage_history[0] - write_diff = self.write_usage_history[-1] - self.write_usage_history[0] - time_diff = self.history_times[-1] - self.history_times[0] - read_rate = read_diff / time_diff - write_rate = write_diff / time_diff - self.highest_read_rate = max(self.highest_read_rate, read_rate) - self.highest_write_rate = max(self.highest_write_rate, write_rate) - read_percent = min(1.0, read_rate / self.highest_read_rate) - write_percent = min(1.0, write_rate / self.highest_write_rate) - return read_percent, write_percent - except Exception as e: - print(f"Error in DiskMonitor.get(): {e}") - return 0, 0 - -class NetworkMonitor: - def __init__(self, hysterisis_time = 20): - self.sent_usage_history = [] - self.recv_usage_history = [] - self.history_times = [] - self.highest_sent_rate = 0.00001 - self.highest_recv_rate = 0.00001 - self.max_history_size = hysterisis_time - - def get(self): - try: - net_io = psutil.net_io_counters() - sent_usage = net_io.bytes_sent - recv_usage = net_io.bytes_recv - self.sent_usage_history.append(sent_usage) - self.recv_usage_history.append(recv_usage) - self.history_times.append(time.time()) - if len(self.sent_usage_history) > self.max_history_size: - self.sent_usage_history = self.sent_usage_history[-self.max_history_size:] - self.recv_usage_history = self.recv_usage_history[-self.max_history_size:] - self.history_times = self.history_times[-self.max_history_size:] - - sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0] - recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0] - time_diff = self.history_times[-1] - self.history_times[0] - sent_rate = sent_diff / time_diff - recv_rate = recv_diff / time_diff - self.highest_sent_rate = max(self.highest_sent_rate, sent_rate) - self.highest_recv_rate = max(self.highest_recv_rate, recv_rate) - sent_percent = min(1.0, sent_rate / self.highest_sent_rate) - recv_percent = min(1.0, recv_rate / self.highest_recv_rate) - return sent_percent, recv_percent - except Exception as e: - print(f"Error in NetworkMonitor.get(): {e}") - return 0, 0 - -class CPUMonitor: - def __init__(self, hysterisis_time = 10): - self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core - self.cpu_usage_history = [[] for _ in range(self.cpu_count)] - self.history_times = [] - self.max_history_size = hysterisis_time - - def get(self): - try: - cpu_usage = psutil.cpu_percent(percpu=True) - for i in range(self.cpu_count): - useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores - if useage > 100: - useage = 100 - self.cpu_usage_history[i].append(useage / 100.0) - self.history_times.append(time.time()) - if len(self.cpu_usage_history[0]) > self.max_history_size: - for i in range(self.cpu_count): - self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:] - self.history_times = self.history_times[-self.max_history_size:] - cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] - # Somehow cpu_percentages can have values greater than 1 so we clamp them - return cpu_percentages - except Exception as e: - print(f"Error in CPUMonitor.get(): {e}") - return [0] * self.cpu_count - -class MemoryMonitor: - @staticmethod - def get(): - return psutil.virtual_memory().percent / 100.0 - - -class BatteryMonitor: - @staticmethod - def get(): - battery = psutil.sensors_battery() - if battery is not None: - battery_percentage = battery.percent / 100.0 - battery_plugged = battery.power_plugged - return battery_percentage, battery_plugged - - -def get_monitor_brightness(): - try: - return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness - except: - return 50 +# Built In Dependencies +import time +import psutil +import os + +if os.name == 'nt': + import wmi + +class DiskMonitor: + def __init__(self, hysterisis_time = 20): + self.read_usage_history = [0] + self.write_usage_history = [0] + self.history_times = [0] + self.highest_read_rate = 0.00001 + self.highest_write_rate = 0.00001 + self.max_history_size = hysterisis_time + + def get(self): + try: + disk_io = psutil.disk_io_counters() + read_usage = disk_io.read_bytes + write_usage = disk_io.write_bytes + self.read_usage_history.append(read_usage) + self.write_usage_history.append(write_usage) + self.history_times.append(time.time()) + if len(self.read_usage_history) > self.max_history_size: + self.read_usage_history = self.read_usage_history[-self.max_history_size:] + self.write_usage_history = self.write_usage_history[-self.max_history_size:] + self.history_times = self.history_times[-self.max_history_size:] + + read_diff = self.read_usage_history[-1] - self.read_usage_history[0] + write_diff = self.write_usage_history[-1] - self.write_usage_history[0] + time_diff = self.history_times[-1] - self.history_times[0] + read_rate = read_diff / time_diff + write_rate = write_diff / time_diff + self.highest_read_rate = max(self.highest_read_rate, read_rate) + self.highest_write_rate = max(self.highest_write_rate, write_rate) + read_percent = min(1.0, read_rate / self.highest_read_rate) + write_percent = min(1.0, write_rate / self.highest_write_rate) + return read_percent, write_percent + except Exception as e: + print(f"Error in DiskMonitor.get(): {e}") + return 0, 0 + +class NetworkMonitor: + def __init__(self, hysterisis_time = 20): + self.sent_usage_history = [0] + self.recv_usage_history = [0] + self.history_times = [0] + self.highest_sent_rate = 0.00001 + self.highest_recv_rate = 0.00001 + self.max_history_size = hysterisis_time + + def get(self): + try: + net_io = psutil.net_io_counters() + sent_usage = net_io.bytes_sent + recv_usage = net_io.bytes_recv + self.sent_usage_history.append(sent_usage) + self.recv_usage_history.append(recv_usage) + self.history_times.append(time.time()) + if len(self.sent_usage_history) > self.max_history_size: + self.sent_usage_history = self.sent_usage_history[-self.max_history_size:] + self.recv_usage_history = self.recv_usage_history[-self.max_history_size:] + self.history_times = self.history_times[-self.max_history_size:] + + sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0] + recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0] + time_diff = self.history_times[-1] - self.history_times[0] + sent_rate = sent_diff / time_diff + recv_rate = recv_diff / time_diff + self.highest_sent_rate = max(self.highest_sent_rate, sent_rate) + self.highest_recv_rate = max(self.highest_recv_rate, recv_rate) + sent_percent = min(1.0, sent_rate / self.highest_sent_rate) + recv_percent = min(1.0, recv_rate / self.highest_recv_rate) + return sent_percent, recv_percent + except Exception as e: + print(f"Error in NetworkMonitor.get(): {e}") + return 0, 0 + +class CPUMonitor: + def __init__(self, hysterisis_time = 10): + self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core + self.cpu_usage_history = [[] for _ in range(self.cpu_count)] + self.history_times = [] + self.max_history_size = hysterisis_time + + def get(self): + try: + cpu_usage = psutil.cpu_percent(percpu=True) + for i in range(self.cpu_count): + useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores + if useage > 100: + useage = 100 + self.cpu_usage_history[i].append(useage / 100.0) + self.history_times.append(time.time()) + if len(self.cpu_usage_history[0]) > self.max_history_size: + for i in range(self.cpu_count): + self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:] + self.history_times = self.history_times[-self.max_history_size:] + cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] + # Somehow cpu_percentages can have values greater than 1 so we clamp them + return cpu_percentages + except Exception as e: + print(f"Error in CPUMonitor.get(): {e}") + return [0] * self.cpu_count + +class MemoryMonitor: + @staticmethod + def get(): + return psutil.virtual_memory().percent / 100.0 + + +class BatteryMonitor: + @staticmethod + def get(): + battery = psutil.sensors_battery() + if battery is not None: + battery_percentage = battery.percent / 100.0 + battery_plugged = battery.power_plugged + return battery_percentage, battery_plugged + + +def get_monitor_brightness(): + try: + if os.name == 'nt': + return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness / 100.0 + else: + return int(open('/sys/class/backlight/amdgpu_bl1/brightness', 'r').read()) / 255.0 + except Exception as e: + return 1.0