420 lines
No EOL
17 KiB
Python
420 lines
No EOL
17 KiB
Python
# Written by Jeremy Karst 2025
|
|
# This software is licensed under the MIT License with Additional Terms.
|
|
# See LICENSE.md for more information.
|
|
|
|
# Built-in Dependencies
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
import threading
|
|
import datetime
|
|
import io
|
|
import json
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from collections import deque
|
|
from threading import Timer
|
|
import multiprocessing
|
|
from multiprocessing import Pool
|
|
|
|
# External Dependencies
|
|
import cherrypy
|
|
from cherrypy.lib import file_generator
|
|
from fontTools import ttLib
|
|
import geocoder
|
|
|
|
# Local Dependencies
|
|
from sha_sign import hash_text, generate_signature_image
|
|
|
|
|
|
def open_db(path):
|
|
db_lock = threading.Lock()
|
|
if not os.path.exists(path):
|
|
conn = sqlite3.connect(path, check_same_thread=False, cached_statements=0, autocommit = False)
|
|
conn.execute("CREATE TABLE signatures (hash TEXT PRIMARY KEY, name TEXT, time INTEGER, timezone INTEGER, font TEXT, invert BOOLEAN, identity TEXT)")
|
|
conn.commit()
|
|
else:
|
|
conn = sqlite3.connect(path, check_same_thread=False, cached_statements=0)
|
|
return conn, db_lock
|
|
|
|
|
|
def run_server(host_ip, host_port, host_prefix, static_dir, database_connection, db_lock, font_data, production=False):
|
|
logger = logging.getLogger(__name__)
|
|
static_dir = os.path.abspath(static_dir)
|
|
|
|
# Create process pool for signature generation
|
|
process_pool = Pool(processes=multiprocessing.cpu_count() // 2)
|
|
|
|
# Define web server
|
|
root_server_config= {
|
|
'/': {
|
|
'tools.staticdir.on': True,
|
|
'tools.staticdir.dir': static_dir,
|
|
'tools.staticdir.index': 'index.html',
|
|
},
|
|
'/images': {
|
|
'tools.caching.on': True,
|
|
'tools.caching.delay': 60,
|
|
}
|
|
}
|
|
|
|
global_server_config = {
|
|
'server.socket_host': host_ip,
|
|
'server.socket_port': host_port,
|
|
'server.max_request_body_size': 10*1024,
|
|
'server.socket_timeout': 10,
|
|
'response.timeout': 30,
|
|
'tools.gzip.on': True,
|
|
'tools.gzip.mime_types': ['text/*', 'application/*', 'font/*'],
|
|
'tools.encode.on': True,
|
|
'tools.encode.encoding': 'utf-8',
|
|
'tools.encode.text_only': False,
|
|
'cherrypy.response.headers': {'Cache-Control': f'max-age={30*24*60*60}'}, # 30 days
|
|
}
|
|
|
|
if production:
|
|
global_server_config['environment'] = 'production'
|
|
|
|
cherrypy.config.update(global_server_config)
|
|
|
|
def error_page_404(status, message, traceback, version):
|
|
return "Error 404: Page Not Found!"
|
|
cherrypy.config.update({'error_page.404': error_page_404})
|
|
|
|
class RateLimiter:
|
|
def __init__(self, requests_per_interval=20, limit_interval = 120, cleanup_interval=300):
|
|
assert cleanup_interval > limit_interval, "Cleanup interval must be greater than limit interval"
|
|
self.requests_per_minute = requests_per_interval
|
|
self.window = limit_interval # seconds
|
|
self.cleanup_interval = cleanup_interval # cleanup every 5 minutes
|
|
self.requests = {} # ip -> deque of timestamps
|
|
self.start_cleanup_timer()
|
|
|
|
def start_cleanup_timer(self):
|
|
"""Start periodic cleanup of old IP records"""
|
|
Timer(self.cleanup_interval, self._cleanup_old_records).start()
|
|
|
|
def _cleanup_old_records(self):
|
|
"""Remove IPs that haven't made requests in the last window"""
|
|
current_time = time.time()
|
|
# Create list of IPs to remove to avoid runtime dictionary modification
|
|
to_remove = [
|
|
ip for ip, timestamps in self.requests.items()
|
|
if not timestamps or current_time - timestamps[-1] > self.window
|
|
]
|
|
for ip in to_remove:
|
|
del self.requests[ip]
|
|
|
|
# Restart cleanup timer
|
|
self.start_cleanup_timer()
|
|
|
|
def is_rate_limited(self, ip):
|
|
if ip == '127.0.0.1':
|
|
return False
|
|
|
|
current_time = time.time()
|
|
|
|
# Initialize or get request deque for this IP
|
|
if ip not in self.requests:
|
|
self.requests[ip] = deque(maxlen=self.requests_per_minute)
|
|
|
|
timestamps = self.requests[ip]
|
|
|
|
# Remove timestamps outside the current window
|
|
while timestamps and current_time - timestamps[0] > self.window:
|
|
timestamps.popleft()
|
|
|
|
# Check if rate limited
|
|
if len(timestamps) >= self.requests_per_minute:
|
|
return True
|
|
|
|
# Add new request timestamp
|
|
timestamps.append(current_time)
|
|
return False
|
|
|
|
class RootServer(object):
|
|
rate_limiter = RateLimiter()
|
|
|
|
with open(os.path.join(static_dir, "verification.html"), "r") as f:
|
|
verification_html = f.read()
|
|
with open(os.path.join(static_dir, "howto.html"), "r") as f:
|
|
howto_html = f.read()
|
|
with open(os.path.join(static_dir, "privacy.html"), "r") as f:
|
|
privacy_html = f.read()
|
|
|
|
@staticmethod
|
|
def _get_ip(request):
|
|
if 'Cf-Connecting-Ip' in request.headers:
|
|
logger.debug("Using Cloudflare IP: %s", request.headers['Cf-Connecting-Ip'])
|
|
ip = request.headers['CF-Connecting-IP']
|
|
elif 'X-Forwarded-For' in request.headers:
|
|
logger.debug("Using X-Forwarded-For IP: %s", request.headers['X-Forwarded-For'])
|
|
ip = request.headers['X-Forwarded-For']
|
|
else:
|
|
logger.debug("Using Remote IP: %s", request.remote.ip)
|
|
ip = request.remote.ip
|
|
return ip
|
|
|
|
@cherrypy.expose
|
|
def register(self, name: str, font: str, timezone: str = '0', invert: str = 'false'):
|
|
request = cherrypy.request
|
|
ip = self._get_ip(request)
|
|
|
|
if self.rate_limiter.is_rate_limited(ip):
|
|
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
|
|
|
|
# Validate inputs
|
|
if len(name) <= 0 or len(name) > 255:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature name")
|
|
# Ensure name can be utf-8 encoded
|
|
try:
|
|
name.encode('utf-8')
|
|
except UnicodeEncodeError:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature name")
|
|
if font not in font_data:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid font name")
|
|
try:
|
|
timezone = int(timezone)
|
|
except ValueError:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid timezone")
|
|
if timezone < -12*60 or timezone > 14*60:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid timezone")
|
|
if invert.lower() not in ['true', 'false']:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid invert value")
|
|
else:
|
|
try:
|
|
invert = invert.lower() == 'true'
|
|
except ValueError:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid invert value")
|
|
|
|
# CamelCase the name if it's not already
|
|
name_parts = name.split(' ')
|
|
for i, part in enumerate(name_parts):
|
|
if part[0].islower():
|
|
name_parts[i] = part[0].upper() + part[1:]
|
|
name = ' '.join(name_parts)
|
|
|
|
# Get user agent and ip for later signature verification
|
|
user_agent = request.headers['User-Agent']
|
|
|
|
# Try to get user location
|
|
try:
|
|
if ip == '127.0.0.1':
|
|
raise Exception("Localhost IP detected, skipping location lookup")
|
|
location = geocoder.ip(ip)
|
|
if location.lat is None or location.lng is None:
|
|
latlon = [0.0, 0.0]
|
|
else:
|
|
latlon = [location.lat, location.lng]
|
|
if location.org is None:
|
|
provider = 'None Found'
|
|
else:
|
|
provider = location.org
|
|
if location.address == "":
|
|
address = 'None Found'
|
|
else:
|
|
address = location.address
|
|
except Exception as e:
|
|
latlon = [0.0, 0.0]
|
|
address = 'None Found'
|
|
provider = 'None Found'
|
|
|
|
# Get current unix time for signature timestamp
|
|
unix_time = int(time.time())
|
|
|
|
# Generate signature hash
|
|
hash_id = hash_text(f"{name}{unix_time}")
|
|
|
|
# Sanity check hash length, in theory there is a small chance the hash will be short since we prune '=' padding
|
|
if len(hash_id) < 8:
|
|
raise cherrypy.HTTPError(500, "Internal Server Error, hash generation failed")
|
|
|
|
identity = json.dumps({ # This shouldn't be able to fail since geocoder's api is stable and returns stringifyable data
|
|
'ip': ip,
|
|
'useragent': user_agent,
|
|
'latlon': latlon,
|
|
'address': address,
|
|
'provider': provider
|
|
})
|
|
# Insert signature into database
|
|
with db_lock:
|
|
try:
|
|
database_connection.execute("INSERT INTO signatures (hash, name, time, timezone, font, invert, identity) VALUES (?, ?, ?, ?, ?, ?, ?)", (hash_id, name, unix_time, timezone, font, invert, identity))
|
|
database_connection.commit()
|
|
except Exception as e:
|
|
logger.error("Error inserting signature into database: %s", e)
|
|
raise cherrypy.HTTPError(500, "Internal Server Error, failed to insert signature into database")
|
|
|
|
return hash_id
|
|
|
|
@cherrypy.expose
|
|
def verify(self, *args):
|
|
request = cherrypy.request
|
|
ip = self._get_ip(request)
|
|
|
|
if self.rate_limiter.is_rate_limited(ip):
|
|
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
|
|
|
|
if len(args) == 1:
|
|
id = args[0]
|
|
if len(id) == 0 or len(id) > 64:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
|
|
try:
|
|
id.encode('utf-8')
|
|
except UnicodeEncodeError:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
|
|
|
|
# Get signature data from database
|
|
cursor = database_connection.cursor()
|
|
cursor.execute("SELECT name, time, timezone, font, invert, identity FROM signatures WHERE hash = ?", (id,))
|
|
result = cursor.fetchone()
|
|
if result is None:
|
|
raise cherrypy.HTTPError(404, "Not Found")
|
|
name, time, timezone, font, invert, identity = result
|
|
|
|
return json.dumps({
|
|
"name": name,
|
|
"time": time,
|
|
"timezone": timezone,
|
|
"font": font,
|
|
"invert": invert,
|
|
"identity": json.loads(identity)
|
|
})
|
|
|
|
else: # Redirect to verification page
|
|
raise cherrypy.HTTPRedirect(f"{host_prefix}/verify/{id}")
|
|
|
|
@cherrypy.expose
|
|
def v(self, *args):
|
|
request = cherrypy.request
|
|
ip = self._get_ip(request)
|
|
|
|
if self.rate_limiter.is_rate_limited(ip):
|
|
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
|
|
|
|
if len(args) == 1:
|
|
id = args[0]
|
|
if len(id) == 0 or len(id) > 64:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
|
|
try:
|
|
id.encode('utf-8')
|
|
except UnicodeEncodeError:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
|
|
|
|
if 'environment' in global_server_config and global_server_config['environment'] == 'production':
|
|
return self.verification_html
|
|
else: # If we are debugging, read the newest verification.html file from disk
|
|
with open(os.path.join(static_dir, "verification.html"), "r") as f:
|
|
verification_html = f.read()
|
|
return verification_html
|
|
else:
|
|
raise cherrypy.HTTPRedirect(f"{host_prefix}")
|
|
|
|
@cherrypy.expose
|
|
def fonts(self):
|
|
return json.dumps(font_data)
|
|
|
|
|
|
@cherrypy.expose
|
|
def images(self, *args):
|
|
request = cherrypy.request
|
|
ip = self._get_ip(request)
|
|
|
|
if self.rate_limiter.is_rate_limited(ip):
|
|
raise cherrypy.HTTPError(429, "Too Many Requests, try again later.")
|
|
|
|
if len(args) == 1:
|
|
hash_id = args[0]
|
|
if hash_id.endswith('.png'):
|
|
hash_id = hash_id[:-4]
|
|
try:
|
|
hash_id.encode('utf-8')
|
|
except UnicodeEncodeError:
|
|
raise cherrypy.HTTPError(400, "Bad Request, invalid signature id")
|
|
qr_url = f"{host_prefix}/v/{hash_id}"
|
|
cursor = database_connection.cursor()
|
|
cursor.execute("SELECT name, time, timezone, font, invert FROM signatures WHERE hash = ?", (hash_id,))
|
|
result = cursor.fetchone()
|
|
if result is None:
|
|
raise cherrypy.HTTPError(404, "Not Found")
|
|
name, time, timezone, font, invert = result
|
|
|
|
timezone = datetime.timezone(datetime.timedelta(minutes=timezone))
|
|
sig_font_path = os.path.join(static_dir, "fonts", font_data[font])
|
|
note_font_path = os.path.join(static_dir, "fonts", "Steelfish.ttf")
|
|
|
|
# Offload image generation to process pool
|
|
try:
|
|
sig_image = process_pool.apply(
|
|
generate_signature_image,
|
|
(name, time, timezone, qr_url, sig_font_path, note_font_path, invert)
|
|
)
|
|
buffer = io.BytesIO()
|
|
sig_image.save(buffer, format="PNG")
|
|
buffer.seek(0)
|
|
return file_generator(buffer)
|
|
except Exception as e:
|
|
logger.error("Error generating signature image: %s", e)
|
|
raise cherrypy.HTTPError(500, "Internal Server Error")
|
|
else:
|
|
raise cherrypy.HTTPError(404, "Not Found")
|
|
|
|
@cherrypy.expose
|
|
def howto(self):
|
|
if 'environment' in global_server_config and global_server_config['environment'] == 'production':
|
|
return self.howto_html
|
|
else: # If we are debugging, read the newest file from disk
|
|
with open(os.path.join(static_dir, "howto.html"), "r") as f:
|
|
howto_html = f.read()
|
|
return howto_html
|
|
|
|
@cherrypy.expose
|
|
def privacy(self):
|
|
if 'environment' in global_server_config and global_server_config['environment'] == 'production':
|
|
return self.privacy_html
|
|
else: # If we are debugging, read the newest file from disk
|
|
with open(os.path.join(static_dir, "privacy.html"), "r") as f:
|
|
privacy_html = f.read()
|
|
return privacy_html
|
|
|
|
logger.info("Starting server on %s:%s", host_ip, host_port)
|
|
cherrypy.quickstart(RootServer(), config=root_server_config)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
socket_host = '0.0.0.0'
|
|
socket_port = 8080
|
|
static_dir = "./web_content/"
|
|
db_path = "./data/signatures.db"
|
|
host_prefix = "osg.jkdev.org"
|
|
production = False
|
|
|
|
if os.environ.get("DOCKER"):
|
|
host_prefix = os.environ.get("HOSTPREFIX")
|
|
production = True
|
|
|
|
os.makedirs(os.path.dirname(db_path), exist_ok=True)
|
|
database_connection, db_lock = open_db(db_path)
|
|
|
|
font_data = {}
|
|
for file in os.listdir(os.path.join(static_dir, "fonts")):
|
|
if file.endswith(".ttf"):
|
|
font = ttLib.TTFont(os.path.join(static_dir, "fonts", file))
|
|
font_name = font['name'].getDebugName(4)
|
|
if font_name.endswith(' Regular'):
|
|
font_name = font_name[:-8]
|
|
font_data[font_name] = file
|
|
# Remove Steelfish from font data, we only use it for the note font
|
|
font_data.pop('Steelfish')
|
|
logger = logging.getLogger(__name__)
|
|
logger.debug("Font data: %s", font_data)
|
|
|
|
handler = RotatingFileHandler('server.log', maxBytes=10*1024*1024, backupCount=5)
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers = [handler]
|
|
)
|
|
|
|
run_server(socket_host, socket_port, host_prefix, static_dir, database_connection, db_lock, font_data, production) |