From f8cee87170a8a51290da500cd719732fe266b077 Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 5 Aug 2024 22:41:54 -0400 Subject: [PATCH] removed all threads and wrote custom display brightness reader --- drawing.py | 41 ++++++- led_system_monitor.py | 104 +++++------------ monitors.py | 251 +++++++++++++++++------------------------- 3 files changed, 170 insertions(+), 226 deletions(-) diff --git a/drawing.py b/drawing.py index 493518e..148efcc 100644 --- a/drawing.py +++ b/drawing.py @@ -1,12 +1,15 @@ # Built In Dependencies import time import math +import threading # Internal Dependencies from commands import Commands, send_command # External Dependencies import numpy as np +import serial # pyserial +from serial.tools import list_ports # This table represents the 3x3 grid of LEDs to be drawn for each fill ratio @@ -142,6 +145,7 @@ def draw_borders_left(grid, border_value): grid[8, :] = border_value # Right grid[:, 33] = border_value # Bottom + def draw_borders_right(grid, border_value): # Fill in the borders # Middle Partition borders @@ -153,6 +157,7 @@ def draw_borders_right(grid, border_value): grid[8, :] = border_value # Right grid[:, 33] = border_value # Bottom + def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True): bar_width = 3 bar_height = 16 @@ -168,8 +173,42 @@ def draw_bar(grid, bar_ratio, bar_value, bar_x_offset = 1,draw_at_bottom = True) else: grid[bar_x_offset+i,1:1+pixels_col] = bar_value + def draw_to_LEDs(s, grid): for i in range(grid.shape[0]): params = bytearray([i]) + bytearray(grid[i, :].tolist()) send_command(s, Commands.StageCol, parameters=params) - send_command(s, Commands.FlushCols) \ No newline at end of file + send_command(s, Commands.FlushCols) + + +def init_device(location = "1-4.2"): + try: + # VID = 1234 + # PID = 5678 + device_list = list_ports.comports() + for device in device_list: + if device.location == location: + s = serial.Serial(device.device, 115200) + return s + except Exception as e: + print(e) + + +class DrawingThread(threading.Thread): + def __init__(self, serial_port, input_queue): + super().__init__() + self.daemon = True + self.serial_port = init_device(serial_port) + self.input_queue = input_queue + + def run(self): + while True: + try: + grid = self.input_queue.get() + draw_to_LEDs(self.serial_port, grid) + except Exception as e: + print(f"Error in DrawingThread: {e}") + del self.serial_port + time.sleep(1.0) + self.serial_port = init_device(self.serial_port) + diff --git a/led_system_monitor.py b/led_system_monitor.py index ac37720..88ff7df 100644 --- a/led_system_monitor.py +++ b/led_system_monitor.py @@ -3,36 +3,22 @@ import time import queue # Internal Dependencies -from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_to_LEDs, draw_bar, draw_borders_right -from monitors import CPUMonitorThread, MemoryMonitorThread, BatteryMonitorThread, DiskMonitorThread, NetworkMonitorThread +from drawing import draw_cpu, draw_memory, draw_battery, draw_borders_left, draw_bar, draw_borders_right, DrawingThread +from monitors import CPUMonitor, MemoryMonitor, BatteryMonitor, DiskMonitor, NetworkMonitor, get_monitor_brightness # External Dependencies try: + # These are used in later scripts, but imported here to test if missing import serial # pyserial from serial.tools import list_ports - import numpy as np # This is used in a module and we import it here to fetch it if needed - import screen_brightness_control as sbc + import numpy as np except ImportError: import pip - for dependency in ["numpy", "pyserial", "screen-brightness-control"]: + for dependency in ["numpy", "pyserial"]: pip.main(['install', '--user', dependency]) - import serial - from serial.tools import list_ports - import screen_brightness_control as sbc + import numpy as np # print(sbc.get_brightness()) - -def init_device(location = "1-4.2"): - try: - # VID = 1234 - # PID = 5678 - device_list = list_ports.comports() - for device in device_list: - if device.location == location: - s = serial.Serial(device.device, 115200) - return s - except Exception as e: - print(e) if __name__ == "__main__": @@ -45,65 +31,45 @@ if __name__ == "__main__": min_foreground_brightness = 30 max_foreground_brightness = 160 - cpu_queue = queue.Queue(2) - cpu_monitor = CPUMonitorThread(cpu_queue) - cpu_monitor.start() + cpu_monitor = CPUMonitor() + memory_monitor = MemoryMonitor() + battery_monitor = BatteryMonitor() - memory_queue = queue.Queue(2) - memory_monitor = MemoryMonitorThread(memory_queue) - memory_monitor.start() - - battery_queue = queue.Queue(2) - battery_monitor = BatteryMonitorThread(battery_queue) - battery_monitor.start() - - last_cpu_values = cpu_queue.get() - last_memory_values = memory_queue.get() - last_battery_values = battery_queue.get() - - s1 = init_device("1-4.2") + left_drawing_queue = queue.Queue(2) + left_drawing_thread = DrawingThread("1-4.2", left_drawing_queue) + left_drawing_thread.start() # Set up monitors and serial for right LED Matrix - disk_queue = queue.Queue(2) - disk_monitor = DiskMonitorThread(disk_queue) - disk_monitor.start() + disk_monitor = DiskMonitor() + network_monitor = NetworkMonitor() - network_queue = queue.Queue(2) - network_monitor = NetworkMonitorThread(network_queue) - network_monitor.start() - - last_disk_read, last_disk_write = disk_queue.get() - last_network_upload, last_network_download = network_queue.get() - - s2 = init_device("1-3.3") + right_drawing_queue = queue.Queue(2) + right_drawing_thread = DrawingThread("1-3.3", right_drawing_queue) + right_drawing_thread.start() while True: try: - screen_brightness = sbc.get_brightness()[0] + screen_brightness = get_monitor_brightness() background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness) foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness) + left_start_time = time.time() # Draw to left LED Matrix - if not cpu_queue.empty(): - last_cpu_values = cpu_queue.get() - if not memory_queue.empty(): - last_memory_values = memory_queue.get() - if not battery_queue.empty(): - last_battery_values = battery_queue.get() - + last_cpu_values = cpu_monitor.get() + last_memory_values = memory_monitor.get() + last_battery_values = battery_monitor.get() + grid = np.zeros((9,34), dtype = int) draw_cpu(grid, last_cpu_values, foreground_value) draw_memory(grid, last_memory_values, foreground_value) draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value) draw_borders_left(grid, background_value) - draw_to_LEDs(s1, grid) + left_drawing_queue.put(grid) # Draw to right LED Matrix - if not disk_queue.empty(): - last_disk_read, last_disk_write = disk_queue.get() - if not network_queue.empty(): - last_network_upload, last_network_download = network_queue.get() + last_disk_read, last_disk_write = disk_monitor.get() + last_network_upload, last_network_download = network_monitor.get() grid = np.zeros((9,34), dtype = int) draw_bar(grid, last_disk_read, foreground_value, bar_x_offset=1, draw_at_bottom=False) # Read @@ -111,20 +77,10 @@ if __name__ == "__main__": draw_bar(grid, last_network_upload, foreground_value, bar_x_offset=5, draw_at_bottom=False) # Upload draw_bar(grid, last_network_download, foreground_value, bar_x_offset=5, draw_at_bottom=True) # Download draw_borders_right(grid, background_value) - draw_to_LEDs(s2, grid) - - + right_drawing_queue.put(grid) except Exception as e: + import traceback print(f"Error in main loop: {e}") - try: - del s1 - except: - pass - try: - del s2 - except: - pass + traceback.print_exc() time.sleep(1.0) - s1 = init_device("1-4.2") - s2 = init_device("1-3.3") - time.sleep(0.05) \ No newline at end of file + time.sleep(0.1) \ No newline at end of file diff --git a/monitors.py b/monitors.py index b83f83d..88f5d5c 100644 --- a/monitors.py +++ b/monitors.py @@ -1,181 +1,130 @@ # Built In Dependencies import time import psutil -import threading +import os +if os.name == 'nt': + import wmi +else: + raise Exception("This script is not supported on this OS") -class DiskMonitorThread(threading.Thread): - def __init__(self, output_queue, hysterisis_time = 3, update_interval = 0.5): - super().__init__() - self.daemon = True +class DiskMonitor: + def __init__(self, hysterisis_time = 20): self.read_usage_history = [] self.write_usage_history = [] self.history_times = [] self.highest_read_rate = 0.00001 self.highest_write_rate = 0.00001 - self.max_history_size = int(round(hysterisis_time / update_interval)) - self.update_interval = update_interval - self.output_queue = output_queue + self.max_history_size = hysterisis_time - def run(self): - while True: - try: - if not self.output_queue.full(): - disk_io = psutil.disk_io_counters() - else: - print("Disk monitor queue is full") - read_usage = disk_io.read_bytes - write_usage = disk_io.write_bytes - self.read_usage_history.append(read_usage) - self.write_usage_history.append(write_usage) - self.history_times.append(time.time()) - if len(self.read_usage_history) > self.max_history_size: - self.read_usage_history = self.read_usage_history[-self.max_history_size:] - self.write_usage_history = self.write_usage_history[-self.max_history_size:] - self.history_times = self.history_times[-self.max_history_size:] + def get(self): + try: + disk_io = psutil.disk_io_counters() + read_usage = disk_io.read_bytes + write_usage = disk_io.write_bytes + self.read_usage_history.append(read_usage) + self.write_usage_history.append(write_usage) + self.history_times.append(time.time()) + if len(self.read_usage_history) > self.max_history_size: + self.read_usage_history = self.read_usage_history[-self.max_history_size:] + self.write_usage_history = self.write_usage_history[-self.max_history_size:] + self.history_times = self.history_times[-self.max_history_size:] - if len(self.read_usage_history) == self.max_history_size: - read_diff = self.read_usage_history[-1] - self.read_usage_history[0] - write_diff = self.write_usage_history[-1] - self.write_usage_history[0] - time_diff = self.history_times[-1] - self.history_times[0] - read_rate = read_diff / time_diff - write_rate = write_diff / time_diff - self.highest_read_rate = max(self.highest_read_rate, read_rate) - self.highest_write_rate = max(self.highest_write_rate, write_rate) - read_percent = min(1.0, read_rate / self.highest_read_rate) - write_percent = min(1.0, write_rate / self.highest_write_rate) - self.output_queue.put((read_percent, write_percent)) - except Exception as e: - print(f"Error in DiskMonitorThread: {e}") - time.sleep(self.update_interval) + read_diff = self.read_usage_history[-1] - self.read_usage_history[0] + write_diff = self.write_usage_history[-1] - self.write_usage_history[0] + time_diff = self.history_times[-1] - self.history_times[0] + read_rate = read_diff / time_diff + write_rate = write_diff / time_diff + self.highest_read_rate = max(self.highest_read_rate, read_rate) + self.highest_write_rate = max(self.highest_write_rate, write_rate) + read_percent = min(1.0, read_rate / self.highest_read_rate) + write_percent = min(1.0, write_rate / self.highest_write_rate) + return read_percent, write_percent + except Exception as e: + print(f"Error in DiskMonitor.get(): {e}") + return 0, 0 -class NetworkMonitorThread(threading.Thread): - def __init__(self, output_queue, hysterisis_time = 3, update_interval = 0.5): - super().__init__() - self.daemon = True +class NetworkMonitor: + def __init__(self, hysterisis_time = 20): self.sent_usage_history = [] self.recv_usage_history = [] self.history_times = [] self.highest_sent_rate = 0.00001 self.highest_recv_rate = 0.00001 - self.max_history_size = int(round(hysterisis_time / update_interval)) - self.update_interval = update_interval - self.output_queue = output_queue + self.max_history_size = hysterisis_time - def run(self): - while True: - try: - if not self.output_queue.full(): - net_io = psutil.net_io_counters() - else: - print("Network monitor queue is full") - sent_usage = net_io.bytes_sent - recv_usage = net_io.bytes_recv - self.sent_usage_history.append(sent_usage) - self.recv_usage_history.append(recv_usage) - self.history_times.append(time.time()) - if len(self.sent_usage_history) > self.max_history_size: - self.sent_usage_history = self.sent_usage_history[-self.max_history_size:] - self.recv_usage_history = self.recv_usage_history[-self.max_history_size:] - self.history_times = self.history_times[-self.max_history_size:] + def get(self): + try: + net_io = psutil.net_io_counters() + sent_usage = net_io.bytes_sent + recv_usage = net_io.bytes_recv + self.sent_usage_history.append(sent_usage) + self.recv_usage_history.append(recv_usage) + self.history_times.append(time.time()) + if len(self.sent_usage_history) > self.max_history_size: + self.sent_usage_history = self.sent_usage_history[-self.max_history_size:] + self.recv_usage_history = self.recv_usage_history[-self.max_history_size:] + self.history_times = self.history_times[-self.max_history_size:] - if len(self.sent_usage_history) == self.max_history_size: - sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0] - recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0] - time_diff = self.history_times[-1] - self.history_times[0] - sent_rate = sent_diff / time_diff - recv_rate = recv_diff / time_diff - self.highest_sent_rate = max(self.highest_sent_rate, sent_rate) - self.highest_recv_rate = max(self.highest_recv_rate, recv_rate) - sent_percent = min(1.0, sent_rate / self.highest_sent_rate) - recv_percent = min(1.0, recv_rate / self.highest_recv_rate) - self.output_queue.put((sent_percent, recv_percent)) - except Exception as e: - print(f"Error in NetworkMonitorThread: {e}") - time.sleep(self.update_interval) + sent_diff = self.sent_usage_history[-1] - self.sent_usage_history[0] + recv_diff = self.recv_usage_history[-1] - self.recv_usage_history[0] + time_diff = self.history_times[-1] - self.history_times[0] + sent_rate = sent_diff / time_diff + recv_rate = recv_diff / time_diff + self.highest_sent_rate = max(self.highest_sent_rate, sent_rate) + self.highest_recv_rate = max(self.highest_recv_rate, recv_rate) + sent_percent = min(1.0, sent_rate / self.highest_sent_rate) + recv_percent = min(1.0, recv_rate / self.highest_recv_rate) + return sent_percent, recv_percent + except Exception as e: + print(f"Error in NetworkMonitor.get(): {e}") + return 0, 0 -class CPUMonitorThread(threading.Thread): - def __init__(self, output_queue, hysterisis_time = 3, update_interval = 0.5): - super().__init__() - self.daemon = True +class CPUMonitor: + def __init__(self, hysterisis_time = 10): self.cpu_count = psutil.cpu_count() // 2 # 2 logical cores per physical core self.cpu_usage_history = [[] for _ in range(self.cpu_count)] self.history_times = [] - self.max_history_size = int(round(hysterisis_time / update_interval)) - self.update_interval = update_interval - self.output_queue = output_queue + self.max_history_size = hysterisis_time - def run(self): - while True: - try: - if not self.output_queue.full(): - cpu_usage = psutil.cpu_percent(percpu=True) - else: - print("CPU monitor queue is full") + def get(self): + try: + cpu_usage = psutil.cpu_percent(percpu=True) + for i in range(self.cpu_count): + useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores + if useage > 100: + useage = 100 + self.cpu_usage_history[i].append(useage / 100.0) + self.history_times.append(time.time()) + if len(self.cpu_usage_history[0]) > self.max_history_size: for i in range(self.cpu_count): - useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores - if useage > 100: - useage = 100 - self.cpu_usage_history[i].append(useage / 100.0) - self.history_times.append(time.time()) - if len(self.cpu_usage_history[0]) > self.max_history_size: - for i in range(self.cpu_count): - self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:] + self.cpu_usage_history[i] = self.cpu_usage_history[i][-self.max_history_size:] self.history_times = self.history_times[-self.max_history_size:] - if len(self.cpu_usage_history[0]) == self.max_history_size: - cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] - self.output_queue.put(cpu_percentages) - except Exception as e: - print(f"Error in CPUMonitorThread: {e}") - time.sleep(self.update_interval) + cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] + # Somehow cpu_percentages can have values greater than 1 so we clamp them + return [min(1.0, cpu_percentage) for cpu_percentage in cpu_percentages] + except Exception as e: + print(f"Error in CPUMonitor.get(): {e}") + return [0] * self.cpu_count -class MemoryMonitorThread(threading.Thread): - def __init__(self, output_queue, hysterisis_time = 5, update_interval = 1.0): - super().__init__() - self.daemon = True - self.memory_usage_history = [] - self.history_times = [] - self.max_history_size = int(round(hysterisis_time / update_interval)) - self.update_interval = update_interval - self.output_queue = output_queue +class MemoryMonitor: + @staticmethod + def get(): + return psutil.virtual_memory().percent / 100.0 + - def run(self): - while True: - try: - if not self.output_queue.full(): - memory_usage = psutil.virtual_memory().percent / 100.0 - else: - print("Memory monitor queue is full") - self.memory_usage_history.append(memory_usage) - self.history_times.append(time.time()) - if len(self.memory_usage_history) > self.max_history_size: - self.memory_usage_history = self.memory_usage_history[-self.max_history_size:] - self.history_times = self.history_times[-self.max_history_size:] - if len(self.memory_usage_history) == self.max_history_size: - avg_memory_usage = sum(self.memory_usage_history) / self.max_history_size - self.output_queue.put(avg_memory_usage) - except Exception as e: - print(f"Error in MemoryMonitorThread: {e}") - time.sleep(self.update_interval) +class BatteryMonitor: + @staticmethod + def get(): + battery = psutil.sensors_battery() + if battery is not None: + battery_percentage = battery.percent / 100.0 + battery_plugged = battery.power_plugged + return battery_percentage, battery_plugged + -class BatteryMonitorThread(threading.Thread): - def __init__(self, output_queue, update_interval = 1): - super().__init__() - self.daemon = True - self.update_interval = update_interval - self.output_queue = output_queue - - def run(self): - while True: - try: - if not self.output_queue.full(): - battery = psutil.sensors_battery() - if battery is not None: - battery_percentage = battery.percent / 100.0 - battery_plugged = battery.power_plugged - self.output_queue.put((battery_percentage, battery_plugged)) - else: - print("Battery monitor queue is full") - except Exception as e: - print(f"Error in BatteryMonitorThread: {e}") - time.sleep(self.update_interval) \ No newline at end of file +def get_monitor_brightness(): + try: + return wmi.WMI(namespace='wmi').WmiMonitorBrightness()[0].CurrentBrightness + except: + return 50