Added exception handling

This commit is contained in:
= 2024-08-01 23:04:49 -04:00
parent 80fd16cbc6
commit 48c1fda83a
2 changed files with 48 additions and 32 deletions

View File

@ -82,22 +82,25 @@ if __name__ == "__main__":
last_battery_values = battery_queue.get() last_battery_values = battery_queue.get()
while True: while True:
if not cpu_queue.empty(): try:
last_cpu_values = cpu_queue.get() if not cpu_queue.empty():
if not memory_queue.empty(): last_cpu_values = cpu_queue.get()
last_memory_values = memory_queue.get() if not memory_queue.empty():
if not battery_queue.empty(): last_memory_values = memory_queue.get()
last_battery_values = battery_queue.get() if not battery_queue.empty():
last_battery_values = battery_queue.get()
screen_brightness = sbc.get_brightness()[0]
background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness) screen_brightness = sbc.get_brightness()[0]
foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness) background_value = int(screen_brightness / 100 * (max_background_brightness - min_background_brightness) + min_background_brightness)
grid = np.zeros((9,34), dtype = int) foreground_value = int(screen_brightness / 100 * (max_foreground_brightness - min_foreground_brightness) + min_foreground_brightness)
draw_cpu(grid, last_cpu_values, foreground_value) grid = np.zeros((9,34), dtype = int)
draw_memory(grid, last_memory_values, foreground_value) draw_cpu(grid, last_cpu_values, foreground_value)
draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value) draw_memory(grid, last_memory_values, foreground_value)
draw_borders(grid, background_value) draw_battery(grid, last_battery_values[0], last_battery_values[1], foreground_value)
draw_to_LEDs(s, grid) draw_borders(grid, background_value)
draw_to_LEDs(s, grid)
except Exception as e:
print(f"Error in main loop: {e}")
time.sleep(0.05) time.sleep(0.05)

View File

@ -20,8 +20,9 @@ class DiskMonitorThread(threading.Thread):
def run(self): def run(self):
while True: while True:
if not self.output_queue.full(): try:
disk_io = psutil.disk_io_counters() if not self.output_queue.full():
disk_io = psutil.disk_io_counters()
read_usage = disk_io.read_bytes read_usage = disk_io.read_bytes
write_usage = disk_io.write_bytes write_usage = disk_io.write_bytes
self.read_usage_history.append(read_usage) self.read_usage_history.append(read_usage)
@ -43,7 +44,8 @@ class DiskMonitorThread(threading.Thread):
read_percent = min(1.0, read_rate / self.highest_read_rate) read_percent = min(1.0, read_rate / self.highest_read_rate)
write_percent = min(1.0, write_rate / self.highest_write_rate) write_percent = min(1.0, write_rate / self.highest_write_rate)
self.output_queue.put((read_percent, write_percent)) self.output_queue.put((read_percent, write_percent))
except Exception as e:
print(f"Error in DiskMonitorThread: {e}")
time.sleep(self.update_interval) time.sleep(self.update_interval)
class NetworkMonitorThread(threading.Thread): class NetworkMonitorThread(threading.Thread):
@ -61,8 +63,9 @@ class NetworkMonitorThread(threading.Thread):
def run(self): def run(self):
while True: while True:
if not self.output_queue.full(): try:
net_io = psutil.net_io_counters() if not self.output_queue.full():
net_io = psutil.net_io_counters()
sent_usage = net_io.bytes_sent sent_usage = net_io.bytes_sent
recv_usage = net_io.bytes_recv recv_usage = net_io.bytes_recv
self.sent_usage_history.append(sent_usage) self.sent_usage_history.append(sent_usage)
@ -84,7 +87,8 @@ class NetworkMonitorThread(threading.Thread):
sent_percent = min(1.0, sent_rate / self.highest_sent_rate) sent_percent = min(1.0, sent_rate / self.highest_sent_rate)
recv_percent = min(1.0, recv_rate / self.highest_recv_rate) recv_percent = min(1.0, recv_rate / self.highest_recv_rate)
self.output_queue.put((sent_percent, recv_percent)) self.output_queue.put((sent_percent, recv_percent))
except Exception as e:
print(f"Error in NetworkMonitorThread: {e}")
time.sleep(self.update_interval) time.sleep(self.update_interval)
class CPUMonitorThread(threading.Thread): class CPUMonitorThread(threading.Thread):
@ -100,8 +104,9 @@ class CPUMonitorThread(threading.Thread):
def run(self): def run(self):
while True: while True:
if not self.output_queue.full(): try:
cpu_usage = psutil.cpu_percent(percpu=True) if not self.output_queue.full():
cpu_usage = psutil.cpu_percent(percpu=True)
for i in range(self.cpu_count): for i in range(self.cpu_count):
useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores useage = 2 * max(cpu_usage[2*i], cpu_usage[2*i+1]) # Combine logical cores
if useage > 100: if useage > 100:
@ -115,6 +120,8 @@ class CPUMonitorThread(threading.Thread):
if len(self.cpu_usage_history[0]) == self.max_history_size: if len(self.cpu_usage_history[0]) == self.max_history_size:
cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history] cpu_percentages = [sum(core_history) / self.max_history_size for core_history in self.cpu_usage_history]
self.output_queue.put(cpu_percentages) self.output_queue.put(cpu_percentages)
except Exception as e:
print(f"Error in CPUMonitorThread: {e}")
time.sleep(self.update_interval) time.sleep(self.update_interval)
class MemoryMonitorThread(threading.Thread): class MemoryMonitorThread(threading.Thread):
@ -129,8 +136,9 @@ class MemoryMonitorThread(threading.Thread):
def run(self): def run(self):
while True: while True:
if not self.output_queue.full(): try:
memory_usage = psutil.virtual_memory().percent / 100.0 if not self.output_queue.full():
memory_usage = psutil.virtual_memory().percent / 100.0
self.memory_usage_history.append(memory_usage) self.memory_usage_history.append(memory_usage)
self.history_times.append(time.time()) self.history_times.append(time.time())
if len(self.memory_usage_history) > self.max_history_size: if len(self.memory_usage_history) > self.max_history_size:
@ -139,6 +147,8 @@ class MemoryMonitorThread(threading.Thread):
if len(self.memory_usage_history) == self.max_history_size: if len(self.memory_usage_history) == self.max_history_size:
avg_memory_usage = sum(self.memory_usage_history) / self.max_history_size avg_memory_usage = sum(self.memory_usage_history) / self.max_history_size
self.output_queue.put(avg_memory_usage) self.output_queue.put(avg_memory_usage)
except Exception as e:
print(f"Error in MemoryMonitorThread: {e}")
time.sleep(self.update_interval) time.sleep(self.update_interval)
class BatteryMonitorThread(threading.Thread): class BatteryMonitorThread(threading.Thread):
@ -150,12 +160,15 @@ class BatteryMonitorThread(threading.Thread):
def run(self): def run(self):
while True: while True:
if not self.output_queue.full(): try:
battery = psutil.sensors_battery() if not self.output_queue.full():
if battery is not None: battery = psutil.sensors_battery()
battery_percentage = battery.percent / 100.0 if battery is not None:
battery_plugged = battery.power_plugged battery_percentage = battery.percent / 100.0
self.output_queue.put((battery_percentage, battery_plugged)) battery_plugged = battery.power_plugged
self.output_queue.put((battery_percentage, battery_plugged))
except Exception as e:
print(f"Error in BatteryMonitorThread: {e}")
time.sleep(self.update_interval) time.sleep(self.update_interval)
if __name__ == "__main__": if __name__ == "__main__":