#!/usr/bin/env python3 import sys import os try: f = open("/var/log/man-srht-shell", "a") os.close(sys.stderr.fileno()) os.dup2(f.fileno(), sys.stderr.fileno()) except Exception as ex: sys.stderr.write("Unable to open log for writing\n") sys.stderr.write(str(ex) + "\n") import requests import shlex from datetime import datetime from mansrht.access import has_access, UserAccess from mansrht.types import User, Wiki from srht.config import cfg from srht.database import DbSession from srht.oauth import UserType from srht.validation import Validation db = DbSession(cfg("man.sr.ht", "connection-string")) db.init() def log(s, *args): sys.stderr.write("{} {}\n".format(datetime.now().isoformat(), s.format(*args) if isinstance(s, str) else str(s))) sys.stderr.flush() root = cfg("man.sr.ht", "origin") repos = cfg("man.sr.ht", "repo-path") _cmd = os.environ.get("SSH_ORIGINAL_COMMAND") if not _cmd: _cmd = "" if len(sys.argv) < 2: log("Error: expected 2 arguments from SSH") sys.exit(1) user_id = sys.argv[1] ssh_key = sys.argv[2] user = User.query.filter(User.id == user_id).first() if not user: log("Unknown user ID {}", user_id) sys.exit(1) log("User: {}", user.username) cmd = shlex.split(_cmd) valid_commands = ["git-receive-pack", "git-upload-pack", "git-upload-archive"] if len(cmd) < 1 or not cmd[0] in valid_commands: log("Not permitting unacceptable command") print("Hi {}! You've successfully authenticated, ".format(user.username) + "but I do not provide an interactive shell. Bye!") sys.exit(128) os.chdir(repos) path = os.path.abspath(cmd[-1]) if not path.startswith(repos): log("Access denied") sys.exit(128) cmd[-1] = path if path == os.path.join(repos, "root"): if cmd[0] == "git-receive-pack" and user.user_type != UserType.admin: sys.exit(128) else: wiki = Wiki.query.filter(Wiki.path == path).first() if not wiki: sys.exit(128) if cmd[0] == "git-receive-pack": if not has_access(wiki, UserAccess.write, user): sys.exit(128) else: if not has_access(wiki, UserAccess.read, user): sys.exit(128) log("Executing {}", " ".join(cmd)) sys.stderr.close() os.execvp(cmd[0], cmd)