removed all threads and wrote custom display brightness reader

This commit is contained in:
= 2024-08-05 22:41:54 -04:00
parent da718a6fc4
commit f8cee87170
3 changed files with 170 additions and 226 deletions

View File

@ -1,12 +1,15 @@
# Built In Dependencies # Built In Dependencies
import time import time
import math import math
import threading
# Internal Dependencies # Internal Dependencies
from commands import Commands, send_command from commands import Commands, send_command
# External Dependencies # External Dependencies
import numpy as np import numpy as np
import serial # pyserial
from serial.tools import list_ports
# This table represents the 3x3 grid of LEDs to be drawn for each fill ratio # This table represents the 3x3 grid of LEDs to be drawn for each fill ratio
@ -142,6 +145,7 @@ def draw_borders_left(grid, border_value):
grid[8, :] = border_value # Right grid[8, :] = border_value # Right
grid[:, 33] = border_value # Bottom grid[:, 33] = border_value # Bottom
def draw_borders_right(grid, border_value): def draw_borders_right(grid, border_value):
# Fill in the borders # Fill in the borders
# Middle Partition borders # Middle Partition borders
@ -153,6 +157,7 @@ def draw_borders_right(grid, border_value):
grid[8, :] = border_value # Right grid[8, :] = border_value # Right
grid[:, 33] = border_value # Bottom grid[:, 33] = border_value # Bottom
def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True): def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True):
bar_width = 3 bar_width = 3
bar_height = 16 bar_height = 16
@ -168,8 +173,42 @@ def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True)
else: else:
grid[bar_x_offset+i,1:1+pixels_col] = bar_value grid[bar_x_offset+i,1:1+pixels_col] = bar_value
def draw_to_LEDs(s, grid): def draw_to_LEDs(s, grid):
for i in range(grid.shape[0]): for i in range(grid.shape[0]):
params = bytearray([i]) + bytearray(grid[i, :].tolist()) params = bytearray([i]) + bytearray(grid[i, :].tolist())
send_command(s, Commands.StageCol, parameters=params) send_command(s, Commands.StageCol, parameters=params)
send_command(s, Commands.FlushCols) send_command(s, Commands.FlushCols)
def init_device(location = "1-4.2"):
try:
# VID = 1234
# PID = 5678
device_list = list_ports.comports()
for device in device_list:
if device.location == location:
s = serial.Serial(device.device, 115200)
return s
except Exception as e:
print(e)
class DrawingThread(threading.Thread):
def __init__(self, serial_port, input_queue):
super().__init__()
self.daemon = True
self.serial_port = init_device(serial_port)
self.input_queue = input_queue
def run(self):
while True:
try:
grid = self.input_queue.get()
draw_to_LEDs(self.serial_port, grid)
except Exception as e:
print(f"Error in DrawingThread: {e}")
del self.serial_port
time.sleep(1.0)
self.serial_port = init_device(self.serial_port)

View File

