# Written by Jeremy Karst 2025
# This software is licensed under the MIT License with Additional Terms.
# See LICENSE.md for more information.

# Built-in Dependencies
import os
import sqlite3
import time
import threading
import datetime
import io
import json
import logging
from logging.handlers import RotatingFileHandler
from collections import deque
from threading import Timer
import multiprocessing
from multiprocessing import Pool

# External Dependencies
import cherrypy
from cherrypy.lib import file_generator
from fontTools import ttLib
import geocoder

# Local Dependencies
from sha_sign import hash_text, generate_signature_image


def open_db(path):
    """Open the signatures database, creating the file and table on first use.

    Args:
        path: Filesystem path of the SQLite database file.

    Returns:
        A ``(connection, lock)`` tuple. The connection is opened with
        ``check_same_thread=False`` so it can be shared between threads; the
        lock is a new ``threading.Lock`` that ``run_server`` holds while
        writing.
    """
    db_lock = threading.Lock()
    if not os.path.exists(path):
        # NOTE(review): only this first-run branch passes autocommit=False, so
        # the connection's transaction mode differs between the run that
        # creates the file and every later run. Left as-is to avoid changing
        # behaviour; confirm which mode is actually intended.
        conn = sqlite3.connect(path, check_same_thread=False, cached_statements=0, autocommit=False)
        conn.execute(
            "CREATE TABLE signatures (hash TEXT PRIMARY KEY, name TEXT, time INTEGER, "
            "timezone INTEGER, font TEXT, invert BOOLEAN, identity TEXT)"
        )
        conn.commit()
    else:
        conn = sqlite3.connect(path, check_same_thread=False, cached_statements=0)
    return conn, db_lock


