diff --git a/app.py b/app.py index 3533643..7225375 100644 --- a/app.py +++ b/app.py @@ -1,23 +1,23 @@ from flask import ( - Flask, - render_template, - request, - redirect, - url_for, - send_from_directory, - flash, - abort, - jsonify, - session + Flask, + render_template, + request, + redirect, + url_for, + send_from_directory, + flash, + abort, + jsonify, + session, ) from flask_sqlalchemy import SQLAlchemy from flask_login import ( - LoginManager, - UserMixin, - login_user, - logout_user, - login_required, - current_user, + LoginManager, + UserMixin, + login_user, + logout_user, + login_required, + current_user, ) from flask_apscheduler import APScheduler from werkzeug.security import generate_password_hash, check_password_hash @@ -32,84 +32,112 @@ from secrets import token_hex app = Flask(__name__) try: - with open("/app/instance/secret.key", "rb") as f: - app.config["SECRET_KEY"] = bytes.hex(f.readline()) + with open("/app/instance/secret.key", "rb") as f: + app.config["SECRET_KEY"] = bytes.hex(f.readline()) except FileNotFoundError as e: - if not os.path.exists("/app/instance"): - os.mkdir("/app/instance") + if not os.path.exists("/app/instance"): + os.mkdir("/app/instance") - with open("/app/instance/secret.key", "wb") as f: - newKey = token_hex(64) - f.write(bytearray.fromhex(newKey)) - app.config["SECRET_KEY"] = newKey + with open("/app/instance/secret.key", "wb") as f: + newKey = token_hex(64) + f.write(bytearray.fromhex(newKey)) + app.config["SECRET_KEY"] = newKey PORT = 5121 app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:////app/instance/db.sqlite" app.config["UPLOAD_FOLDER"] = "sites" app.config["SERVER_NAME"] = "tinysite.cloud" -# app.config["SESSION_COOKIE_DOMAIN"] = ".tinysite.cloud" +app.config["SESSION_COOKIE_DOMAIN"] = ".tinysite.cloud" db = SQLAlchemy(app) RESERVED_SUBDOMAINS = { - "", - "www", - "api", - "admin", - "support", - "docs", - "blog", - "cdn", - "test", - "dev", - "staging", - "secure", - "mail", - "status", - "gateway", + "", + "www", + "api", + "admin", + "support", + "docs", + "blog", + "cdn", + "test", + "dev", + "staging", + "secure", + "mail", + "status", + "gateway", } # TODO: add specific page redirects here as they're added def isDefaultRoute(hostname: str): - if hostname.count(".") < 2: - return True + # exactly the main domain + if hostname == app.config["SERVER_NAME"]: + return True - subdomain = hostname.split(".")[0] - return not subdomain or subdomain in RESERVED_SUBDOMAINS + # a reserved subdomain + parts = hostname.split(".") + server_parts = app.config["SERVER_NAME"].split(".") + + # the host ends with the server domain + if parts[-len(server_parts) :] != server_parts: + return False + + # subdomain portion + subdomain = ".".join(parts[: -len(server_parts)]) + + # any part of the subdomain is reserved + return ( + any(part in RESERVED_SUBDOMAINS for part in subdomain.split(".")) + or subdomain in RESERVED_SUBDOMAINS + ) # Models class User(UserMixin, db.Model): - id = db.Column(db.Integer, primary_key=True) - email = db.Column(db.String(100), unique=True) - password_hash = db.Column(db.String(128)) # Renamed for clarity + id = db.Column(db.Integer, primary_key=True) + email = db.Column(db.String(100), unique=True) + password_hash = db.Column(db.String(128)) # Renamed for clarity - def set_password(self, password): - self.password_hash = generate_password_hash(password) + def set_password(self, password): + self.password_hash = generate_password_hash(password) - def check_password(self, password): - return check_password_hash(self.password_hash, password) + def check_password(self, password): + return check_password_hash(self.password_hash, password) class Site(db.Model): - id = db.Column(db.Integer, primary_key=True) - user_id = db.Column(db.Integer) - name = db.Column(db.String(100)) - subdomain = db.Column(db.String(100), unique=True) - last_accessed = db.Column(db.DateTime) - created_at = db.Column(db.DateTime, default=datetime.utcnow) + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer) + name = db.Column(db.String(100)) + subdomain = db.Column(db.String(100), unique=True) + last_accessed = db.Column(db.DateTime) + created_at = db.Column(db.DateTime, default=datetime.utcnow) + + +@app.context_processor +def inject_subdomain(): + host = request.host + server_parts = app.config["SERVER_NAME"].split(".") + host_parts = host.split(".") + + subdomain = None + if host_parts[-len(server_parts) :] == server_parts: + subdomain_parts = host_parts[: -len(server_parts)] + if subdomain_parts: + subdomain = ".".join(subdomain_parts) + + return { + "SUBDOMAIN": subdomain, + "FULL_DOMAIN": host, + "SERVERNAME": app.config["SERVER_NAME"], + } @app.errorhandler(404) def page_not_found(_): - return render_template("404.html", domain=request.host), 404 - -@app.context_processor -def inject_global_variable(): - return { - "SERVERNAME": app.config["SERVER_NAME"], - } + return render_template("404.html", domain=request.host), 404 # Auth setup @@ -120,7 +148,7 @@ login_manager.login_view = "login" @login_manager.user_loader def load_user(user_id): - return User.query.get(int(user_id)) + return User.query.get(int(user_id)) # Scheduler for auto-deletion @@ -129,259 +157,254 @@ scheduler.init_app(app) def delete_inactive_sites(): - with app.app_context(): - cutoff = datetime.utcnow() - timedelta(days=30) - sites = Site.query.filter(Site.last_accessed < cutoff).all() - for site in sites: - site_dir = os.path.join( - app.config["UPLOAD_FOLDER"], str(site.user_id), str(site.id) - ) - if os.path.exists(site_dir): - shutil.rmtree(site_dir) - db.session.delete(site) - db.session.commit() + with app.app_context(): + cutoff = datetime.utcnow() - timedelta(days=30) + sites = Site.query.filter(Site.last_accessed < cutoff).all() + for site in sites: + site_dir = os.path.join( + app.config["UPLOAD_FOLDER"], str(site.user_id), str(site.id) + ) + if os.path.exists(site_dir): + shutil.rmtree(site_dir) + db.session.delete(site) + db.session.commit() if not scheduler.running: - scheduler.start() - scheduler.add_job( - id="delete_job", func=delete_inactive_sites, trigger="interval", days=1 - ) + scheduler.start() + scheduler.add_job( + id="delete_job", func=delete_inactive_sites, trigger="interval", days=1 + ) # Routes @app.route("/login", methods=["GET", "POST"]) def login(): - if request.method == "POST": - email = request.form.get("email") - password = request.form.get("password") - user = User.query.filter_by(email=email).first() + if request.method == "POST": + email = request.form.get("email") + password = request.form.get("password") + user = User.query.filter_by(email=email).first() - if user and user.check_password(password): - login_user(user) - return redirect(url_for("dashboard")) - flash("Invalid email or password") - return render_template("login.html") + if user and user.check_password(password): + login_user(user) + return redirect(url_for("dashboard")) + flash("Invalid email or password") + return render_template("login.html") @app.route("/logout") def logout(): - logout_user() - return redirect(url_for("home")) + logout_user() + return redirect(url_for("home")) @app.route("/register", methods=["GET", "POST"]) def register(): - if request.method == "POST": - email = request.form.get("email") - password = request.form.get("password") + if request.method == "POST": + email = request.form.get("email") + password = request.form.get("password") - if User.query.filter_by(email=email).first(): - flash("Email already exists") - return redirect(url_for("register")) + if User.query.filter_by(email=email).first(): + flash("Email already exists") + return redirect(url_for("register")) - new_user = User(email=email) - new_user.set_password(password) - db.session.add(new_user) - db.session.commit() + new_user = User(email=email) + new_user.set_password(password) + db.session.add(new_user) + db.session.commit() - login_user(new_user) - return redirect(url_for("dashboard")) - return render_template("register.html") + login_user(new_user) + return redirect(url_for("dashboard")) + return render_template("register.html") @app.route("/dashboard") @login_required def dashboard(): - sites = Site.query.filter_by(user_id=current_user.id).all() - return render_template( - "dashboard.html", - sites=sites, - subdomain=request.host.split(".")[0], - hostname=app.config["SERVER_NAME"], - ) + sites = Site.query.filter_by(user_id=current_user.id).all() + return render_template( + "dashboard.html", + sites=sites, + subdomain=request.host.split(".")[0], + hostname=app.config["SERVER_NAME"], + ) @app.route("/upload", methods=["POST"]) @login_required def upload_site(): - site_name = request.form.get("name") - subdomain = request.form.get("subdomain").strip().lower() # normalize to lowercase + site_name = request.form.get("name") + subdomain = request.form.get("subdomain").strip().lower() # normalize to lowercase - # Subdomain validation - if not re.match(r"^[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?$", subdomain): - flash( - "Invalid subdomain. Use lowercase letters, numbers, and hyphens.", "error" - ) - return redirect(url_for("dashboard")) + # Subdomain validation + if not re.match(r"^[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?$", subdomain): + flash( + "Invalid subdomain. Use lowercase letters, numbers, and hyphens.", "error" + ) + return redirect(url_for("dashboard")) - if not site_name or not subdomain: - flash("Site name and URL subdomain are required", "error") - return redirect(url_for("dashboard")) + if not site_name or not subdomain: + flash("Site name and URL subdomain are required", "error") + return redirect(url_for("dashboard")) - # Check subdomain availability - if Site.query.filter_by(subdomain=subdomain).first(): - flash("This URL is already taken", "error") - return redirect(url_for("dashboard")) + # Check subdomain availability + if Site.query.filter_by(subdomain=subdomain).first(): + flash("This URL is already taken", "error") + return redirect(url_for("dashboard")) - # Check if index.html is included - files = request.files.getlist("files") - if not any(file.filename == "index.html" for file in files): - flash("You must include an index.html file", "error") - return redirect(url_for("dashboard")) + # Check if index.html is included + files = request.files.getlist("files") + if not any(file.filename == "index.html" for file in files): + flash("You must include an index.html file", "error") + return redirect(url_for("dashboard")) - # Create site directory - site = Site(user_id=current_user.id, name=site_name, subdomain=subdomain) - db.session.add(site) - db.session.commit() + # Create site directory + site = Site(user_id=current_user.id, name=site_name, subdomain=subdomain) + db.session.add(site) + db.session.commit() - site_dir = os.path.join( - app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) - ) - os.makedirs(site_dir, exist_ok=True) + site_dir = os.path.join( + app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) + ) + os.makedirs(site_dir, exist_ok=True) - # Save uploaded files - for file in files: - if file.filename == "": - continue - file.save(os.path.join(site_dir, file.filename)) + # Save uploaded files + for file in files: + if file.filename == "": + continue + file.save(os.path.join(site_dir, file.filename)) - flash("Site created successfully!", "success") - return redirect(url_for("dashboard")) + flash("Site created successfully!", "success") + return redirect(url_for("dashboard")) @app.route("/edit/", methods=["GET", "POST"]) @login_required def edit_site(site_id): - site = Site.query.get_or_404(site_id) - if site.user_id != current_user.id: - return "Unauthorized", 403 + site = Site.query.get_or_404(site_id) + if site.user_id != current_user.id: + return "Unauthorized", 403 - site_dir = os.path.join( - app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) - ) + site_dir = os.path.join( + app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) + ) - if request.method == "POST": - site.name = request.form.get("name", site.name) - db.session.commit() + if request.method == "POST": + site.name = request.form.get("name", site.name) + db.session.commit() - for filename, content in request.form.items(): - if filename.endswith((".html", ".css", ".js")): - filepath = os.path.join(site_dir, filename) - with open(filepath, "w") as f: - f.write(content) + for filename, content in request.form.items(): + if filename.endswith((".html", ".css", ".js")): + filepath = os.path.join(site_dir, filename) + with open(filepath, "w") as f: + f.write(content) - if request.headers.get("X-Requested-With") == "XMLHttpRequest": - return jsonify({"success": True, "message": "Site updated successfully!"}) + if request.headers.get("X-Requested-With") == "XMLHttpRequest": + return jsonify({"success": True, "message": "Site updated successfully!"}) - flash("Site updated successfully!", "success") - return redirect(url_for("edit_site", site_id=site.id)) + flash("Site updated successfully!", "success") + return redirect(url_for("edit_site", site_id=site.id)) - files = {} - for file in os.listdir(site_dir): - with open(os.path.join(site_dir, file), "r") as f: - files[file] = f.read() + files = {} + for file in os.listdir(site_dir): + with open(os.path.join(site_dir, file), "r") as f: + files[file] = f.read() - return render_template("edit.html", site=site, files=files) + return render_template("edit.html", site=site, files=files) @app.route("/delete/", methods=["POST"]) @login_required def delete_site(site_id): - site = Site.query.get_or_404(site_id) - if site.user_id != current_user.id: - return "Unauthorized", 403 + site = Site.query.get_or_404(site_id) + if site.user_id != current_user.id: + return "Unauthorized", 403 - site_dir = os.path.join( - app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) - ) - if os.path.exists(site_dir): - shutil.rmtree(site_dir) + site_dir = os.path.join( + app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) + ) + if os.path.exists(site_dir): + shutil.rmtree(site_dir) - db.session.delete(site) - db.session.commit() - return redirect(url_for("dashboard")) + db.session.delete(site) + db.session.commit() + return redirect(url_for("dashboard")) @app.route("/delete_file//", methods=["POST"]) @login_required def delete_file(site_id, filename): - # Get the site and verify ownership - site = Site.query.get_or_404(site_id) - if site.user_id != current_user.id: - return "Unauthorized", 403 + # Get the site and verify ownership + site = Site.query.get_or_404(site_id) + if site.user_id != current_user.id: + return "Unauthorized", 403 - # Build the file path - site_dir = os.path.join( - app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) - ) - file_path = os.path.join(site_dir, filename) + # Build the file path + site_dir = os.path.join( + app.config["UPLOAD_FOLDER"], str(current_user.id), str(site.id) + ) + file_path = os.path.join(site_dir, filename) - # Delete the file if it exists - if os.path.exists(file_path): - os.remove(file_path) - flash(f"File '{filename}' deleted successfully!", "success") - else: - flash(f"File '{filename}' not found!", "error") + # Delete the file if it exists + if os.path.exists(file_path): + os.remove(file_path) + flash(f"File '{filename}' deleted successfully!", "success") + else: + flash(f"File '{filename}' not found!", "error") - return redirect(url_for("dashboard")) + return redirect(url_for("dashboard")) -# use subdomains @app.route("/", subdomain="", defaults={"filename": "index.html"}) -@app.route("/") -def serve_site_content(filename): - subdomain = request.host.split(".")[0] +@app.route("/", subdomain="") +def serve_site_content(subdomain, filename): + if isDefaultRoute(request.host): + abort(404) # Reserve default routes for main app - site = Site.query.filter_by(subdomain=subdomain).first_or_404() - site.last_accessed = datetime.utcnow() - db.session.commit() + site = Site.query.filter_by(subdomain=subdomain).first_or_404() + site.last_accessed = datetime.utcnow() + db.session.commit() - site_dir = os.path.join( - app.config["UPLOAD_FOLDER"], str(site.user_id), str(site.id) - ) + site_dir = os.path.join( + app.config["UPLOAD_FOLDER"], str(site.user_id), str(site.id) + ) - if isDefaultRoute(request.host): - return send_from_directory("index.html") + # Security checks + if ".." in filename or filename.startswith("/") or not os.path.exists(site_dir): + abort(404) - # Security check - if ".." in filename or filename.startswith("/") or not os.path.exists(site_dir): - return render_template("404.html"), 404 - - try: - return send_from_directory(site_dir, filename) - except NotFound: - if "." not in filename: - try: - return send_from_directory(site_dir, f"{filename}.html") - except NotFound: - redirect(app.config["SERVER_NAME"]) - return send_from_directory(site_dir, "index.html") - abort(404) + try: + return send_from_directory(site_dir, filename) + except NotFound: + if "." not in filename: + try: + return send_from_directory(site_dir, f"{filename}.html") + except NotFound: + return send_from_directory(site_dir, "index.html") + abort(404) def list_files(directory): - try: - return os.listdir(directory) - except FileNotFoundError: - return [] + try: + return os.listdir(directory) + except FileNotFoundError: + return [] @app.context_processor def inject_utilities(): - return dict(list_files=list_files) + return dict(list_files=list_files) @app.route("/") def home(): - if isDefaultRoute(request.host): - return render_template("home.html") - else: - return serve_site_content("index.html") + if not isDefaultRoute(request.host): + abort(404) + return render_template("home.html") if __name__ == "__main__": - os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) - with app.app_context(): - db.create_all() - serve(app, host="0.0.0.0", port=PORT) + os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True) + with app.app_context(): + db.create_all() + serve(app, host="0.0.0.0", port=PORT) diff --git a/templates/base.html b/templates/base.html index fd640ab..27318ea 100644 --- a/templates/base.html +++ b/templates/base.html @@ -13,14 +13,13 @@ - + - ION Static Site Hosting - {% block title %}{% endblock %} @@ -53,13 +52,13 @@