@ -3,37 +3,23 @@ import time
import queue import queue
# Internal Dependencies # Internal Dependencies
from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_to_LEDs, draw_bar, draw_borders_right from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_bar, draw_borders_right, DrawingThread
from monitors import CPUMonitorThread, MemoryMonitorThread, BatteryMonitorThread, DiskMonitorThread, NetworkMonitorThread from monitors import CPUMonitor, MemoryMonitor, BatteryMonitor, DiskMonitor, NetworkMonitor, get_monitor_brightness
# External Dependencies # External Dependencies
try: try:
# These are used in later scripts, but imported here to test if missing
import serial # pyserial import serial # pyserial
from serial.tools import list_ports from serial.tools import list_ports
import numpy as np # This is used in a module and we import it here to fetch it if needed import numpy as np
import screen_brightness_control as sbc
except ImportError: except ImportError:
import pip import pip
for dependency in ["numpy", "pyserial", "screen-brightness-control"]: for dependency in ["numpy", "pyserial"]:
pip.main(['install', '--user', dependency]) pip.main(['install', '--user', dependency])
import serial import numpy as np
from serial.tools import list_ports
import screen_brightness_control as sbc
# print(sbc.get_brightness()) # print(sbc.get_brightness())
def init_device(location = "1-4.2"):
try:
# VID = 1234
# PID = 5678
device_list = list_ports.comports()
for device in device_list:
if device.location == location:
s = serial.Serial(device.device, 115200)
return s
except Exception as e:
print(e)
if __name__ == "__main__": if __name__ == "__main__":
# Left LED Matrix location: "1-4.2" # Left LED Matrix location: "1-4.2"
@ -45,65 +31,45 @@ if __name__ == "__main__":
min_foreground_brightness = 30 min_foreground_brightness = 30
max_foreground_brightness = 160 max_foreground_brightness = 160
cpu_queue = queue.Queue(2) cpu_monitor = CPUMonitor()
cpu_monitor = CPUMonitorThread(cpu_queue) memory_monitor = MemoryMonitor()
cpu_monitor.start() battery_monitor = BatteryMonitor()
memory_queue = queue.Queue(2) left_drawing_queue = queue.Queue(2)
memory_monitor = MemoryMonitorThread(memory_queue) left_drawing_thread = DrawingThread("1-4.2", left_drawing_queue)
memory_monitor.start() left_drawing_thread.start()
battery_queue = queue.Queue(2)
battery_monitor = BatteryMonitorThread(battery_queue)
battery_monitor.start()
last_cpu_values = cpu_queue.get()
last_memory_values = memory_queue.get()
last_battery_values = battery_queue.get()
s1 = init_device("1-4.2")
# Set up monitors and serial for right LED Matrix # Set up monitors and serial for right LED Matrix
disk_queue = queue.Queue(2) disk_monitor = DiskMonitor()
disk_monitor = DiskMonitorThread(disk_queue) network_monitor = NetworkMonitor()
disk_monitor.start()
network_queue = queue.Queue(2) right_drawing_queue = queue.Queue(2)
network_monitor = NetworkMonitorThread(network_queue) right_drawing_thread = DrawingThread("1-3.3", right_drawing_queue)
network_monitor.start() right_drawing_thread.start()
last_disk_read, last_disk_write = disk_queue.get()
last_network_upload, last_network_download = network_queue.get()
s2 = init_device("1-3.3")
while True: while True:
try: try:
screen_brightness = sbc.get_brightness()[0] screen_brightness = get_monitor_brightness()
background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness) background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness)
foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness) foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness)
left_start_time = time.time()
# Draw to left LED Matrix # Draw to left LED Matrix
if not cpu_queue.empty(): last_cpu_values = cpu_monitor.get()
last_cpu_values = cpu_queue.get() last_memory_values = memory_monitor.get()
if not memory_queue.empty(): last_battery_values = battery_monitor.get()
last_memory_values = memory_queue.get()
if not battery_queue.empty():
last_battery_values = battery_queue.get()
grid = np.zeros((9,34), dtype = int) grid = np.zeros((9,34), dtype = int)
draw_cpu(grid, last_cpu_values, foreground_value) draw_cpu(grid, last_cpu_values, foreground_value)
draw_memory(grid, last_memory_values, foreground_value) draw_memory(grid, last_memory_values, foreground_value)
draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value) draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value)
draw_borders_left(grid, background_value) draw_borders_left(grid, background_value)
draw_to_LEDs(s1, grid) left_drawing_queue.put(grid)
# Draw to right LED Matrix # Draw to right LED Matrix
if not disk_queue.empty(): last_disk_read, last_disk_write = disk_monitor.get()
last_disk_read, last_disk_write = disk_queue.get() last_network_upload, last_network_download = network_monitor.get()
if not network_queue.empty():
last_network_upload, last_network_download = network_queue.get()
grid = np.zeros((9,34), dtype = int) grid = np.zeros((9,34), dtype = int)
draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read
@ -111,20 +77,10 @@ if __name__ == "__main__":
draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload
draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download
draw_borders_right(grid, background_value) draw_borders_right(grid, background_value)
draw_to_LEDs(s2, grid) right_drawing_queue.put(grid)
except Exception as e: except Exception as e:
import traceback
print(f"Error in main loop: {e}") print(f"Error in main loop: {e}")
try: traceback.print_exc()
del s1
except:
pass
try:
del s2
except:
pass
time.sleep(1.0) time.sleep(1.0)
s1 = init_device("1-4.2") time.sleep(0.1)
s2 = init_device("1-3.3")
time.sleep(0.05)

View File

