Added linux support

This commit is contained in:
= 2024-08-17 16:49:21 -04:00
parent ecbb5e8ede
commit f4a8e05b90
5 changed files with 482 additions and 481 deletions

30
.vscode/launch.json vendored
View File

@ -1,16 +1,16 @@
{ {
// Use IntelliSense to learn about possible attributes. // Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes. // Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0", "version": "0.2.0",
"configurations": [ "configurations": [
{ {
"name": "Python: Current File", "name": "Python: Current File",
"type": "python", "type": "python",
"request": "launch", "request": "launch",
"program": "${file}", "program": "${file}",
"console": "integratedTerminal", "console": "integratedTerminal",
"justMyCode": true "justMyCode": true
} }
] ]
} }

View File

@ -1,37 +1,37 @@
from enum import Enum from enum import Enum
# https://github.com/FrameworkComputer/inputmodule-rs/blob/main/commands.md # https://github.com/FrameworkComputer/inputmodule-rs/blob/main/commands.md
# Display is 9x34 wide x tall # Display is 9x34 wide x tall
class Commands(): class Commands():
Brightness = 0x00 Brightness = 0x00
Pattern = 0x01 Pattern = 0x01
Bootloader = 0x02 Bootloader = 0x02
Sleep = 0x03 Sleep = 0x03
GetSleep = 0x03 GetSleep = 0x03
Animate = 0x04 Animate = 0x04
GetAnimate = 0x04 GetAnimate = 0x04
Panic = 0x05 Panic = 0x05
DrawBW = 0x06 DrawBW = 0x06
StageCol = 0x07 StageCol = 0x07
FlushCols = 0x08 FlushCols = 0x08
SetText = 0x09 SetText = 0x09
StartGame = 0x10 StartGame = 0x10
GameCtrl = 0x11 GameCtrl = 0x11
GameStatus = 0x12 GameStatus = 0x12
SetColor = 0x13 SetColor = 0x13
DisplayOn = 0x14 DisplayOn = 0x14
InvertScreen = 0x15 InvertScreen = 0x15
SetPxCol = 0x16 SetPxCol = 0x16
FlushFB = 0x17 FlushFB = 0x17
Version = 0x20 Version = 0x20
def send_command(s, command_id, parameters = None, with_response=False): def send_command(s, command_id, parameters = None, with_response=False):
message = bytearray([0x32, 0xAC, command_id]) message = bytearray([0x32, 0xAC, command_id])
if parameters: if parameters:
message.extend(parameters) message.extend(parameters)
s.write(message) s.write(message)
if with_response: if with_response:
res = s.read(1) res = s.read(1)
return res return res

View File

