Back to project.

Raw source file available here .

import logging
import pathlib

logging.basicConfig(level=logging.DEBUG)
import base64
import datetime
import mimetypes
import os
import shutil
import uuid

import aiofiles
import aiohttp
import aiohttp.web
from app.cache import time_cache_async
from lxml import etree


@aiohttp.web.middleware
async def debug_middleware(request, handler):
print(request.method, request.path, request.headers)
result = await handler(request)
print(result.status)
try:
print(await result.text())
except:
pass
return result


class WebdavApplication(aiohttp.web.Application):
def __init__(self, parent, *args, **kwargs):
middlewares = [debug_middleware]

super().__init__(middlewares=middlewares, *args, **kwargs)
self.locks = {}

self.relative_url = "/webdav"

self.router.add_route("OPTIONS", "/{filename:.*}", self.handle_options)
self.router.add_route("GET", "/{filename:.*}", self.handle_get)
self.router.add_route("PUT", "/{filename:.*}", self.handle_put)
self.router.add_route("DELETE", "/{filename:.*}", self.handle_delete)
self.router.add_route("MKCOL", "/{filename:.*}", self.handle_mkcol)
self.router.add_route("MOVE", "/{filename:.*}", self.handle_move)
self.router.add_route("COPY", "/{filename:.*}", self.handle_copy)
self.router.add_route("PROPFIND", "/{filename:.*}", self.handle_propfind)
self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch)
self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock)
self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock)
self.parent = parent

@property
def db(self):
return self.parent.db

@property
def services(self):
return self.parent.services

async def authenticate(self, request):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Basic "):
return False
encoded_creds = auth_header.split("Basic ")[1]
decoded_creds = base64.b64decode(encoded_creds).decode()
username, password = decoded_creds.split(":", 1)
request["user"] = await self.services.user.authenticate(
username=username, password=password
)
try:
request["home"] = await self.services.user.get_home_folder(
request["user"]["uid"]
)
except Exception:
pass
return request["user"]

async def handle_get(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)

requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path

if not abs_path.exists():
return aiohttp.web.Response(status=404, text="File not found")

if abs_path.is_dir():
return aiohttp.web.Response(status=403, text="Cannot download a directory")

content_type, _ = mimetypes.guess_type(str(abs_path))
content_type = content_type or "application/octet-stream"

return aiohttp.web.FileResponse(
path=str(abs_path), headers={"Content-Type": content_type}, chunk_size=8192
)

