Projects / snek / src / snek / sssh.py

git clone https://molodetz.nl/retoor/snek.git

Raw source file available here.

# retoor <retoor@molodetz.nl>

import logging
import posixpath
from pathlib import Path

import asyncssh

# Application instance shared by the helpers in this module. It is bound to
# None up front so that get_app() returns None, instead of raising NameError,
# when it is called before set_app().
_app = None


def set_app(app) -> None:
    """Store *app* as the module-wide application instance.

    The value is kept in the module-level ``_app`` variable, replacing
    whatever was stored there before.
    """
    global _app
    _app = app


def get_app():
    """Return whatever is currently stored in the module-level ``_app``."""
    current_app = _app
    return current_app


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): `roots` is not referenced anywhere in the visible part of this
# file — confirm whether another module relies on it before removing it.
roots = {}


class SFTPServer(asyncssh.SFTPServer):
    """SFTP session handler that confines a user to their home folder.

    The home folder is obtained from the application's user service, keyed by
    the username recorded on the SSH channel, and is passed to asyncssh as the
    chroot for the session.
    """

    def __init__(self, chan: asyncssh.SSHServerChannel):
        """Resolve the user's home folder, create it if needed, and chroot to it.

        Args:
            chan: Server channel the SFTP session runs on; its ``username``
                extra info selects the home folder.
        """
        home = get_app().services.user.get_home_folder_by_username(
            chan.get_extra_info("username")
        )
        # parents=True so the session does not fail when intermediate
        # directories of the home folder are missing.
        home.mkdir(parents=True, exist_ok=True)
        self.root = str(home)
        super().__init__(chan, chroot=self.root)

    def map_path(self, path):
        """Translate a client-supplied path into a server path under ``self.root``.

        Args:
            path: Path as sent by the SFTP client, as bytes.

        Returns:
            The corresponding path below ``self.root``, encoded as bytes.
        """
        # Anchor the client path at "/" and normalise it BEFORE joining it to
        # the root: ".." components are collapsed against the virtual root, so
        # a request such as "/../../etc/passwd" maps to "<root>/etc/passwd"
        # instead of climbing out of the user's folder.
        normalised = posixpath.normpath(posixpath.join(b"/", path))
        relative = normalised.lstrip(b"/").decode()
        mapped_path = Path(self.root).joinpath(relative)
        logger.debug(f"Mapping client path {path} to {mapped_path}")
        return str(mapped_path).encode()


class SSHServer(asyncssh.SSHServer):
    """SSH server that checks passwords against the application's user service."""

    def password_auth_supported(self):
        """Report that password authentication is available."""
        return True

    def validate_password(self, username, password):
        """Check *username* / *password* with the user service.

        Returns:
            Whatever ``authenticate_sync`` returns for the given credentials.
        """
        logger.debug(f"Validating credentials for user {username}")
        user_service = get_app().services.user
        outcome = user_service.authenticate_sync(username, password)
        logger.info(f"Validating credentials for user {username}: {outcome}")
        return outcome


async def start_ssh_server(app, host, port):
    """Start the SFTP-over-SSH listener for *app* on ``host:port``.

    Registers *app* as the module-wide application, loads the server host key
    from ``drive/.ssh/sftp_server_key`` (generating and saving a new one when
    the file does not exist), then starts listening.

    Args:
        app: Application object made available to the SSH/SFTP handlers.
        host: Address to bind the listener to.
        port: Port to bind the listener to.

    Returns:
        The object returned by ``asyncssh.listen``, or ``None`` when the
        listener could not be started.

    Raises:
        Exception: Re-raised unchanged when the host key cannot be generated
            or loaded.
    """
    set_app(app)
    logger.info("Starting SFTP server setup")

    host_key_path = Path("drive") / ".ssh" / "sftp_server_key"
    host_key_path.parent.mkdir(exist_ok=True, parents=True)
    try:
        if host_key_path.exists():
            logger.info(f"Loading existing host key from {host_key_path}")
            key = asyncssh.read_private_key(host_key_path)
        else:
            logger.info(f"Generating new host key at {host_key_path}")
            key = asyncssh.generate_private_key("ecdsa-sha2-nistp256")
            key.write_private_key(host_key_path)
    except Exception as e:
        logger.error(f"Failed to generate or load host key: {e}")
        raise

    # Report the address actually being bound, not a hard-coded loopback.
    logger.info(f"Starting SFTP server on {host}:{port}")
    try:
        return await asyncssh.listen(
            host=host,
            port=port,
            server_host_keys=[key],
            server_factory=SSHServer,
            sftp_factory=SFTPServer,
        )
    except Exception as exc:
        # Startup stays best-effort (the port may already be served), but the
        # real cause is logged so other failures are not mistaken for
        # "already running".
        logger.warning(
            f"Failed to start SFTP server on {host}:{port} "
            f"(already running?): {exc}"
        )
        return None