@ -1,181 +1,130 @@
# Built In Dependencies # Built In Dependencies
import time import time
import psutil import psutil
import threading import os
if os.name == 'nt':
import wmi
else:
raise Exception("This script is not supported on this OS")
class DiskMonitorThread(threading.Thread): class DiskMonitor:
def __init__(self, output_queue, hysterisis_time = 3, update_interval = 0.5): def __init__(self, hysterisis_time = 20):
super().__init__()
self.daemon = True
self.read_usage_history = [] self.read_usage_history = []
self.write_usage_history = [] self.write_usage_history = []
self.history_times = [] self.history_times = []
self.highest_read_rate = 0.00001 self.highest_read_rate = 0.00001
self.highest_write_rate = 0.00001 self.highest_write_rate = 0.00001
self.max_history_size = int(round(hysterisis_time / update_interval)) self.max_history_size = hysterisis_time
self.update_interval = update_interval
self.output_queue = output_queue
def run(self): def get(self):
while True: try:
try: disk_io = psutil.disk_io_counters()
if not self.output_queue.full(): read_usage = disk_io.read_bytes
disk_io = psutil.disk_io_counters() write_usage = disk_io.write_bytes
else: self.read_usage_history.append(read_usage)
print("Disk monitor queue is full") self.write_usage_history.append(write_usage)
read_usage = disk_io.read_bytes self.history_times.append(time.time())
write_usage = disk_io.write_bytes if len(self.read_usage_history) > self.max_history_size:
self.read_usage_history.append(read_usage) self.read_usage_history = self.read_usage_history[-self.max_history_size:]
self.write_usage_history.append(write_usage) self.write_usage_history = self.write_usage_history[-self.max_history_size:]
self.history_times.append(time.time()) self.history_times = self.history_times[-self.max_history_size:]
if len(self.read_usage_history) > self.max_history_size:
self.read_usage_history = self.read_usage_history[-self.max_history_size:]
self.write_usage_history = self.write_usage_history[-self.max_history_size:]
self.history_times = self.history_times[-self.max_history_size:]
if len(self.read_usage_history) == self.max_history_size: read_diff = self.read_usage_history[-1] - self.read_usage_history[0]
read_diff = self.read_usage_history[-1] - self.read_usage_history[0] write_diff = self.write_usage_history[-1] - self.write_usage_history[0]
write_diff = self.write_usage_history[-1] - self.write_usage_history[0] time_diff = self.history_times[-1] - self.history_times[0]
time_diff = self.history_times[-1] - self.history_times[0] read_rate = read_diff / time_diff
read_rate = read_diff / time_diff write_rate = write_diff / time_diff
write_rate = write_diff / time_diff self.highest_read_rate = max(self.highest_read_rate, read_rate)
self.highest_read_rate = max(self.highest_read_rate, read_rate) self.highest_write_rate = max(self.highest_write_rate, write_rate)
self.highest_write_rate = max(self.highest_write_rate, write_rate) read_percent = min(1.0, read_rate / self.highest_read_rate)
read_percent = min(1.0, read_rate / self.highest_read_rate) write_percent = min(1.0, write_rate / self.highest_write_rate)
write_percent = min(1.0, write_rate / self.highest_write_rate) return read_percent, write_percent
self.output_queue.put((read_percent, write_percent)) except Exception as e:
except Exception as e: print(f"Error in DiskMonitor.get(): {e}")
print(f"Error in DiskMonitorThread: {e}") return 0, 0
time.sleep(self.update_interval)
class NetworkMonitorThread(threading.Thread): class NetworkMonitor:
def __init__(self, output_queue, hysterisis_time = 3, update_interval = 0.5): def __init__(self, hysterisis_time = 20):
super().__init__()
self.daemon = True
self.sent_usage_history = [] self.sent_usage_history = []
self.recv_usage_history = [] self.recv_usage_history = []
self.history_times = [] self.history_times = []
self.highest_sent_rate = 0.00001 self.highest_sent_rate = 0.00001
self.highest_recv_rate = 0.00001 self.highest_recv_rate = 0.00001
self.max_history_size = int(round(hysterisis_time / update_interval)) self.max_history_size = hysterisis_time
self.update_interval = update_interval
self.output_queue = output_queue
def run(self): def get(self):
while True: try:
try: net_io = psutil.net_io_counters()
if not self.output_queue.full(): sent_usage = net_io.bytes_sent
net_io = psutil.net_io_counters() recv_usage = net_io.bytes_recv
else: self.sent_usage_history.append(sent_usage)
print("Network monitor queue is full") self.recv_usage_history.append(recv_usage)
sent_usage = net_io.bytes_sent self.history_times.append(time.time())
recv_usage = net_io.bytes_recv if len(self.sent_usage_history) > self.max_history_size:
self.sent_usage_history.append(sent_usage) self.sent_usage_history = self.sent_usage_history[-self.max_history_size:]
self.recv_usage_history.append(recv_usage) self.recv_usage_history = self.recv_usage_history[-self.max_history_size:]
self.history_times.append(time.time()) self.history_times = self.history_times[-self.max_history_size:]
if len(self.sent_usage_history) > self.max_history_size:
self.sent_usage_history = self.sent_usage_history[-self.max_history_size:]
self.recv_usage_history = self.recv_usage_history[-self.max_history_size:]
self.history_times = self.history_times[-self.max_history_size:]
if len(self.sent_usage_history) == self.max_history_size: sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0]
sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0] recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0]
recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0] time_diff = self.history_times[-1] - self.history_times[0]
time_diff = self.history_times[-1] - self.history_times[0] sent_rate = sent_diff / time_diff
sent_rate = sent_diff / time_diff recv_rate = recv_diff / time_diff
recv_rate = recv_diff / time_diff self.highest_sent_rate = max(self.highest_sent_rate, sent_rate)
self.highest_sent_rate = max(self.highest_sent_rate, sent_rate) self.highest_recv_rate = max(self.highest_recv_rate, recv_rate)
self.highest_recv_rate = max(self.highest_recv_rate, recv_rate) sent_percent = min(1.0, sent_rate / self.highest_sent_rate)
sent_percent = min(1.0, sent_rate / self.highest_sent_rate) recv_percent = min(1.0, recv_rate / self.highest_recv_rate)
recv_percent = min(1.0, recv_rate / self.highest_recv_rate) return sent_percent, recv_percent
self.output_queue.put((sent_percent, recv_percent)) except Exception as e:
except Exception as e: print(f"Error in NetworkMonitor.get(): {e}")
print(f"Error in NetworkMonitorThread: {e}") return 0, 0
time.sleep(self.update_interval)
class CPUMonitorThread(threading.Thread): class CPUMonitor:
def __init__(self, output_queue, hysterisis_time = 3, update_interval = 0.5): def __init__(self, hysterisis_time = 10):
super().__init__()
self.daemon = True
self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core
self.cpu_usage_history = [[] for _ in range(self.cpu_count)] self.cpu_usage_history = [[] for _ in range(self.cpu_count)]
self.history_times = [] self.history_times = []
self.max_history_size = int(round(hysterisis_time / update_interval)) self.max_history_size = hysterisis_time
self.update_interval = update_interval
self.output_queue = output_queue
def run(self): def get(self):
while True: try:
try: cpu_usage = psutil.cpu_percent(percpu=True)
if not self.output_queue.full(): for i in range(self.cpu_count):
cpu_usage = psutil.cpu_percent(percpu=True) useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores
else: if useage > 100:
print("CPU monitor queue is full") useage = 100
self.cpu_usage_history[i].append(useage / 100.0)
self.history_times.append(time.time())
if len(self.cpu_usage_history[0]) > self.max_history_size:
for i in range(self.cpu_count): for i in range(self.cpu_count):
useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:]
if useage > 100:
useage = 100
self.cpu_usage_history[i].append(useage / 100.0)
self.history_times.append(time.time())
if len(self.cpu_usage_history[0]) > self.max_history_size:
for i in range(self.cpu_count):
self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:]
self.history_times = self.history_times[-self.max_history_size:] self.history_times = self.history_times[-self.max_history_size:]
if len(self.cpu_usage_history[0]) == self.max_history_size: cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history]
cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] # Somehow cpu_percentages can have values greater than 1 so we clamp them
self.output_queue.put(cpu_percentages) return [min(1.0, cpu_percentage) for cpu_percentage in cpu_percentages]
except Exception as e: except Exception as e:
print(f"Error in CPUMonitorThread: {e}") print(f"Error in CPUMonitor.get(): {e}")
time.sleep(self.update_interval) return [0] * self.cpu_count
class MemoryMonitorThread(threading.Thread): class MemoryMonitor:
def __init__(self, output_queue, hysterisis_time = 5, update_interval = 1.0): @staticmethod
super().__init__() def get():
self.daemon = True return psutil.virtual_memory().percent / 100.0
self.memory_usage_history = []
self.history_times = []
self.max_history_size = int(round(hysterisis_time / update_interval))
self.update_interval = update_interval
self.output_queue = output_queue
def run(self):
while True:
try:
if not self.output_queue.full():
memory_usage = psutil.virtual_memory().percent / 100.0
else:
print("Memory monitor queue is full")
self.memory_usage_history.append(memory_usage)
self.history_times.append(time.time())
if len(self.memory_usage_history) > self.max_history_size:
self.memory_usage_history = self.memory_usage_history[-self.max_history_size:]
self.history_times = self.history_times[-self.max_history_size:]
if len(self.memory_usage_history) == self.max_history_size:
avg_memory_usage = sum(self.memory_usage_history) / self.max_history_size
self.output_queue.put(avg_memory_usage)
except Exception as e:
print(f"Error in MemoryMonitorThread: {e}")
time.sleep(self.update_interval)
class BatteryMonitorThread(threading.Thread): class BatteryMonitor:
def __init__(self, output_queue, update_interval = 1): @staticmethod
super().__init__() def get():
self.daemon = True battery = psutil.sensors_battery()
self.update_interval = update_interval if battery is not None:
self.output_queue = output_queue battery_percentage = battery.percent / 100.0
battery_plugged = battery.power_plugged
return battery_percentage, battery_plugged
def run(self):
while True: def get_monitor_brightness():
try: try:
if not self.output_queue.full(): return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness
battery = psutil.sensors_battery() except:
if battery is not None: return 50
battery_percentage = battery.percent / 100.0
battery_plugged = battery.power_plugged
self.output_queue.put((battery_percentage, battery_plugged))
else:
print("Battery monitor queue is full")
except Exception as e:
print(f"Error in BatteryMonitorThread: {e}")
time.sleep(self.update_interval)