open-signature-generator/server.py

417 lines
No EOL
17 KiB
Python

# Written by Jeremy Karst 2025
# This software is licensed under the MIT License with Additional Terms.
# See LICENSE.md for more information.
# Built-in Dependencies
import os
import sqlite3
import time
import threading
import datetime
import io
import json
import logging
from logging.handlers import RotatingFileHandler
from collections import deque
from threading import Timer
import multiprocessing
from multiprocessing import Pool
# External Dependencies
import cherrypy
from cherrypy.lib import file_generator
from fontTools import ttLib
import geocoder
# Local Dependencies
from sha_sign import hash_text, generate_signature_image
def open_db(path):
db_lock = threading.Lock()
if not os.path.exists(path):
conn = sqlite3.connect(path, check_same_thread=False, cached_statements=0, autocommit = False)
conn.execute("CREATE TABLE signatures (hash TEXT PRIMARY KEY, name TEXT, time INTEGER, timezone INTEGER, font TEXT, invert BOOLEAN, identity TEXT)")
conn.commit()
else:
conn = sqlite3.connect(path, check_same_thread=False, cached_statements=0)
return conn, db_lock
def run_server(host_ip, host_port, host_prefix, static_dir, database_connection, db_lock, font_data, production=False):
logger = logging.getLogger(__name__)
static_dir = os.path.abspath(static_dir)
# Create process pool for signature generation
process_pool = Pool(processes=multiprocessing.cpu_count() // 2)
# Define web server
root_server_config= {
'/': {
'tools.staticdir.on': True,
'tools.staticdir.dir': static_dir,
'tools.staticdir.index': 'index.html',
},
'/images': {
'tools.caching.on': True,
'tools.caching.delay': 60,
}
}
global_server_config = {
'server.socket_host': host_ip,
'server.socket_port': host_port,
'server.max_request_body_size': 10*1024,
'server.socket_timeout': 10,
'response.timeout': 30,
'tools.gzip.on': True,
'tools.gzip.mime_types': ['text/*', 'application/*'],
'tools.encode.text_only': False
}
if production:
global_server_config['environment'] = 'production'
cherrypy.config.update(global_server_config)
def error_page_404(status, message, traceback, version):
return "Error 404: Page Not Found!"
cherrypy.config.update({'error_page.404': error_page_404})
class RateLimiter:
def __init__(self, requests_per_interval=20, limit_interval = 120, cleanup_interval=300):
assert cleanup_interval > limit_interval, "Cleanup interval must be greater than limit interval"
self.requests_per_minute = requests_per_interval
self.window = limit_interval # seconds
self.cleanup_interval = cleanup_interval # cleanup every 5 minutes
self.requests = {} # ip -> deque of timestamps
self.start_cleanup_timer()
def start_cleanup_timer(self):
"""Start periodic cleanup of old IP records"""
Timer(self.cleanup_interval, self._cleanup_old_records).start()
def _cleanup_old_records(self):
"""Remove IPs that haven't made requests in the last window"""
current_time = time.time()
# Create list of IPs to remove to avoid runtime dictionary modification
to_remove = [
ip for ip, timestamps in self.requests.items()
if not timestamps or current_time - timestamps[-1] > self.window
]
for ip in to_remove:
del self.requests[ip]
# Restart cleanup timer
self.start_cleanup_timer()
def is_rate_limited(self, ip):
if ip == '127.0.0.1':
return False
current_time = time.time()
# Initialize or get request deque for this IP
if ip not in self.requests:
self.requests[ip] = deque(maxlen=self.requests_per_minute)
timestamps = self.requests[ip]
# Remove timestamps outside the current window
while timestamps and current_time - timestamps[0] > self.window:
timestamps.popleft()
# Check if rate limited
if len(timestamps) >= self.requests_per_minute:
return True
# Add new request timestamp
timestamps.append(current_time)
return False
class RootServer(object):
rate_limiter = RateLimiter()
with open(os.path.join(static_dir, "verification.html"), "r") as f:
verification_html = f.read()
with open(os.path.join(static_dir, "howto.html"), "r") as f:
howto_html = f.read()
with open(os.path.join(static_dir, "privacy.html"), "r") as f:
privacy_html = f.read()
@staticmethod
def _get_ip(request):
if 'Cf-Connecting-Ip' in request.headers:
logger.debug("Using Cloudflare IP: %s", request.headers['Cf-Connecting-Ip'])
ip = request.headers['CF-Connecting-IP']
elif 'X-Forwarded-For' in request.headers:
logger.debug("Using X-Forwarded-For IP: %s", request.headers['X-Forwarded-For'])
ip = request.headers['X-Forwarded-For']
else:
logger.debug("Using Remote IP: %s", request.remote.ip)
ip = request.remote.ip
return ip
@cherrypy.expose
def register(self, name: str, font: str, timezone: str = '0', invert: str = 'false'):
request = cherrypy.request
ip = self._get_ip(request)
if self.rate_limiter.is_rate_limited(ip):
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
# Validate inputs
if len(name) <= 0 or len(name) > 255:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature name")
# Ensure name can be utf-8 encoded
try:
name.encode('utf-8')
except UnicodeEncodeError:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature name")
if font not in font_data:
raise cherrypy.HTTPError(400, "Bad Request, invalid font name")
try:
timezone = int(timezone)
except ValueError:
raise cherrypy.HTTPError(400, "Bad Request, invalid timezone")
if timezone < -12*60 or timezone > 14*60:
raise cherrypy.HTTPError(400, "Bad Request, invalid timezone")
if invert.lower() not in ['true', 'false']:
raise cherrypy.HTTPError(400, "Bad Request, invalid invert value")
else:
try:
invert = invert.lower() == 'true'
except ValueError:
raise cherrypy.HTTPError(400, "Bad Request, invalid invert value")
# CamelCase the name if it's not already
name_parts = name.split(' ')
for i, part in enumerate(name_parts):
if part[0].islower():
name_parts[i] = part[0].upper() + part[1:]
name = ' '.join(name_parts)
# Get user agent and ip for later signature verification
user_agent = request.headers['User-Agent']
# Try to get user location
try:
if ip == '127.0.0.1':
raise Exception("Localhost IP detected, skipping location lookup")
location = geocoder.ip(ip)
if location.lat is None or location.lng is None:
latlon = [0.0, 0.0]
else:
latlon = [location.lat, location.lng]
if location.org is None:
provider = 'None Found'
else:
provider = location.org
if location.address == "":
address = 'None Found'
else:
address = location.address
except Exception as e:
latlon = [0.0, 0.0]
address = 'None Found'
provider = 'None Found'
# Get current unix time for signature timestamp
unix_time = int(time.time())
# Generate signature hash
hash_id = hash_text(f"{name}{unix_time}")
# Sanity check hash length, in theory there is a small chance the hash will be short since we prune '=' padding
if len(hash_id) < 8:
raise cherrypy.HTTPError(500, "Internal Server Error, hash generation failed")
identity = json.dumps({ # This shouldn't be able to fail since geocoder's api is stable and returns stringifyable data
'ip': ip,
'useragent': user_agent,
'latlon': latlon,
'address': address,
'provider': provider
})
# Insert signature into database
with db_lock:
try:
database_connection.execute("INSERT INTO signatures (hash, name, time, timezone, font, invert, identity) VALUES (?, ?, ?, ?, ?, ?, ?)", (hash_id, name, unix_time, timezone, font, invert, identity))
database_connection.commit()
except Exception as e:
logger.error("Error inserting signature into database: %s", e)
raise cherrypy.HTTPError(500, "Internal Server Error, failed to insert signature into database")
return hash_id
@cherrypy.expose
def verify(self, *args):
request = cherrypy.request
ip = self._get_ip(request)
if self.rate_limiter.is_rate_limited(ip):
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
if len(args) == 1:
id = args[0]
if len(id) == 0 or len(id) > 64:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
try:
id.encode('utf-8')
except UnicodeEncodeError:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
# Get signature data from database
cursor = database_connection.cursor()
cursor.execute("SELECT name, time, timezone, font, invert, identity FROM signatures WHERE hash = ?", (id,))
result = cursor.fetchone()
if result is None:
raise cherrypy.HTTPError(404, "Not Found")
name, time, timezone, font, invert, identity = result
return json.dumps({
"name": name,
"time": time,
"timezone": timezone,
"font": font,
"invert": invert,
"identity": json.loads(identity)
})
else: # Redirect to verification page
raise cherrypy.HTTPRedirect(f"{host_prefix}/verify/{id}")
@cherrypy.expose
def v(self, *args):
request = cherrypy.request
ip = self._get_ip(request)
if self.rate_limiter.is_rate_limited(ip):
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
if len(args) == 1:
id = args[0]
if len(id) == 0 or len(id) > 64:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
try:
id.encode('utf-8')
except UnicodeEncodeError:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
if 'environment' in global_server_config and global_server_config['environment'] == 'production':
return self.verification_html
else: # If we are debugging, read the newest verification.html file from disk
with open(os.path.join(static_dir, "verification.html"), "r") as f:
verification_html = f.read()
return verification_html
else:
raise cherrypy.HTTPRedirect(f"{host_prefix}")
@cherrypy.expose
def fonts(self):
return json.dumps(font_data)
@cherrypy.expose
def images(self, *args):
request = cherrypy.request
ip = self._get_ip(request)
if self.rate_limiter.is_rate_limited(ip):
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
if len(args) == 1:
hash_id = args[0]
if hash_id.endswith('.png'):
hash_id = hash_id[:-4]
try:
hash_id.encode('utf-8')
except UnicodeEncodeError:
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
qr_url = f"{host_prefix}/v/{hash_id}"
cursor = database_connection.cursor()
cursor.execute("SELECT name, time, timezone, font, invert FROM signatures WHERE hash = ?", (hash_id,))
result = cursor.fetchone()
if result is None:
raise cherrypy.HTTPError(404, "Not Found")
name, time, timezone, font, invert = result
timezone = datetime.timezone(datetime.timedelta(minutes=timezone))
sig_font_path = os.path.join(static_dir, "fonts", font_data[font])
note_font_path = os.path.join(static_dir, "fonts", "Steelfish.ttf")
# Offload image generation to process pool
try:
sig_image = process_pool.apply(
generate_signature_image,
(name, time, timezone, qr_url, sig_font_path, note_font_path, invert)
)
buffer = io.BytesIO()
sig_image.save(buffer, format="PNG")
buffer.seek(0)
return file_generator(buffer)
except Exception as e:
logger.error("Error generating signature image: %s", e)
raise cherrypy.HTTPError(500, "Internal Server Error")
else:
raise cherrypy.HTTPError(404, "Not Found")
@cherrypy.expose
def howto(self):
if 'environment' in global_server_config and global_server_config['environment'] == 'production':
return self.howto_html
else: # If we are debugging, read the newest file from disk
with open(os.path.join(static_dir, "howto.html"), "r") as f:
howto_html = f.read()
return howto_html
@cherrypy.expose
def privacy(self):
if 'environment' in global_server_config and global_server_config['environment'] == 'production':
return self.privacy_html
else: # If we are debugging, read the newest file from disk
with open(os.path.join(static_dir, "privacy.html"), "r") as f:
privacy_html = f.read()
return privacy_html
logger.info("Starting server on %s:%s", host_ip, host_port)
cherrypy.quickstart(RootServer(), config=root_server_config)
if __name__ == "__main__":
socket_host = '0.0.0.0'
socket_port = 8080
static_dir = "./web_content/"
db_path = "./data/signatures.db"
host_prefix = "osg.jkdev.org"
production = True
if os.environ.get("DOCKER"):
host_prefix = os.environ.get("HOSTPREFIX")
production = True
os.makedirs(os.path.dirname(db_path), exist_ok=True)
database_connection, db_lock = open_db(db_path)
font_data = {}
for file in os.listdir(os.path.join(static_dir, "fonts")):
if file.endswith(".ttf"):
font = ttLib.TTFont(os.path.join(static_dir, "fonts", file))
font_name = font['name'].getDebugName(4)
if font_name.endswith(' Regular'):
font_name = font_name[:-8]
font_data[font_name] = file
# Remove Steelfish from font data, we only use it for the note font
font_data.pop('Steelfish')
logger = logging.getLogger(__name__)
logger.debug("Font data: %s", font_data)
handler = RotatingFileHandler('server.log', maxBytes=10*1024*1024, backupCount=5)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers = [handler]
)
run_server(socket_host, socket_port, host_prefix, static_dir, database_connection, db_lock, font_data, production)