@ -1,214 +1,214 @@
# Built In Dependencies # Built In Dependencies
import time import time
import math import math
import threading import threading
# Internal Dependencies # Internal Dependencies
from commands import Commands, send_command from commands import Commands, send_command
# External Dependencies # External Dependencies
import numpy as np import numpy as np
import serial # pyserial import serial # pyserial
from serial.tools import list_ports from serial.tools import list_ports
# This table represents the 3x3 grid of LEDs to be drawn for each fill ratio # This table represents the 3x3 grid of LEDs to be drawn for each fill ratio
lookup_table = np.array( lookup_table = np.array(
[ [
[ [
[0, 0, 0], [0, 0, 0],
[0, 0, 0], [0, 0, 0],
[0, 0, 0] [0, 0, 0]
], ],
[ [
[0, 0, 0], [0, 0, 0],
[0, 1, 0], [0, 1, 0],
[0, 0, 0] [0, 0, 0]
], ],
[ [
[0, 1, 0], [0, 1, 0],
[0, 1, 0], [0, 1, 0],
[0, 0, 0] [0, 0, 0]
], ],
[ [
[0, 1, 1], [0, 1, 1],
[0, 1, 0], [0, 1, 0],
[0, 0, 0] [0, 0, 0]
], ],
[ [
[0, 1, 1], [0, 1, 1],
[0, 1, 1], [0, 1, 1],
[0, 0, 0] [0, 0, 0]
], ],
[ [
[0, 1, 1], [0, 1, 1],
[0, 1, 1], [0, 1, 1],
[0, 0, 1] [0, 0, 1]
], ],
[ [
[0, 1, 1], [0, 1, 1],
[0, 1, 1], [0, 1, 1],
[0, 1, 1] [0, 1, 1]
], ],
[ [
[0, 1, 1], [0, 1, 1],
[0, 1, 1], [0, 1, 1],
[1, 1, 1] [1, 1, 1]
], ],
[ [
[0, 1, 1], [0, 1, 1],
[1, 1, 1], [1, 1, 1],
[1, 1, 1] [1, 1, 1]
], ],
[ [
[1, 1, 1], [1, 1, 1],
[1, 1, 1], [1, 1, 1],
[1, 1, 1] [1, 1, 1]
] ]
] ]
) )
lightning_bolt = np.array( [[0,0,0,0,0,0,0], # 0 lightning_bolt = np.array( [[0,0,0,0,0,0,0], # 0
[0,0,0,0,0,1,0], # 1 [0,0,0,0,0,1,0], # 1
[0,0,0,0,1,1,0], # 2 [0,0,0,0,1,1,0], # 2
[0,0,0,1,1,0,0], # 3 [0,0,0,1,1,0,0], # 3
[0,0,1,1,1,0,0], # 4 [0,0,1,1,1,0,0], # 4
[0,1,1,1,0,0,0], # 5 [0,1,1,1,0,0,0], # 5
[0,1,1,1,1,1,0], # 6 [0,1,1,1,1,1,0], # 6
[0,0,0,1,1,1,0], # 7 [0,0,0,1,1,1,0], # 7
[0,0,1,1,1,0,0], # 8 [0,0,1,1,1,0,0], # 8
[0,0,1,1,0,0,0], # 9 [0,0,1,1,0,0,0], # 9
[0,1,1,0,0,0,0], #10 [0,1,1,0,0,0,0], #10
[0,1,0,0,0,0,0], #11 [0,1,0,0,0,0,0], #11
[0,0,0,0,0,0,0]],#12 [0,0,0,0,0,0,0]],#12
dtype=bool).T dtype=bool).T
# Correct table orientation for visual orientation when drawn # Correct table orientation for visual orientation when drawn
for i in range(lookup_table.shape[0]): for i in range(lookup_table.shape[0]):
lookup_table[i] = lookup_table[i].T lookup_table[i] = lookup_table[i].T
def spiral_index(fill_ratio): def spiral_index(fill_ratio):
return int(round(fill_ratio * 9.999999 - 0.5)) return int(round(fill_ratio * 9.999999 - 0.5))
# Takes up 15 rows, 7 columns, starting at 1,1 # Takes up 15 rows, 7 columns, starting at 1,1
def draw_cpu(grid, cpu_values, fill_value): def draw_cpu(grid, cpu_values, fill_value):
for i, v in enumerate(cpu_values): for i, v in enumerate(cpu_values):
column_number = i % 2 column_number = i % 2
row_number = i // 2 row_number = i // 2
fill_grid = lookup_table[spiral_index(v)] fill_grid = lookup_table[spiral_index(v)]
grid[1+column_number*4:4+column_number*4, 1+row_number*4:4+row_number*4] = fill_grid * fill_value grid[1+column_number*4:4+column_number*4, 1+row_number*4:4+row_number*4] = fill_grid * fill_value
# Takes up 2 rows, 7 columns, starting at 17,1 # Takes up 2 rows, 7 columns, starting at 17,1
def draw_memory(grid, memory_ratio, fill_value): def draw_memory(grid, memory_ratio, fill_value):
lit_pixels = 7 * 2 * memory_ratio lit_pixels = 7 * 2 * memory_ratio
pixels_bottom = int(round(lit_pixels / 2)) pixels_bottom = int(round(lit_pixels / 2))
pixels_top = int(round((lit_pixels - 0.49) / 2)) pixels_top = int(round((lit_pixels - 0.49) / 2))
grid[1:1+pixels_top,17] = fill_value grid[1:1+pixels_top,17] = fill_value
grid[1:1+pixels_bottom,18] = fill_value grid[1:1+pixels_bottom,18] = fill_value
# Takes up 13 rows, 7 columns, starting at 21,1 # Takes up 13 rows, 7 columns, starting at 21,1
def draw_battery(grid, battery_ratio, battery_plugged, fill_value, battery_low_thresh = 0.07, battery_low_flash_time = 2, charging_pulse_time = 3): def draw_battery(grid, battery_ratio, battery_plugged, fill_value, battery_low_thresh = 0.07, battery_low_flash_time = 2, charging_pulse_time = 3):
lit_pixels = int(round(13 * 7 * battery_ratio)) lit_pixels = int(round(13 * 7 * battery_ratio))
pixels_base = lit_pixels // 7 pixels_base = lit_pixels // 7
remainder = lit_pixels % 7 remainder = lit_pixels % 7
if battery_ratio <= battery_low_thresh and not battery_plugged: if battery_ratio <= battery_low_thresh and not battery_plugged:
if time.time() % battery_low_flash_time * 2 < battery_low_flash_time: # This will flash the battery indicator if too low if time.time() % battery_low_flash_time * 2 < battery_low_flash_time: # This will flash the battery indicator if too low
return return
for i in range(7): for i in range(7):
pixels_col = pixels_base pixels_col = pixels_base
if i < remainder: if i < remainder:
pixels_col += 1 pixels_col += 1
grid[i+1,33-pixels_col:33] = fill_value grid[i+1,33-pixels_col:33] = fill_value
if battery_plugged: if battery_plugged:
pulse_amount = math.sin(time.time() / charging_pulse_time) pulse_amount = math.sin(time.time() / charging_pulse_time)
grid[1:8,20:33][lightning_bolt] -= np.rint(fill_value + 10 * pulse_amount).astype(int) grid[1:8,20:33][lightning_bolt] -= np.rint(fill_value + 10 * pulse_amount).astype(int)
indices = grid[1:8,20:33] < 0 indices = grid[1:8,20:33] < 0
grid[1:8,20:33][indices] = -grid[1:8,20:33][indices] grid[1:8,20:33][indices] = -grid[1:8,20:33][indices]
def draw_borders_left(grid, border_value): def draw_borders_left(grid, border_value):
# Fill in the borders # Fill in the borders
# Cpu vertical partitions # Cpu vertical partitions
grid[4, :16] = border_value grid[4, :16] = border_value
# Cpu horizontal partitions # Cpu horizontal partitions
grid[:, 4] = border_value grid[:, 4] = border_value
grid[:, 8] = border_value grid[:, 8] = border_value
grid[:, 12] = border_value grid[:, 12] = border_value
grid[:, 16] = border_value grid[:, 16] = border_value
# Memory bottom partition # Memory bottom partition
grid[:, 19] = border_value grid[:, 19] = border_value
# Outer Edge borders # Outer Edge borders
grid[:, 0] = border_value # Top grid[:, 0] = border_value # Top
grid[0, :] = border_value # Left grid[0, :] = border_value # Left
grid[8, :] = border_value # Right grid[8, :] = border_value # Right
grid[:, 33] = border_value # Bottom grid[:, 33] = border_value # Bottom
def draw_borders_right(grid, border_value): def draw_borders_right(grid, border_value):
# Fill in the borders # Fill in the borders
# Middle Partition borders # Middle Partition borders
grid[:, 16] = border_value grid[:, 16] = border_value
grid[4, :] = border_value grid[4, :] = border_value
# Outer Edge borders # Outer Edge borders
grid[:, 0] = border_value # Top grid[:, 0] = border_value # Top
grid[0, :] = border_value # Left grid[0, :] = border_value # Left
grid[8, :] = border_value # Right grid[8, :] = border_value # Right
grid[:, 33] = border_value # Bottom grid[:, 33] = border_value # Bottom
def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True): def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True):
bar_width = 3 bar_width = 3
bar_height = 16 bar_height = 16
lit_pixels = int(round(bar_height * bar_width * bar_ratio)) lit_pixels = int(round(bar_height * bar_width * bar_ratio))
pixels_base = lit_pixels // bar_width pixels_base = lit_pixels // bar_width
remainder = lit_pixels % bar_width remainder = lit_pixels % bar_width
for i in range(bar_width): for i in range(bar_width):
pixels_col = pixels_base pixels_col = pixels_base
if i < remainder: if i < remainder:
pixels_col += 1 pixels_col += 1
if draw_at_bottom: if draw_at_bottom:
grid[bar_x_offset+i,33-pixels_col:33] = bar_value grid[bar_x_offset+i,33-pixels_col:33] = bar_value
else: else:
grid[bar_x_offset+i,1:1+pixels_col] = bar_value grid[bar_x_offset+i,1:1+pixels_col] = bar_value
def draw_to_LEDs(s, grid): def draw_to_LEDs(s, grid):
for i in range(grid.shape[0]): for i in range(grid.shape[0]):
params = bytearray([i]) + bytearray(grid[i, :].tolist()) params = bytearray([i]) + bytearray(grid[i, :].tolist())
send_command(s, Commands.StageCol, parameters=params) send_command(s, Commands.StageCol, parameters=params)
send_command(s, Commands.FlushCols) send_command(s, Commands.FlushCols)
def init_device(location = "1-4.2"): def init_device(location = "1-4.2"):
try: try:
# VID = 1234 # VID = 1234
# PID = 5678 # PID = 5678
device_list = list_ports.comports() device_list = list_ports.comports()
for device in device_list: for device in device_list:
if device.location == location: if device.location and device.location.startswith(location):
s = serial.Serial(device.device, 115200) s = serial.Serial(device.device, 115200)
return s return s
except Exception as e: except Exception as e:
print(e) print(e)
class DrawingThread(threading.Thread): class DrawingThread(threading.Thread):
def __init__(self, serial_port, input_queue): def __init__(self, serial_port, input_queue):
super().__init__() super().__init__()
self.daemon = True self.daemon = True
self.serial_port = init_device(serial_port) self.serial_port = init_device(serial_port)
self.input_queue = input_queue self.input_queue = input_queue
def run(self): def run(self):
while True: while True:
try: try:
grid = self.input_queue.get() grid = self.input_queue.get()
draw_to_LEDs(self.serial_port, grid) draw_to_LEDs(self.serial_port, grid)
except Exception as e: except Exception as e:
print(f"Error in DrawingThread: {e}") print(f"Error in DrawingThread: {e}")
del self.serial_port del self.serial_port
time.sleep(1.0) time.sleep(1.0)
self.serial_port = init_device(self.serial_port) self.serial_port = init_device(self.serial_port)