def run_server(host_ip, host_port, host_prefix, static_dir, database_connection, db_lock, font_data, production=False):
    """Configure CherryPy and serve the signature site until the engine exits.

    Args:
        host_ip: Address the HTTP server binds to.
        host_port: Port the HTTP server binds to.
        host_prefix: Prefix used to build redirect targets and the URL encoded
            in each signature's QR code.
        static_dir: Directory holding the static site, the HTML pages and the
            ``fonts`` sub-directory.
        database_connection: Shared sqlite3 connection (see ``open_db``).
        db_lock: Lock held while writing through ``database_connection``.
        font_data: Mapping of font display name to ``.ttf`` file name.
        production: When true, CherryPy runs in its ``production`` environment
            and the HTML pages are served from the copies read at start-up.
    """
    logger = logging.getLogger(__name__)
    static_dir = os.path.abspath(static_dir)

    # Create process pool for signature generation. Pool() rejects a size of
    # zero, which `cpu_count() // 2` yields on a single-core host.
    process_pool = Pool(processes=max(1, multiprocessing.cpu_count() // 2))

    # Define web server
    root_server_config = {
        '/': {
            'tools.staticdir.on': True,
            'tools.staticdir.dir': static_dir,
            'tools.staticdir.index': 'index.html',
        },
        '/images': {
            'tools.caching.on': True,
            'tools.caching.delay': 60,
        },
    }
    global_server_config = {
        'server.socket_host': host_ip,
        'server.socket_port': host_port,
        'server.max_request_body_size': 10*1024,
        'server.socket_timeout': 10,
        'response.timeout': 30,
        'tools.gzip.on': True,
        'tools.gzip.mime_types': ['text/*', 'application/*'],
        'tools.encode.text_only': False,
    }
    if production:
        global_server_config['environment'] = 'production'
    cherrypy.config.update(global_server_config)

    def error_page_404(status, message, traceback, version):
        """Return the plain-text body used for every 404 response."""
        return "Error 404: Page Not Found!"

    cherrypy.config.update({'error_page.404': error_page_404})

    def read_page(filename):
        """Return the text of an HTML page stored in the static directory."""
        with open(os.path.join(static_dir, filename), "r", encoding="utf-8") as f:
            return f.read()

    class RateLimiter:
        """In-memory sliding-window request limiter keyed by client IP."""

        def __init__(self, requests_per_interval=20, limit_interval=120, cleanup_interval=300):
            # Raised explicitly rather than asserted: asserts vanish under -O.
            if cleanup_interval <= limit_interval:
                raise ValueError("Cleanup interval must be greater than limit interval")
            # Maximum requests allowed per window (the attribute name is historical).
            self.requests_per_minute = requests_per_interval
            self.window = limit_interval  # seconds
            self.cleanup_interval = cleanup_interval  # seconds between cleanups
            self.requests = {}  # ip -> deque of timestamps
            # Guards self.requests: it is touched by the request threads and
            # by the cleanup timer thread.
            self._lock = threading.Lock()
            self.start_cleanup_timer()

        def start_cleanup_timer(self):
            """Start periodic cleanup of old IP records"""
            timer = Timer(self.cleanup_interval, self._cleanup_old_records)
            # Daemon thread, so a pending cleanup never keeps the process
            # alive after the server has stopped.
            timer.daemon = True
            timer.start()

        def _cleanup_old_records(self):
            """Remove IPs that haven't made requests in the last window"""
            try:
                current_time = time.time()
                with self._lock:
                    # Collect first, delete after: a dict must not change
                    # size while it is being iterated.
                    to_remove = [
                        ip for ip, timestamps in self.requests.items()
                        if not timestamps or current_time - timestamps[-1] > self.window
                    ]
                    for ip in to_remove:
                        del self.requests[ip]
            finally:
                # Re-arm even if this pass failed, otherwise cleanup stops forever.
                self.start_cleanup_timer()

        def is_rate_limited(self, ip):
            """Record a request from *ip* and report whether it exceeds the limit.

            Requests from ``127.0.0.1`` are never limited. A request that is
            rejected is not recorded.
            """
            if ip == '127.0.0.1':
                return False

            current_time = time.time()
            with self._lock:
                # Initialize or get request deque for this IP
                timestamps = self.requests.get(ip)
                if timestamps is None:
                    timestamps = deque(maxlen=self.requests_per_minute)
                    self.requests[ip] = timestamps

                # Remove timestamps outside the current window
                while timestamps and current_time - timestamps[0] > self.window:
                    timestamps.popleft()

                # Check if rate limited
                if len(timestamps) >= self.requests_per_minute:
                    return True

                # Add new request timestamp
                timestamps.append(current_time)
            return False

    class RootServer(object):
        """CherryPy handler tree for registering, verifying and rendering signatures."""

        rate_limiter = RateLimiter()
        # Pages are read once at start-up; production serves these copies.
        verification_html = read_page("verification.html")
        howto_html = read_page("howto.html")
        privacy_html = read_page("privacy.html")

        @staticmethod
        def _get_ip(request):
            """Return the client IP, preferring proxy-supplied headers.

            NOTE(review): both headers are client-controlled unless a trusted
            proxy overwrites them; confirm the deployment always sits behind one.
            """
            if 'Cf-Connecting-Ip' in request.headers:
                ip = request.headers['Cf-Connecting-Ip']
                logger.debug("Using Cloudflare IP: %s", ip)
            elif 'X-Forwarded-For' in request.headers:
                forwarded = request.headers['X-Forwarded-For']
                logger.debug("Using X-Forwarded-For IP: %s", forwarded)
                # The header may carry a comma-separated proxy chain; use only
                # the first entry so the limiter key and the geolocation
                # lookup receive a single address.
                ip = forwarded.split(',')[0].strip()
            else:
                logger.debug("Using Remote IP: %s", request.remote.ip)
                ip = request.remote.ip
            return ip

        def _client_ip_or_429(self):
            """Return the caller's IP, raising HTTP 429 when it is rate limited."""
            ip = self._get_ip(cherrypy.request)
            if self.rate_limiter.is_rate_limited(ip):
                raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
            return ip

        @staticmethod
        def _require_valid_id(hash_id):
            """Raise HTTP 400 unless *hash_id* is 1-64 characters of encodable text."""
            if len(hash_id) == 0 or len(hash_id) > 64:
                raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
            try:
                hash_id.encode('utf-8')
            except UnicodeEncodeError:
                raise cherrypy.HTTPError(400, "Bad Request, invalid signature id") from None

        @staticmethod
        def _lookup_location(ip):
            """Best-effort geolocation of *ip*.

            Returns:
                ``(latlon, address, provider)``. Any lookup failure, and the
                localhost address, yield the placeholder values.
            """
            fallback = ([0.0, 0.0], 'None Found', 'None Found')
            if ip == '127.0.0.1':
                return fallback
            try:
                location = geocoder.ip(ip)
                if location.lat is None or location.lng is None:
                    latlon = [0.0, 0.0]
                else:
                    latlon = [location.lat, location.lng]
                provider = 'None Found' if location.org is None else location.org
                # Treat a missing address like an empty one, as is done for provider.
                address = location.address or 'None Found'
            except Exception as e:
                # Deliberately broad: a lookup problem must never block registration.
                logger.debug("Location lookup failed for %s: %s", ip, e)
                return fallback
            return latlon, address, provider

        def _page(self, filename, cached):
            """Return *cached* in production, otherwise re-read *filename* from disk."""
            if production:
                return cached
            # If we are debugging, read the newest file from disk
            return read_page(filename)

        @cherrypy.expose
        def register(self, name: str, font: str, timezone: str = '0', invert: str = 'false'):
            """Create a signature record and return its hash id.

            Args:
                name: Signer name, 1-255 characters and not only whitespace.
                font: Display name of one of the available fonts.
                timezone: UTC offset in minutes, between -720 and 840.
                invert: ``'true'`` or ``'false'`` (case-insensitive).

            Raises:
                cherrypy.HTTPError: 429 when rate limited, 400 on invalid
                    input, 500 when hashing or the database insert fails.
            """
            request = cherrypy.request
            ip = self._client_ip_or_429()

            # Validate inputs. Whitespace-only names are rejected as well:
            # they carry no signature text.
            if not name.strip() or len(name) > 255:
                raise cherrypy.HTTPError(400, "Bad Request, invalid signature name")
            # Ensure name can be utf-8 encoded
            try:
                name.encode('utf-8')
            except UnicodeEncodeError:
                raise cherrypy.HTTPError(400, "Bad Request, invalid signature name") from None
            if font not in font_data:
                raise cherrypy.HTTPError(400, "Bad Request, invalid font name")
            try:
                timezone = int(timezone)
            except ValueError:
                raise cherrypy.HTTPError(400, "Bad Request, invalid timezone") from None
            if timezone < -12*60 or timezone > 14*60:
                raise cherrypy.HTTPError(400, "Bad Request, invalid timezone")
            invert_text = invert.lower()
            if invert_text not in ('true', 'false'):
                raise cherrypy.HTTPError(400, "Bad Request, invalid invert value")
            invert = invert_text == 'true'

            # Capitalize the first letter of each space-separated word. Empty
            # parts (from repeated, leading or trailing spaces) are kept
            # untouched; `part[:1]` is safe on them where `part[0]` is not.
            name = ' '.join(
                part[:1].upper() + part[1:] if part[:1].islower() else part
                for part in name.split(' ')
            )

            # Get user agent and location for later signature verification
            user_agent = request.headers.get('User-Agent', 'None Found')
            latlon, address, provider = self._lookup_location(ip)

            # Get current unix time for signature timestamp
            unix_time = int(time.time())

            # Generate signature hash
            hash_id = hash_text(f"{name}{unix_time}")
            # Sanity check hash length, in theory there is a small chance the hash will be short since we prune '=' padding
            if len(hash_id) < 8:
                raise cherrypy.HTTPError(500, "Internal Server Error, hash generation failed")

            identity = json.dumps({
                'ip': ip,
                'useragent': user_agent,
                'latlon': latlon,
                'address': address,
                'provider': provider,
            })

            # Insert signature into database
            with db_lock:
                try:
                    database_connection.execute(
                        "INSERT INTO signatures (hash, name, time, timezone, font, invert, identity) VALUES (?, ?, ?, ?, ?, ?, ?)",
                        (hash_id, name, unix_time, timezone, font, invert, identity),
                    )
                    database_connection.commit()
                except Exception as e:
                    logger.error("Error inserting signature into database: %s", e)
                    # Do not leave a failed transaction open on the shared connection.
                    try:
                        database_connection.rollback()
                    except sqlite3.Error:
                        logger.exception("Rollback after failed insert also failed")
                    raise cherrypy.HTTPError(500, "Internal Server Error, failed to insert signature into database") from e

            return hash_id

        @cherrypy.expose
        def verify(self, *args):
            """Return the stored record for one signature id as a JSON string.

            Raises:
                cherrypy.HTTPError: 429 when rate limited, 400 when the path
                    does not contain exactly one valid id, 404 when the id is
                    unknown.
            """
            self._client_ip_or_429()

            if len(args) != 1:
                # There is no id to look up or redirect to. The previous
                # fallback interpolated an unassigned local here and failed
                # with a server error.
                raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")

            hash_id = args[0]
            self._require_valid_id(hash_id)

            # Get signature data from database
            cursor = database_connection.cursor()
            cursor.execute(
                "SELECT name, time, timezone, font, invert, identity FROM signatures WHERE hash = ?",
                (hash_id,),
            )
            result = cursor.fetchone()
            if result is None:
                raise cherrypy.HTTPError(404, "Not Found")

            name, timestamp, timezone, font, invert, identity = result
            return json.dumps({
                "name": name,
                "time": timestamp,
                "timezone": timezone,
                "font": font,
                "invert": invert,
                "identity": json.loads(identity),
            })

        @cherrypy.expose
        def v(self, *args):
            """Serve the verification page for one id, else redirect to the host prefix."""
            self._client_ip_or_429()

            if len(args) != 1:
                raise cherrypy.HTTPRedirect(f"{host_prefix}")

            self._require_valid_id(args[0])
            return self._page("verification.html", self.verification_html)

        @cherrypy.expose
        def fonts(self):
            """Return the font name -> file name mapping as a JSON string."""
            return json.dumps(font_data)

        @cherrypy.expose
        def images(self, *args):
            """Render the PNG image of one stored signature.

            The path segment is the signature id, optionally followed by ``.png``.

            Raises:
                cherrypy.HTTPError: 429 when rate limited, 400 on an invalid
                    id, 404 when the id is unknown or the path is malformed,
                    500 when the image cannot be generated.
            """
            self._client_ip_or_429()

            if len(args) != 1:
                raise cherrypy.HTTPError(404, "Not Found")

            hash_id = args[0].removesuffix('.png')
            self._require_valid_id(hash_id)

            qr_url = f"{host_prefix}/v/{hash_id}"

            cursor = database_connection.cursor()
            cursor.execute(
                "SELECT name, time, timezone, font, invert FROM signatures WHERE hash = ?",
                (hash_id,),
            )
            result = cursor.fetchone()
            if result is None:
                raise cherrypy.HTTPError(404, "Not Found")

            name, timestamp, tz_minutes, font, invert = result
            tzinfo = datetime.timezone(datetime.timedelta(minutes=tz_minutes))

            # A stored font may no longer be available; report that cleanly
            # instead of failing on the dictionary lookup.
            font_file = font_data.get(font)
            if font_file is None:
                logger.error("Error generating signature image: unknown font %s", font)
                raise cherrypy.HTTPError(500, "Internal Server Error")
            sig_font_path = os.path.join(static_dir, "fonts", font_file)
            note_font_path = os.path.join(static_dir, "fonts", "Steelfish.ttf")

            # Offload image generation to process pool
            try:
                sig_image = process_pool.apply(
                    generate_signature_image,
                    (name, timestamp, tzinfo, qr_url, sig_font_path, note_font_path, invert),
                )
                buffer = io.BytesIO()
                sig_image.save(buffer, format="PNG")
                buffer.seek(0)
                return file_generator(buffer)
            except Exception as e:
                logger.error("Error generating signature image: %s", e)
                raise cherrypy.HTTPError(500, "Internal Server Error")

        @cherrypy.expose
        def howto(self):
            """Serve the how-to page."""
            return self._page("howto.html", self.howto_html)

        @cherrypy.expose
        def privacy(self):
            """Serve the privacy page."""
            return self._page("privacy.html", self.privacy_html)

    logger.info("Starting server on %s:%s", host_ip, host_port)
    try:
        cherrypy.quickstart(RootServer(), config=root_server_config)
    finally:
        # quickstart() returns once the engine has stopped; release the
        # worker processes so they do not outlive the server.
        process_pool.terminate()
        process_pool.join()


def _load_font_data(fonts_dir):
    """Return a mapping of font display name to file name for each .ttf in *fonts_dir*.

    The display name is the font's name record 4 with a trailing
    ``' Regular'`` removed. Fonts without that record are skipped.
    """
    log = logging.getLogger(__name__)
    font_data = {}
    for file in os.listdir(fonts_dir):
        if not file.endswith(".ttf"):
            continue
        font = ttLib.TTFont(os.path.join(fonts_dir, file))
        font_name = font['name'].getDebugName(4)
        if not font_name:
            log.warning("Skipping font without a full name record: %s", file)
            continue
        font_data[font_name.removesuffix(' Regular')] = file
    return font_data


if __name__ == "__main__":
    socket_host = '0.0.0.0'
    socket_port = 8080
    static_dir = "./web_content/"
    db_path = "./data/signatures.db"
    host_prefix = "osg.jkdev.org"
    production = True

    if os.environ.get("DOCKER"):
        # Keep the default prefix when HOSTPREFIX is unset instead of passing None on.
        host_prefix = os.environ.get("HOSTPREFIX", host_prefix)
        production = True

    # Configure logging before anything logs, so early messages reach the file.
    handler = RotatingFileHandler('server.log', maxBytes=10*1024*1024, backupCount=5)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[handler],
    )
    logger = logging.getLogger(__name__)

    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    database_connection, db_lock = open_db(db_path)

    font_data = _load_font_data(os.path.join(static_dir, "fonts"))
    # Remove Steelfish from font data, we only use it for the note font
    font_data.pop('Steelfish', None)
    logger.debug("Font data: %s", font_data)

    run_server(socket_host, socket_port, host_prefix, static_dir, database_connection, db_lock, font_data, production)