async def handle_put(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
file_path = request["home"] / request.match_info["filename"]
file_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(file_path, "wb") as f:
while chunk := await request.content.read(1024):
await f.write(chunk)
return aiohttp.web.Response(status=201, text="File uploaded")

async def handle_delete(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
file_path = request["home"] / request.match_info["filename"]
if file_path.is_file():
file_path.unlink()
return aiohttp.web.Response(status=204)
elif file_path.is_dir():
shutil.rmtree(file_path)
return aiohttp.web.Response(status=204)
return aiohttp.web.Response(status=404, text="Not found")

async def handle_mkcol(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
dir_path = request["home"] / request.match_info["filename"]
if dir_path.exists():
return aiohttp.web.Response(status=405, text="Directory already exists")
dir_path.mkdir(parents=True, exist_ok=True)
return aiohttp.web.Response(status=201, text="Directory created")

async def handle_move(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
src_path = request["home"] / request.match_info["filename"]
dest_path = request["home"] / request.headers.get("Destination", "").replace(
"http://localhost:8080/", ""
)
if not src_path.exists():
return aiohttp.web.Response(status=404, text="Source not found")
shutil.move(str(src_path), str(dest_path))
return aiohttp.web.Response(status=201, text="Moved successfully")

async def handle_copy(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
src_path = request["home"] / request.match_info["filename"]
dest_path = request["home"] / request.headers.get("Destination", "").replace(
"http://localhost:8080/", ""
)
if not src_path.exists():
return aiohttp.web.Response(status=404, text="Source not found")
if src_path.is_file():
shutil.copy2(str(src_path), str(dest_path))
else:
shutil.copytree(str(src_path), str(dest_path))
return aiohttp.web.Response(status=201, text="Copied successfully")

async def handle_options(self, request):
headers = {
"DAV": "1, 2",
"Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH",
}
return aiohttp.web.Response(status=200, headers=headers)

def get_current_utc_time(self, filepath):
if filepath.exists():
modified_time = datetime.datetime.utcfromtimestamp(filepath.stat().st_mtime)
else:
modified_time = datetime.datetime.utcnow()
return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ"), modified_time.strftime(
"%a, %d %b %Y %H:%M:%S GMT"
)

@time_cache_async(10)
async def get_file_size(self, path):
loop = self.parent.loop
stat = await loop.run_in_executor(None, os.stat, path)
return stat.st_size

@time_cache_async(10)
async def get_directory_size(self, directory):
total_size = 0
for dirpath, _, filenames in os.walk(directory):
for f in filenames:
fp = pathlib.Path(dirpath) / f
if fp.exists():
total_size += await self.get_file_size(str(fp))
return total_size

@time_cache_async(30)
async def get_disk_free_space(self, path="/"):
loop = self.parent.loop
statvfs = await loop.run_in_executor(None, os.statvfs, path)
return statvfs.f_bavail * statvfs.f_frsize

async def create_node(self, request, response_xml, full_path, depth):
abs_path = pathlib.Path(full_path)
relative_path = str(full_path.relative_to(request["home"]))

href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/")
href_path = href_path.replace("//", "/")

response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
href.text = href_path
propstat = etree.SubElement(response, "{DAV:}propstat")
prop = etree.SubElement(propstat, "{DAV:}prop")
res_type = etree.SubElement(prop, "{DAV:}resourcetype")
if full_path.is_dir():
etree.SubElement(res_type, "{DAV:}collection")
creation_date, last_modified = self.get_current_utc_time(full_path)
etree.SubElement(prop, "{DAV:}creationdate").text = creation_date
etree.SubElement(prop, "{DAV:}quota-used-bytes").text = str(
await self.get_file_size(full_path)
if full_path.is_file()
else await self.get_directory_size(full_path)
)
etree.SubElement(prop, "{DAV:}quota-available-bytes").text = str(
await self.get_disk_free_space(request["home"])
)
etree.SubElement(prop, "{DAV:}getlastmodified").text = last_modified
etree.SubElement(prop, "{DAV:}displayname").text = full_path.name
etree.SubElement(prop, "{DAV:}lockdiscovery")
mimetype, _ = mimetypes.guess_type(full_path.name)
if full_path.is_file():
etree.SubElement(prop, "{DAV:}contenttype").text = mimetype
etree.SubElement(prop, "{DAV:}getcontentlength").text = str(
await self.get_file_size(full_path)
if full_path.is_file()
else await self.get_directory_size(full_path)
)
supported_lock = etree.SubElement(prop, "{DAV:}supportedlock")
lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry")
lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope")
etree.SubElement(lock_scope_1, "{DAV:}exclusive")
lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype")
etree.SubElement(lock_type_1, "{DAV:}write")
lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry")
lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope")
etree.SubElement(lock_scope_2, "{DAV:}shared")
lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype")
etree.SubElement(lock_type_2, "{DAV:}write")
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"

if abs_path.is_dir() and depth > 0:
for item in abs_path.iterdir():
await self.create_node(request, response_xml, item, depth - 1)

async def handle_propfind(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)

depth = 0
try:
depth = int(request.headers.get("Depth", "0"))
except ValueError:
pass

requested_path = request.match_info.get("filename", "")

abs_path = request["home"] / requested_path
if not abs_path.exists():
return aiohttp.web.Response(status=404, text="Directory not found")
nsmap = {"D": "DAV:"}
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)

await self.create_node(request, response_xml, abs_path, depth)

xml_output = etree.tostring(
response_xml, encoding="utf-8", xml_declaration=True
).decode()
return aiohttp.web.Response(
status=207, text=xml_output, content_type="application/xml"
)

async def handle_proppatch(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
return aiohttp.web.Response(status=207, text="PROPPATCH OK (Not Implemented)")

async def handle_lock(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
resource = request.match_info.get("filename", "/")
lock_id = str(uuid.uuid4())
self.locks[resource] = lock_id
xml_response = await self.generate_lock_response(lock_id)
headers = {
"Lock-Token": f"opaquelocktoken:{lock_id}",
"Content-Type": "application/xml",
}
return aiohttp.web.Response(text=xml_response, headers=headers, status=200)

async def handle_unlock(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
resource = request.match_info.get("filename", "/")
lock_token = request.headers.get("Lock-Token", "").replace(
"opaquelocktoken:", ""
)[1:-1]
if self.locks.get(resource) == lock_token:
del self.locks[resource]
return aiohttp.web.Response(status=204)
return aiohttp.web.Response(status=400, text="Invalid Lock Token")

async def generate_lock_response(self, lock_id):
nsmap = {"D": "DAV:"}
root = etree.Element("{DAV:}prop", nsmap=nsmap)
lock_discovery = etree.SubElement(root, "{DAV:}lockdiscovery")
active_lock = etree.SubElement(lock_discovery, "{DAV:}activelock")
lock_type = etree.SubElement(active_lock, "{DAV:}locktype")
etree.SubElement(lock_type, "{DAV:}write")
lock_scope = etree.SubElement(active_lock, "{DAV:}lockscope")
etree.SubElement(lock_scope, "{DAV:}exclusive")
etree.SubElement(active_lock, "{DAV:}depth").text = "Infinity"
owner = etree.SubElement(active_lock, "{DAV:}owner")
etree.SubElement(owner, "{DAV:}href").text = lock_id
etree.SubElement(active_lock, "{DAV:}timeout").text = "Infinite"
lock_token = etree.SubElement(active_lock, "{DAV:}locktoken")
etree.SubElement(lock_token, "{DAV:}href").text = f"opaquelocktoken:{lock_id}"
return etree.tostring(root, pretty_print=True, encoding="utf-8").decode()

def get_last_modified(self, path):
if not path.exists():
return None
timestamp = path.stat().st_mtime
dt = datetime.datetime.utcfromtimestamp(timestamp)
return dt.strftime("%a, %d %b %Y %H:%M:%S GMT")

async def handle_head(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)

requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path

if not abs_path.exists():
return aiohttp.web.Response(status=404, text="File not found")

if abs_path.is_dir():
return aiohttp.web.Response(
status=403, text="Cannot get metadata for a directory"
)

content_type, _ = mimetypes.guess_type(str(abs_path))
content_type = content_type or "application/octet-stream"
file_size = abs_path.stat().st_size

headers = {
"Content-Type": content_type,
"Content-Length": str(file_size),
"Last-Modified": self.get_last_modified(abs_path),
}

return aiohttp.web.Response(status=200, headers=headers)