View File

@ -1,86 +1,86 @@
# Built In Dependencies # Built In Dependencies
import time import time
import queue import queue
# Internal Dependencies # Internal Dependencies
from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_bar, draw_borders_right, DrawingThread from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_bar, draw_borders_right, DrawingThread
from monitors import CPUMonitor, MemoryMonitor, BatteryMonitor, DiskMonitor, NetworkMonitor, get_monitor_brightness from monitors import CPUMonitor, MemoryMonitor, BatteryMonitor, DiskMonitor, NetworkMonitor, get_monitor_brightness
# External Dependencies # External Dependencies
try: try:
# These are used in later scripts, but imported here to test if missing # These are used in later scripts, but imported here to test if missing
import serial # pyserial import serial # pyserial
from serial.tools import list_ports from serial.tools import list_ports
import numpy as np import numpy as np
except ImportError: except ImportError:
import pip import pip
for dependency in ["numpy", "pyserial"]: for dependency in ["numpy", "pyserial"]:
pip.main(['install', '--user', dependency]) pip.main(['install', '--user', dependency])
import numpy as np import numpy as np
# print(sbc.get_brightness()) # print(sbc.get_brightness())
if __name__ == "__main__": if __name__ == "__main__":
# Left LED Matrix location: "1-4.2" # Left LED Matrix location: "1-4.2"
# Right LED Matrix location: "1-3.3" # Right LED Matrix location: "1-3.3"
# Set up monitors and serial for left LED Matrix # Set up monitors and serial for left LED Matrix
min_background_brightness = 8 min_background_brightness = 8
max_background_brightness = 35 max_background_brightness = 35
min_foreground_brightness = 30 min_foreground_brightness = 30
max_foreground_brightness = 160 max_foreground_brightness = 160
cpu_monitor = CPUMonitor() cpu_monitor = CPUMonitor()
memory_monitor = MemoryMonitor() memory_monitor = MemoryMonitor()
battery_monitor = BatteryMonitor() battery_monitor = BatteryMonitor()
left_drawing_queue = queue.Queue(2) left_drawing_queue = queue.Queue(2)
left_drawing_thread = DrawingThread("1-4.2", left_drawing_queue) left_drawing_thread = DrawingThread("1-4.2", left_drawing_queue)
left_drawing_thread.start() left_drawing_thread.start()
# Set up monitors and serial for right LED Matrix # Set up monitors and serial for right LED Matrix
disk_monitor = DiskMonitor() disk_monitor = DiskMonitor()
network_monitor = NetworkMonitor() network_monitor = NetworkMonitor()
right_drawing_queue = queue.Queue(2) right_drawing_queue = queue.Queue(2)
right_drawing_thread = DrawingThread("1-3.3", right_drawing_queue) right_drawing_thread = DrawingThread("1-3.3", right_drawing_queue)
right_drawing_thread.start() right_drawing_thread.start()
while True: while True:
try: try:
screen_brightness = get_monitor_brightness() screen_brightness = get_monitor_brightness()
background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness) background_value = int(screen_brightness * (max_background_brightness - min_background_brightness) + min_background_brightness)
foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness) foreground_value = int(screen_brightness * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness)
left_start_time = time.time() left_start_time = time.time()
# Draw to left LED Matrix # Draw to left LED Matrix
last_cpu_values = cpu_monitor.get() last_cpu_values = cpu_monitor.get()
last_memory_values = memory_monitor.get() last_memory_values = memory_monitor.get()
last_battery_values = battery_monitor.get() last_battery_values = battery_monitor.get()
grid = np.zeros((9,34), dtype = int) grid = np.zeros((9,34), dtype = int)
draw_cpu(grid, last_cpu_values, foreground_value) draw_cpu(grid, last_cpu_values, foreground_value)
draw_memory(grid, last_memory_values, foreground_value) draw_memory(grid, last_memory_values, foreground_value)
draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value) draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value)
draw_borders_left(grid, background_value) draw_borders_left(grid, background_value)
left_drawing_queue.put(grid) left_drawing_queue.put(grid)
# Draw to right LED Matrix # Draw to right LED Matrix
last_disk_read, last_disk_write = disk_monitor.get() last_disk_read, last_disk_write = disk_monitor.get()
last_network_upload, last_network_download = network_monitor.get() last_network_upload, last_network_download = network_monitor.get()
grid = np.zeros((9,34), dtype = int) grid = np.zeros((9,34), dtype = int)
draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read
draw_bar(grid, last_disk_write, foreground_value, bar_x_offset=1, draw_at_bottom=True) # Write draw_bar(grid, last_disk_write, foreground_value, bar_x_offset=1, draw_at_bottom=True) # Write
draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload
draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download
draw_borders_right(grid, background_value) draw_borders_right(grid, background_value)
right_drawing_queue.put(grid) right_drawing_queue.put(grid)
except Exception as e: except Exception as e:
import traceback import traceback
print(f"Error in main loop: {e}") print(f"Error in main loop: {e}")
traceback.print_exc() traceback.print_exc()
time.sleep(1.0) time.sleep(1.0)
time.sleep(0.1) time.sleep(0.1)

View File

@ -1,130 +1,131 @@
# Built In Dependencies # Built In Dependencies
import time import time
import psutil import psutil
import os import os
if os.name == 'nt': if os.name == 'nt':
import wmi import wmi
else:
raise Exception("This script is not supported on this OS") class DiskMonitor:
def __init__(self, hysterisis_time = 20):
class DiskMonitor: self.read_usage_history = [0]
def __init__(self, hysterisis_time = 20): self.write_usage_history = [0]
self.read_usage_history = [] self.history_times = [0]
self.write_usage_history = [] self.highest_read_rate = 0.00001
self.history_times = [] self.highest_write_rate = 0.00001
self.highest_read_rate = 0.00001 self.max_history_size = hysterisis_time
self.highest_write_rate = 0.00001
self.max_history_size = hysterisis_time def get(self):
try:
def get(self): disk_io = psutil.disk_io_counters()
try: read_usage = disk_io.read_bytes
disk_io = psutil.disk_io_counters() write_usage = disk_io.write_bytes
read_usage = disk_io.read_bytes self.read_usage_history.append(read_usage)
write_usage = disk_io.write_bytes self.write_usage_history.append(write_usage)
self.read_usage_history.append(read_usage) self.history_times.append(time.time())
self.write_usage_history.append(write_usage) if len(self.read_usage_history) > self.max_history_size:
self.history_times.append(time.time()) self.read_usage_history = self.read_usage_history[-self.max_history_size:]
if len(self.read_usage_history) > self.max_history_size: self.write_usage_history = self.write_usage_history[-self.max_history_size:]
self.read_usage_history = self.read_usage_history[-self.max_history_size:] self.history_times = self.history_times[-self.max_history_size:]
self.write_usage_history = self.write_usage_history[-self.max_history_size:]
self.history_times = self.history_times[-self.max_history_size:] read_diff = self.read_usage_history[-1] - self.read_usage_history[0]
write_diff = self.write_usage_history[-1] - self.write_usage_history[0]
read_diff = self.read_usage_history[-1] - self.read_usage_history[0] time_diff = self.history_times[-1] - self.history_times[0]
write_diff = self.write_usage_history[-1] - self.write_usage_history[0] read_rate = read_diff / time_diff
time_diff = self.history_times[-1] - self.history_times[0] write_rate = write_diff / time_diff
read_rate = read_diff / time_diff self.highest_read_rate = max(self.highest_read_rate, read_rate)
write_rate = write_diff / time_diff self.highest_write_rate = max(self.highest_write_rate, write_rate)
self.highest_read_rate = max(self.highest_read_rate, read_rate) read_percent = min(1.0, read_rate / self.highest_read_rate)
self.highest_write_rate = max(self.highest_write_rate, write_rate) write_percent = min(1.0, write_rate / self.highest_write_rate)
read_percent = min(1.0, read_rate / self.highest_read_rate) return read_percent, write_percent
write_percent = min(1.0, write_rate / self.highest_write_rate) except Exception as e:
return read_percent, write_percent print(f"Error in DiskMonitor.get(): {e}")
except Exception as e: return 0, 0
print(f"Error in DiskMonitor.get(): {e}")
return 0, 0 class NetworkMonitor:
def __init__(self, hysterisis_time = 20):
class NetworkMonitor: self.sent_usage_history = [0]
def __init__(self, hysterisis_time = 20): self.recv_usage_history = [0]
self.sent_usage_history = [] self.history_times = [0]
self.recv_usage_history = [] self.highest_sent_rate = 0.00001
self.history_times = [] self.highest_recv_rate = 0.00001
self.highest_sent_rate = 0.00001 self.max_history_size = hysterisis_time
self.highest_recv_rate = 0.00001
self.max_history_size = hysterisis_time def get(self):
try:
def get(self): net_io = psutil.net_io_counters()
try: sent_usage = net_io.bytes_sent
net_io = psutil.net_io_counters() recv_usage = net_io.bytes_recv
sent_usage = net_io.bytes_sent self.sent_usage_history.append(sent_usage)
recv_usage = net_io.bytes_recv self.recv_usage_history.append(recv_usage)
self.sent_usage_history.append(sent_usage) self.history_times.append(time.time())
self.recv_usage_history.append(recv_usage) if len(self.sent_usage_history) > self.max_history_size:
self.history_times.append(time.time()) self.sent_usage_history = self.sent_usage_history[-self.max_history_size:]
if len(self.sent_usage_history) > self.max_history_size: self.recv_usage_history = self.recv_usage_history[-self.max_history_size:]
self.sent_usage_history = self.sent_usage_history[-self.max_history_size:] self.history_times = self.history_times[-self.max_history_size:]
self.recv_usage_history = self.recv_usage_history[-self.max_history_size:]
self.history_times = self.history_times[-self.max_history_size:] sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0]
recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0]
sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0] time_diff = self.history_times[-1] - self.history_times[0]
recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0] sent_rate = sent_diff / time_diff
time_diff = self.history_times[-1] - self.history_times[0] recv_rate = recv_diff / time_diff
sent_rate = sent_diff / time_diff self.highest_sent_rate = max(self.highest_sent_rate, sent_rate)
recv_rate = recv_diff / time_diff self.highest_recv_rate = max(self.highest_recv_rate, recv_rate)
self.highest_sent_rate = max(self.highest_sent_rate, sent_rate) sent_percent = min(1.0, sent_rate / self.highest_sent_rate)
self.highest_recv_rate = max(self.highest_recv_rate, recv_rate) recv_percent = min(1.0, recv_rate / self.highest_recv_rate)
sent_percent = min(1.0, sent_rate / self.highest_sent_rate) return sent_percent, recv_percent
recv_percent = min(1.0, recv_rate / self.highest_recv_rate) except Exception as e:
return sent_percent, recv_percent print(f"Error in NetworkMonitor.get(): {e}")
except Exception as e: return 0, 0
print(f"Error in NetworkMonitor.get(): {e}")
return 0, 0 class CPUMonitor:
def __init__(self, hysterisis_time = 10):
class CPUMonitor: self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core
def __init__(self, hysterisis_time = 10): self.cpu_usage_history = [[] for _ in range(self.cpu_count)]
self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core self.history_times = []
self.cpu_usage_history = [[] for _ in range(self.cpu_count)] self.max_history_size = hysterisis_time
self.history_times = []
self.max_history_size = hysterisis_time def get(self):
try:
def get(self): cpu_usage = psutil.cpu_percent(percpu=True)
try: for i in range(self.cpu_count):
cpu_usage = psutil.cpu_percent(percpu=True) useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores
for i in range(self.cpu_count): if useage > 100:
useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores useage = 100
if useage > 100: self.cpu_usage_history[i].append(useage / 100.0)
useage = 100 self.history_times.append(time.time())
self.cpu_usage_history[i].append(useage / 100.0) if len(self.cpu_usage_history[0]) > self.max_history_size:
self.history_times.append(time.time()) for i in range(self.cpu_count):
if len(self.cpu_usage_history[0]) > self.max_history_size: self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:]
for i in range(self.cpu_count): self.history_times = self.history_times[-self.max_history_size:]
self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:] cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history]
self.history_times = self.history_times[-self.max_history_size:] # Somehow cpu_percentages can have values greater than 1 so we clamp them
cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] return cpu_percentages
# Somehow cpu_percentages can have values greater than 1 so we clamp them except Exception as e:
return cpu_percentages print(f"Error in CPUMonitor.get(): {e}")
except Exception as e: return [0] * self.cpu_count
print(f"Error in CPUMonitor.get(): {e}")
return [0] * self.cpu_count class MemoryMonitor:
@staticmethod
class MemoryMonitor: def get():
@staticmethod return psutil.virtual_memory().percent / 100.0
def get():
return psutil.virtual_memory().percent / 100.0
class BatteryMonitor:
@staticmethod
class BatteryMonitor: def get():
@staticmethod battery = psutil.sensors_battery()
def get(): if battery is not None:
battery = psutil.sensors_battery() battery_percentage = battery.percent / 100.0
if battery is not None: battery_plugged = battery.power_plugged
battery_percentage = battery.percent / 100.0 return battery_percentage, battery_plugged
battery_plugged = battery.power_plugged
return battery_percentage, battery_plugged
def get_monitor_brightness():
try:
def get_monitor_brightness(): if os.name == 'nt':
try: return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness / 100.0
return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness else:
except: return int(open('/sys/class/backlight/amdgpu_bl1/brightness', 'r').read()) / 255.0
return 50 except Exception as e:
return 1.0