# Projects / snek / src / snek / webdav.py
#
# git clone https://molodetz.nl/retoor/snek.git
#
# Raw source file available here.

# retoor <retoor@molodetz.nl>

import logging
import pathlib
import base64
import datetime
import json
import mimetypes
import os
import shutil
import uuid
import aiofiles
import aiohttp
import aiohttp.web
import hashlib
from app.cache import time_cache_async
from lxml import etree
from urllib.parse import urlparse

# Configure the root logger when this module is imported; DEBUG is the
# threshold passed to basicConfig.
logging.basicConfig(level=logging.DEBUG)

@aiohttp.web.middleware
async def debug_middleware(request, handler):
    """Print every request and the matching response for debugging.

    Args:
        request: The incoming aiohttp request.
        handler: The next handler in the middleware chain.

    Returns:
        The response produced by ``handler``, unchanged.
    """
    # Never echo credentials: the Basic Authorization header carries the
    # user's password in trivially decodable form.
    safe_headers = {
        name: ("<redacted>" if name.lower() == "authorization" else value)
        for name, value in request.headers.items()
    }
    print(request.method, request.path, safe_headers)
    result = await handler(request)
    print(result.status)
    # ``text`` is a plain attribute on ``Response`` (not a coroutine) and is
    # absent on streaming responses such as ``FileResponse``.
    body_text = getattr(result, "text", None)
    if body_text is not None:
        print(body_text)
    return result

class WebdavApplication(aiohttp.web.Application):
def __init__(self, parent, *args, **kwargs):
    """Create the WebDAV sub-application and register one route per verb.

    Args:
        parent: Owning application; its ``db`` and ``services`` attributes
            are exposed through the properties of the same name.
        *args: Forwarded to ``aiohttp.web.Application``.
        **kwargs: Forwarded to ``aiohttp.web.Application``.
    """
    super().__init__(middlewares=[debug_middleware], *args, **kwargs)
    # rel_path -> lock_dict. Keys are paths relative to the requesting
    # user's home folder (see handle_lock / is_locked).
    self.locks = {}
    self.relative_url = "/webdav"
    catch_all = "/{filename:.*}"
    verb_table = (
        ("OPTIONS", self.handle_options),
        ("GET", self.handle_get),
        ("HEAD", self.handle_head),
        ("PUT", self.handle_put),
        ("DELETE", self.handle_delete),
        ("MKCOL", self.handle_mkcol),
        ("MOVE", self.handle_move),
        ("COPY", self.handle_copy),
        ("PROPFIND", self.handle_propfind),
        ("PROPPATCH", self.handle_proppatch),
        ("LOCK", self.handle_lock),
        ("UNLOCK", self.handle_unlock),
        ("PROPGET", self.handle_propget),
        ("PROPSET", self.handle_propset),
        ("PROPDEL", self.handle_propdel),
    )
    for verb, callback in verb_table:
        self.router.add_route(verb, catch_all, callback)
    self.parent = parent

@property
def db(self):
    """Return the ``db`` attribute of the parent application."""
    owner = self.parent
    return owner.db

@property
def services(self):
    """Return the ``services`` attribute of the parent application."""
    owner = self.parent
    return owner.services

async def authenticate(self, request):
    """Authenticate a request using HTTP Basic credentials.

    On success the user is stored under ``request["user"]`` and, when it can
    be resolved, the user's home folder under ``request["home"]``.

    Args:
        request: Mapping-like request object exposing ``headers``.

    Returns:
        ``False`` when the ``Authorization`` header is missing or malformed;
        otherwise whatever ``services.user.authenticate`` returned.
    """
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Basic "):
        return False
    try:
        decoded_creds = base64.b64decode(auth_header[len("Basic "):]).decode()
        username, password = decoded_creds.split(":", 1)
    except ValueError:
        # Covers invalid base64 (binascii.Error), undecodable bytes
        # (UnicodeDecodeError) and credentials without a ":" separator.
        return False
    request["user"] = await self.services.user.authenticate(
        username=username, password=password
    )
    if not request["user"]:
        # Failed login: there is no uid to resolve a home folder for.
        return request["user"]
    try:
        request["home"] = await self.services.user.get_home_folder(
            request["user"]["uid"]
        )
    except Exception:
        # Best effort, as before, but leave a trace instead of hiding it.
        logging.exception("Could not resolve home folder for authenticated user")
    return request["user"]

async def is_locked(self, abs_path, request):
    """Find the active lock covering ``abs_path``, if any.

    Walks from ``abs_path`` up to the home folder. A lock applies when it
    sits on ``abs_path`` itself or on an ancestor with depth "infinity".
    Expired locks met on the way are removed from ``self.locks``.

    Returns:
        The lock dict, or ``None`` when nothing covers the path.

    Raises:
        ValueError: If ``abs_path`` is not located under the home folder.
    """
    home = request["home"]
    # Evaluated up front so a path outside the home folder always raises.
    abs_path.relative_to(home)
    stop_at = home.parent
    current = abs_path
    while current != stop_at:
        key = current.relative_to(home).as_posix()
        candidate = self.locks.get(key)
        if candidate is not None:
            if self.is_lock_expired(candidate):
                del self.locks[key]
            elif current == abs_path or candidate["depth"] == "infinity":
                return candidate
        current = current.parent
    return None

async def has_descendant_locks(self, rel_path, is_dir):
    """Report whether any unexpired lock exists below a directory.

    Expired locks encountered during the scan are dropped from
    ``self.locks``.

    Args:
        rel_path: POSIX path of the directory relative to the home folder.
            The home folder itself is "." (what
            ``Path.relative_to(home).as_posix()`` yields) or "".
        is_dir: ``False`` short-circuits to ``False`` (files have no
            descendants).

    Returns:
        ``True`` if an active lock sits on a descendant, else ``False``.
    """
    if not is_dir:
        return False
    root_keys = ("", ".")
    is_root = rel_path in root_keys
    prefix = rel_path + "/"
    for lock_path in list(self.locks):
        if is_root:
            # Lock keys carry no "./" prefix, so for the root every key
            # except the root's own is a descendant.
            if lock_path in root_keys:
                continue
        elif not lock_path.startswith(prefix):
            continue
        if not self.is_lock_expired(self.locks[lock_path]):
            return True
        del self.locks[lock_path]
    return False

def is_lock_expired(self, lock):
    """Return ``True`` when ``lock`` has outlived its timeout.

    A ``timeout`` of ``None`` means the lock never expires. ``created`` is
    compared against the current naive UTC time.
    """
    ttl = lock["timeout"]
    if ttl is None:
        return False
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    age_seconds = (now_utc - lock["created"]).total_seconds()
    return age_seconds > ttl

async def handle_get(self, request):
    """Serve a file from the authenticated user's home folder.

    Responds 401 without valid credentials, 403 for directories and for
    paths that resolve outside the home folder, 404 for missing files and
    423 when the resource is locked and the request carries no matching
    ``Lock-Token``. Otherwise streams the file with an mtime-derived ETag.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # The filename comes straight from the URL: an absolute path or ".."
    # segments must not escape the user's home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="File not found")
    if abs_path.is_dir():
        return aiohttp.web.Response(status=403, text="Cannot download a directory")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    content_type, _ = mimetypes.guess_type(str(abs_path))
    content_type = content_type or "application/octet-stream"
    etag = f'"{hashlib.sha1(str(abs_path.stat().st_mtime).encode()).hexdigest()}"'
    headers = {"Content-Type": content_type, "ETag": etag}
    return aiohttp.web.FileResponse(path=str(abs_path), headers=headers, chunk_size=8192)

async def handle_head(self, request):
    """Return the headers GET would send for a file, without a body.

    Responds 401 without valid credentials, 403 for directories and for
    paths that resolve outside the home folder, 404 for missing files and
    423 when the resource is locked and no matching ``Lock-Token`` is sent.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="File not found")
    if abs_path.is_dir():
        return aiohttp.web.Response(status=403, text="Cannot get metadata for a directory")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    content_type, _ = mimetypes.guess_type(str(abs_path))
    content_type = content_type or "application/octet-stream"
    # One stat call feeds both the size and the ETag so they stay coherent.
    file_stat = abs_path.stat()
    etag = f'"{hashlib.sha1(str(file_stat.st_mtime).encode()).hexdigest()}"'
    headers = {
        "Content-Type": content_type,
        "Content-Length": str(file_stat.st_size),
        "Last-Modified": self.get_last_modified(abs_path),
        "ETag": etag,
    }
    return aiohttp.web.Response(status=200, headers=headers)

async def handle_put(self, request):
    """Store the request body as a file in the user's home folder.

    Missing parent directories are created. Responds 401 without valid
    credentials, 403 for paths outside the home folder, 405 when the target
    is an existing directory, 423 when locked without the matching
    ``Lock-Token``, 201 when a new file was created and 204 when an
    existing file was replaced.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    if abs_path.is_dir():
        # Opening a directory for writing would raise and surface as a 500.
        return aiohttp.web.Response(status=405, text="Cannot PUT to a collection")
    replaced = abs_path.exists()
    abs_path.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(abs_path, "wb") as f:
        while chunk := await request.content.read(65536):
            await f.write(chunk)
    if replaced:
        return aiohttp.web.Response(status=204)
    return aiohttp.web.Response(status=201, text="File uploaded")

async def handle_delete(self, request):
    """Delete a file or directory tree from the user's home folder.

    A file's property sidecar and the lock entry of the deleted resource are
    removed as well. Responds 401 without valid credentials, 403 for the
    home folder itself or paths outside it, 423 when the resource (or, for
    directories, any descendant) is locked, 404 when nothing exists at the
    path and 204 on success.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    try:
        rel_path = abs_path.relative_to(home).as_posix()
        resolved_rel = abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not resolved_rel.parts:
        # An empty filename addresses the home folder; removing it would
        # wipe every file the user owns.
        return aiohttp.web.Response(status=403, text="Cannot delete the root collection")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    if abs_path.is_file():
        abs_path.unlink()
        props_file = self.get_props_file_path(abs_path)
        if props_file.exists():
            props_file.unlink()
        self.locks.pop(rel_path, None)
        return aiohttp.web.Response(status=204)
    if abs_path.is_dir():
        if await self.has_descendant_locks(rel_path, True):
            return aiohttp.web.Response(status=423, text="Locked")
        shutil.rmtree(abs_path)
        self.locks.pop(rel_path, None)
        return aiohttp.web.Response(status=204)
    return aiohttp.web.Response(status=404, text="Not found")

async def handle_mkcol(self, request):
    """Create a directory (collection) in the user's home folder.

    Missing intermediate directories are created too. Responds 401 without
    valid credentials, 403 for paths outside the home folder, 423 when a
    covering lock exists and no matching ``Lock-Token`` is sent, 405 when
    the path already exists and 201 on success.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    if abs_path.exists():
        return aiohttp.web.Response(status=405, text="Directory already exists")
    abs_path.mkdir(parents=True, exist_ok=True)
    return aiohttp.web.Response(status=201, text="Directory created")

async def handle_move(self, request):
    """Move a file or directory to the path named by ``Destination``.

    The source's property sidecar and lock entry follow the resource.
    Responds 401 without valid credentials, 400 for a missing or
    out-of-prefix ``Destination``, 502 for a destination on another host,
    403 for paths outside the home folder, for the home folder itself and
    for moves into/onto the source or over one of its ancestors, 404 for a
    missing source, 423 for lock conflicts, 412 when the destination exists
    and ``Overwrite`` is not "T", 204 when an existing destination was
    replaced and 201 when a new resource was created.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    from urllib.parse import unquote

    home = request["home"]
    src_path = home / request.match_info.get("filename", "")
    destination = request.headers.get("Destination", "")
    if not destination:
        return aiohttp.web.Response(status=400, text="Destination header missing")
    parsed = urlparse(destination)
    if parsed.scheme and (parsed.scheme != request.scheme or parsed.netloc != request.host):
        return aiohttp.web.Response(status=502, text="Bad Gateway")
    # The header holds a percent-encoded URL, while the source filename from
    # the router is already decoded; decode so both name the same files.
    dest_url_path = unquote(parsed.path)
    url_prefix = self.relative_url + "/"
    if not dest_url_path.startswith(url_prefix):
        # Previously such a destination silently mapped to the home folder,
        # which the overwrite logic below would then delete.
        return aiohttp.web.Response(status=400, text="Invalid Destination")
    dest_path = home / dest_url_path[len(url_prefix):]
    home_real = home.resolve()
    try:
        src_path.relative_to(home)
        dest_path.relative_to(home)
        src_real = src_path.resolve()
        dest_real = dest_path.resolve()
        src_real.relative_to(home_real)
        dest_real.relative_to(home_real)
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if home_real in (src_real, dest_real):
        return aiohttp.web.Response(status=403, text="Cannot move or replace the root collection")
    if src_real == dest_real or src_real in dest_real.parents or dest_real in src_real.parents:
        # Moving onto itself, into its own subtree, or over an ancestor
        # (which would delete the source along with the destination).
        return aiohttp.web.Response(status=403, text="Source and destination overlap")
    if not src_path.exists():
        return aiohttp.web.Response(status=404, text="Source not found")
    src_lock = await self.is_locked(src_path, request)
    dest_lock = await self.is_locked(dest_path, request)
    submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
    if src_lock and src_lock["token"] != submitted:
        return aiohttp.web.Response(status=423, text="Source Locked")
    if dest_lock and dest_lock["token"] != submitted:
        return aiohttp.web.Response(status=423, text="Destination Locked")
    overwrite = request.headers.get("Overwrite", "T") == "T"
    replaced = dest_path.exists()
    if replaced and not overwrite:
        return aiohttp.web.Response(status=412, text="Precondition Failed")
    if replaced:
        if dest_path.is_dir():
            shutil.rmtree(dest_path)
        else:
            dest_path.unlink()
    shutil.move(str(src_path), str(dest_path))
    src_props = self.get_props_file_path(src_path)
    if src_props.exists():
        dest_props = self.get_props_file_path(dest_path)
        shutil.move(str(src_props), str(dest_props))
    src_rel = src_path.relative_to(home).as_posix()
    dest_rel = dest_path.relative_to(home).as_posix()
    if src_rel in self.locks:
        self.locks[dest_rel] = self.locks.pop(src_rel)
    if replaced:
        return aiohttp.web.Response(status=204)
    return aiohttp.web.Response(status=201, text="Moved successfully")

async def handle_copy(self, request):
    """Copy a file or directory tree to the path named by ``Destination``.

    The source's property sidecar is copied as well. Responds 401 without
    valid credentials, 400 for a missing or out-of-prefix ``Destination``,
    502 for a destination on another host, 403 for paths outside the home
    folder, for replacing the home folder and for overlapping source and
    destination, 404 for a missing source, 423 for lock conflicts, 412 when
    the destination exists and ``Overwrite`` is not "T", 204 when an
    existing destination was replaced and 201 when a new one was created.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    from urllib.parse import unquote

    home = request["home"]
    src_path = home / request.match_info.get("filename", "")
    destination = request.headers.get("Destination", "")
    if not destination:
        return aiohttp.web.Response(status=400, text="Destination header missing")
    parsed = urlparse(destination)
    if parsed.scheme and (parsed.scheme != request.scheme or parsed.netloc != request.host):
        return aiohttp.web.Response(status=502, text="Bad Gateway")
    # The header holds a percent-encoded URL, while the source filename from
    # the router is already decoded; decode so both name the same files.
    dest_url_path = unquote(parsed.path)
    url_prefix = self.relative_url + "/"
    if not dest_url_path.startswith(url_prefix):
        # Previously such a destination silently mapped to the home folder,
        # which the overwrite logic below would then delete.
        return aiohttp.web.Response(status=400, text="Invalid Destination")
    dest_path = home / dest_url_path[len(url_prefix):]
    home_real = home.resolve()
    try:
        src_path.relative_to(home)
        dest_path.relative_to(home)
        src_real = src_path.resolve()
        dest_real = dest_path.resolve()
        src_real.relative_to(home_real)
        dest_real.relative_to(home_real)
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if dest_real == home_real:
        return aiohttp.web.Response(status=403, text="Cannot replace the root collection")
    if src_real == dest_real or src_real in dest_real.parents or dest_real in src_real.parents:
        # Copying into the source's own subtree would recurse; copying over
        # an ancestor would delete the source before it is read.
        return aiohttp.web.Response(status=403, text="Source and destination overlap")
    if not src_path.exists():
        return aiohttp.web.Response(status=404, text="Source not found")
    src_lock = await self.is_locked(src_path, request)
    dest_lock = await self.is_locked(dest_path, request)
    submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
    if src_lock and src_lock["token"] != submitted:
        return aiohttp.web.Response(status=423, text="Source Locked")
    if dest_lock and dest_lock["token"] != submitted:
        return aiohttp.web.Response(status=423, text="Destination Locked")
    overwrite = request.headers.get("Overwrite", "T") == "T"
    replaced = dest_path.exists()
    if replaced and not overwrite:
        return aiohttp.web.Response(status=412, text="Precondition Failed")
    if replaced:
        if dest_path.is_dir():
            shutil.rmtree(dest_path)
        else:
            dest_path.unlink()
    if src_path.is_file():
        shutil.copy2(str(src_path), str(dest_path))
    else:
        shutil.copytree(str(src_path), str(dest_path))
    src_props = self.get_props_file_path(src_path)
    if src_props.exists():
        dest_props = self.get_props_file_path(dest_path)
        shutil.copy2(str(src_props), str(dest_props))
    if replaced:
        return aiohttp.web.Response(status=204)
    return aiohttp.web.Response(status=201, text="Copied successfully")

async def handle_options(self, request):
    """Advertise DAV compliance classes and the supported HTTP methods.

    No authentication is performed for this verb.
    """
    supported_verbs = (
        "OPTIONS", "GET", "HEAD", "PUT", "DELETE", "MKCOL", "MOVE", "COPY",
        "PROPFIND", "PROPPATCH", "LOCK", "UNLOCK", "PROPGET", "PROPSET", "PROPDEL",
    )
    return aiohttp.web.Response(
        status=200,
        headers={
            "DAV": "1, 2",
            "Allow": ", ".join(supported_verbs),
            "MS-Author-Via": "DAV",
        },
    )

def get_current_utc_time(self, filepath):
    """Format a UTC timestamp for ``filepath`` in two notations.

    The timestamp is the file's modification time when the path exists and
    the current time otherwise.

    Returns:
        A ``(iso, http)`` tuple: ``YYYY-MM-DDTHH:MM:SSZ`` and
        ``Day, DD Mon YYYY HH:MM:SS GMT``.
    """
    utc = datetime.timezone.utc
    if filepath.exists():
        moment = datetime.datetime.fromtimestamp(filepath.stat().st_mtime, utc)
    else:
        moment = datetime.datetime.now(utc)
    iso_form = moment.strftime("%Y-%m-%dT%H:%M:%SZ")
    http_form = moment.strftime("%a, %d %b %Y %H:%M:%S GMT")
    return iso_form, http_form

@time_cache_async(10)
async def get_file_size(self, path):
    """Return the size in bytes of ``path``.

    ``os.stat`` runs in the parent application's loop default executor.
    """
    info = await self.parent.loop.run_in_executor(None, os.stat, path)
    return info.st_size

@time_cache_async(10)
async def get_directory_size(self, directory):
    """Return the summed size in bytes of all files below ``directory``.

    Entries that no longer exist when visited are skipped.
    """
    running_total = 0
    for current_dir, _subdirs, names in os.walk(directory):
        base = pathlib.Path(current_dir)
        for name in names:
            candidate = base / name
            if not candidate.exists():
                continue
            running_total += await self.get_file_size(str(candidate))
    return running_total

@time_cache_async(30)
async def get_disk_free_space(self, path):
    """Return the bytes available on the filesystem holding ``path``.

    Computed as ``f_bavail * f_frsize`` from ``os.statvfs``, which runs in
    the parent application's loop default executor.
    """
    fs_info = await self.parent.loop.run_in_executor(None, os.statvfs, path)
    return fs_info.f_bavail * fs_info.f_frsize

# Append one {DAV:}response element describing ``full_path`` to
# ``response_xml`` and, for directories, recurse into the children while
# ``depth`` is greater than zero.
#
# Parameters:
#   request         - request whose "home" entry anchors relative paths.
#   response_xml    - parent element that receives the new response node.
#   full_path       - path object of the resource (``.name``, ``.is_file()``
#                     and ``.iterdir()`` are used on it).
#   depth           - remaining recursion depth; decremented per level.
#   requested_props - list of (tag, nsmap) pairs named by the client.
#   propname_only   - emit property names without values.
#   all_props       - emit every standard and custom property.
async def create_propfind_node(self, request, response_xml, full_path, depth, requested_props, propname_only, all_props):
abs_path = pathlib.Path(full_path)
relative_path = abs_path.relative_to(request["home"]).as_posix()
# Build the href from the URL prefix plus the home-relative path, then
# collapse the "." that relative_to yields for the home folder itself.
# NOTE(review): strip(".") also removes leading/trailing dots of the whole
# string and replace("./", "/") rewrites any path segment ending in a dot,
# so resources with such names get an altered href - confirm intended.
href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/").replace("//", "/")
response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
href.text = href_path
custom_properties = await self.load_properties(abs_path)
# First propstat collects everything that can be answered (status 200).
propstat_ok = etree.SubElement(response, "{DAV:}propstat")
prop_ok = etree.SubElement(propstat_ok, "{DAV:}prop")
found_props = []
# Names of the built-in properties; only the keys are consulted in this
# method (membership test and iteration).
standard_props = {
"{DAV:}resourcetype": None,
"{DAV:}creationdate": None,
"{DAV:}getlastmodified": None,
"{DAV:}displayname": full_path.name,
"{DAV:}lockdiscovery": None,
"{DAV:}supportedlock": None,
"{DAV:}quota-used-bytes": None,
"{DAV:}quota-available-bytes": None,
"{DAV:}getetag": None,
}
# Content length and type are only offered for regular files.
if full_path.is_file():
mimetype, _ = mimetypes.guess_type(full_path.name)
standard_props["{DAV:}getcontentlength"] = None
standard_props["{DAV:}getcontenttype"] = mimetype
# Case 1: the client named specific properties.
if not all_props and requested_props:
# Second propstat (404) is created lazily on the first unknown property.
propstat_notfound = None
prop_notfound = None
for prop_tag, prop_nsmap in requested_props:
if prop_tag in standard_props:
await self.add_standard_property(prop_ok, prop_tag, full_path, request)
found_props.append(prop_tag)
elif prop_tag in custom_properties:
if propname_only:
etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap)
else:
elem = etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap)
elem.text = str(custom_properties[prop_tag])
found_props.append(prop_tag)
else:
if propstat_notfound is None:
propstat_notfound = etree.SubElement(response, "{DAV:}propstat")
prop_notfound = etree.SubElement(propstat_notfound, "{DAV:}prop")
etree.SubElement(prop_notfound, prop_tag, nsmap=prop_nsmap)
if propstat_notfound is not None:
etree.SubElement(propstat_notfound, "{DAV:}status").text = "HTTP/1.1 404 Not Found"
# Case 2: allprop / propname / no explicit list - emit every standard
# property followed by every stored custom property.
else:
for prop_name in standard_props:
if propname_only:
etree.SubElement(prop_ok, prop_name)
else:
await self.add_standard_property(prop_ok, prop_name, full_path, request)
for prop_name, prop_value in custom_properties.items():
if propname_only:
# Custom names are stored in "{namespace}local" form; split them so the
# namespace can be declared on the emitted element.
if prop_name.startswith("{"):
ns_end = prop_name.find("}")
ns = prop_name[1:ns_end]
local_name = prop_name[ns_end+1:]
prop_nsmap = {"D": "DAV:", None: ns} if ns != "DAV:" else {"D": "DAV:"}
etree.SubElement(prop_ok, local_name, nsmap=prop_nsmap)
else:
etree.SubElement(prop_ok, prop_name)
else:
elem = etree.SubElement(prop_ok, prop_name)
elem.text = str(prop_value)
etree.SubElement(propstat_ok, "{DAV:}status").text = "HTTP/1.1 200 OK"
# Recurse into directory entries, hiding the property sidecar files.
if abs_path.is_dir() and depth > 0:
for item in abs_path.iterdir():
if item.name.startswith(".") and item.name.endswith(".webdav_props.json"):
continue
await self.create_propfind_node(request, response_xml, item, depth - 1, requested_props, propname_only, all_props)

# Append the element for one built-in DAV property of ``full_path`` to
# ``prop_elem``. ``prop_name`` is the Clark-notation tag ("{DAV:}name").
# Names that match no branch, and file-only properties requested for a
# non-file, add nothing.
async def add_standard_property(self, prop_elem, prop_name, full_path, request):
if prop_name == "{DAV:}resourcetype":
# Empty element for files; contains <collection/> for directories.
res_type = etree.SubElement(prop_elem, "{DAV:}resourcetype")
if full_path.is_dir():
etree.SubElement(res_type, "{DAV:}collection")
elif prop_name == "{DAV:}creationdate":
# Uses the first (ISO style) value returned by get_current_utc_time.
creation_date, _ = self.get_current_utc_time(full_path)
etree.SubElement(prop_elem, "{DAV:}creationdate").text = creation_date
elif prop_name == "{DAV:}getlastmodified":
# Uses the second (HTTP date style) value of the same helper.
_, last_modified = self.get_current_utc_time(full_path)
etree.SubElement(prop_elem, "{DAV:}getlastmodified").text = last_modified
elif prop_name == "{DAV:}displayname":
etree.SubElement(prop_elem, "{DAV:}displayname").text = full_path.name
elif prop_name == "{DAV:}lockdiscovery":
# Always emitted; an <activelock> child is added only when an unexpired
# lock is registered on exactly this path (ancestor locks are not listed).
lockdiscovery = etree.SubElement(prop_elem, "{DAV:}lockdiscovery")
rel_path = full_path.relative_to(request["home"]).as_posix()
if rel_path in self.locks:
lock = self.locks[rel_path]
if not self.is_lock_expired(lock):
activelock = etree.SubElement(lockdiscovery, "{DAV:}activelock")
locktype = etree.SubElement(activelock, "{DAV:}locktype")
etree.SubElement(locktype, f"{{DAV:}}{lock['type']}")
lockscope = etree.SubElement(activelock, "{DAV:}lockscope")
etree.SubElement(lockscope, f"{{DAV:}}{lock['scope']}")
etree.SubElement(activelock, "{DAV:}depth").text = lock['depth'].capitalize()
# The owner is stored as serialized XML and re-parsed for embedding.
if lock['owner']:
owner = etree.fromstring(lock['owner'])
activelock.append(owner)
timeout_str = "Infinite" if lock['timeout'] is None else f"Second-{lock['timeout']}"
etree.SubElement(activelock, "{DAV:}timeout").text = timeout_str
locktoken = etree.SubElement(activelock, "{DAV:}locktoken")
etree.SubElement(locktoken, "{DAV:}href").text = f"opaquelocktoken:{lock['token']}"
elif prop_name == "{DAV:}supportedlock":
# Two lock entries are advertised: exclusive write and shared write.
supported_lock = etree.SubElement(prop_elem, "{DAV:}supportedlock")
lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry")
lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope")
etree.SubElement(lock_scope_1, "{DAV:}exclusive")
lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype")
etree.SubElement(lock_type_1, "{DAV:}write")
lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry")
lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope")
etree.SubElement(lock_scope_2, "{DAV:}shared")
lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype")
etree.SubElement(lock_type_2, "{DAV:}write")
elif prop_name == "{DAV:}quota-used-bytes":
# File size for files, recursive total for anything else.
size = await self.get_file_size(full_path) if full_path.is_file() else await self.get_directory_size(full_path)
etree.SubElement(prop_elem, "{DAV:}quota-used-bytes").text = str(size)
elif prop_name == "{DAV:}quota-available-bytes":
free_space = await self.get_disk_free_space(str(full_path))
etree.SubElement(prop_elem, "{DAV:}quota-available-bytes").text = str(free_space)
elif prop_name == "{DAV:}getcontentlength" and full_path.is_file():
size = await self.get_file_size(full_path)
etree.SubElement(prop_elem, "{DAV:}getcontentlength").text = str(size)
elif prop_name == "{DAV:}getcontenttype" and full_path.is_file():
# Omitted entirely when the MIME type cannot be guessed from the name.
mimetype, _ = mimetypes.guess_type(full_path.name)
if mimetype:
etree.SubElement(prop_elem, "{DAV:}getcontenttype").text = mimetype
elif prop_name == "{DAV:}getetag" and full_path.is_file():
# Quoted SHA-1 hex digest of the stringified modification time.
etag = f'"{hashlib.sha1(str(full_path.stat().st_mtime).encode()).hexdigest()}"'
etree.SubElement(prop_elem, "{DAV:}getetag").text = etag

async def handle_propfind(self, request):
    """Answer PROPFIND with a 207 multistatus document.

    ``Depth`` may be an integer or "infinity"; anything unparsable counts
    as 0. The optional XML body selects ``propname``, ``allprop`` or an
    explicit ``prop`` list; a missing or malformed body means all
    properties. Responds 401 without valid credentials, 403 for paths
    outside the home folder and 404 for missing resources.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    depth = request.headers.get("Depth", "0")
    if depth == "infinity":
        depth = float("inf")
    else:
        try:
            depth = int(depth)
        except ValueError:
            depth = 0
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="Resource not found")
    body = await request.read()
    requested_props = []
    propname_only = False
    all_props = True
    if body:
        # The body is untrusted: do not expand entities or touch the network.
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        try:
            root = etree.fromstring(body, parser)
        except etree.XMLSyntaxError as exc:
            # Keep the lenient fallback to "all properties", but say so.
            logging.warning("PROPFIND body ignored, not well-formed XML: %s", exc)
            root = None
        if root is not None:
            if root.find(".//{DAV:}propname") is not None:
                propname_only = True
                all_props = False
            prop_elem = root.find(".//{DAV:}prop")
            if prop_elem is not None and not propname_only:
                all_props = False
                for child in prop_elem:
                    # Comments / processing instructions have no string tag.
                    if isinstance(child.tag, str):
                        requested_props.append((child.tag, child.nsmap))
            if root.find(".//{DAV:}allprop") is not None:
                all_props = True
    nsmap = {"D": "DAV:"}
    response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
    await self.create_propfind_node(request, response_xml, abs_path, depth, requested_props, propname_only, all_props)
    xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode()
    return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml")

async def handle_proppatch(self, request):
    """Apply ``set`` / ``remove`` instructions to a resource's custom properties.

    Every ``{DAV:}set`` and ``{DAV:}remove`` element of the body is applied
    in document order and reported in its own propstat. Removing a property
    that does not exist is reported as success (RFC 4918, section 14.23).
    Responds 401 without valid credentials, 403 for paths outside the home
    folder, 404 for missing resources, 423 when locked without the matching
    ``Lock-Token``, 400 for an empty or malformed body and 207 otherwise.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="Resource not found")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    body = await request.read()
    if not body:
        return aiohttp.web.Response(status=400, text="Bad Request")
    # Property values are stored and can be read back, so entity expansion
    # on this untrusted body would leak local files.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        root = etree.fromstring(body, parser)
    except etree.XMLSyntaxError as e:
        logging.error(f"PROPPATCH error: {e}")
        return aiohttp.web.Response(status=400, text="Bad Request")
    properties = await self.load_properties(abs_path)
    nsmap = {"D": "DAV:"}
    response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
    response = etree.SubElement(response_xml, "{DAV:}response")
    href = etree.SubElement(response, "{DAV:}href")
    relative_path = abs_path.relative_to(home).as_posix()
    href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/")
    href.text = href_path
    for instruction in root.iter("{DAV:}set", "{DAV:}remove"):
        prop_elem = instruction.find("{DAV:}prop")
        if prop_elem is None:
            continue
        removing = instruction.tag == "{DAV:}remove"
        propstat = etree.SubElement(response, "{DAV:}propstat")
        prop = etree.SubElement(propstat, "{DAV:}prop")
        for child in prop_elem:
            if not isinstance(child.tag, str):
                # Comments / processing instructions are not properties.
                continue
            # Properties are keyed by Clark notation; un-namespaced tags
            # are filed under DAV:.
            prop_name = child.tag if child.tag.startswith("{") else f"{{DAV:}}{child.tag}"
            if removing:
                properties.pop(prop_name, None)
            else:
                properties[prop_name] = child.text or ""
            etree.SubElement(prop, child.tag, nsmap=child.nsmap)
        etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
    if properties:
        await self.save_properties(abs_path, properties)
    else:
        await self.delete_properties_file(abs_path)
    xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode()
    return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml")

async def handle_lock(self, request):
    """Create a new lock (request with body) or refresh one (empty body).

    ``Timeout`` accepts "Infinite" or "Second-N" (default and fallback:
    3600 seconds); ``Depth`` must be "0" or "infinity". Responds 401
    without valid credentials, 403 for paths outside the home folder, 400
    for an invalid depth or body, 404 for missing resources and 423 when a
    conflicting lock exists. A refresh answers 412 when no live lock is
    registered on the resource and 409 when the ``Lock-Token`` differs.
    On success returns 200 with the lock description and ``Lock-Token``.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        resource = abs_path.relative_to(home).as_posix()
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    body = await request.read()
    timeout_str = request.headers.get("Timeout", "Second-3600")
    if timeout_str == "Infinite":
        timeout = None
    elif timeout_str.startswith("Second-"):
        try:
            timeout = int(timeout_str[7:])
        except ValueError:
            timeout = 3600
    else:
        timeout = 3600
    depth = request.headers.get("Depth", "infinity").lower()
    if depth not in ["0", "infinity"]:
        return aiohttp.web.Response(status=400, text="Invalid Depth")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="Resource not found")

    if not body:
        # Refresh of an existing lock.
        if resource not in self.locks:
            return aiohttp.web.Response(status=412, text="Precondition Failed")
        lock = self.locks[resource]
        if self.is_lock_expired(lock):
            del self.locks[resource]
            return aiohttp.web.Response(status=412, text="Precondition Failed")
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=409, text="Conflict")
        lock["created"] = datetime.datetime.utcnow()
        lock["timeout"] = timeout
        xml = await self.generate_lock_response(lock)
        headers = {"Lock-Token": f"<opaquelocktoken:{lock['token']}>"}
        return aiohttp.web.Response(status=200, text=xml, content_type="application/xml", headers=headers)

    # The owner element is echoed back to clients, so entity expansion on
    # this untrusted body would leak local files.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        root = etree.fromstring(body, parser)
    except etree.XMLSyntaxError as e:
        logging.error(f"LOCK error: {e}")
        return aiohttp.web.Response(status=400, text="Bad Request")
    # ".//" only searches descendants, and lockinfo is normally the
    # document root itself, so check the root first.
    if root.tag == "{DAV:}lockinfo":
        lockinfo = root
    else:
        lockinfo = root.find(".//{DAV:}lockinfo")
    if lockinfo is None:
        return aiohttp.web.Response(status=400, text="Invalid lockinfo")
    scope = "exclusive"
    scope_elem = lockinfo.find("{DAV:}lockscope")
    if scope_elem is not None and scope_elem.find("{DAV:}shared") is not None:
        scope = "shared"
    locktype = "write"
    owner_elem = lockinfo.find("{DAV:}owner")
    owner_xml = etree.tostring(owner_elem, encoding="unicode") if owner_elem is not None else ""
    covering_lock = await self.is_locked(abs_path, request)
    if covering_lock:
        return aiohttp.web.Response(status=423, text="Locked")
    if abs_path.is_dir() and depth == "infinity" and await self.has_descendant_locks(resource, True):
        return aiohttp.web.Response(status=423, text="Locked")
    token = str(uuid.uuid4())
    lock_dict = {
        "token": token,
        "scope": scope,
        "type": locktype,
        "owner": owner_xml,
        "depth": depth,
        "timeout": timeout,
        "created": datetime.datetime.utcnow()
    }
    try:
        # Render first: the stored owner XML is re-parsed there, and a lock
        # that cannot be described must not be registered.
        xml = await self.generate_lock_response(lock_dict)
    except etree.XMLSyntaxError as e:
        logging.error(f"LOCK error: {e}")
        return aiohttp.web.Response(status=400, text="Bad Request")
    self.locks[resource] = lock_dict
    headers = {"Lock-Token": f"<opaquelocktoken:{token}>"}
    return aiohttp.web.Response(status=200, text=xml, content_type="application/xml", headers=headers)

async def handle_unlock(self, request):
    """Release the lock registered on the requested resource.

    Responds 401 without valid credentials, 409 when no live lock exists on
    the resource, 400 when the ``Lock-Token`` does not match and 204 once
    the lock has been removed.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Locks are stored under the normalized home-relative POSIX path ("."
    # for the home folder, no trailing slash), so derive the key the same
    # way instead of using the raw URL segment.
    try:
        resource = abs_path.relative_to(home).as_posix()
    except ValueError:
        return aiohttp.web.Response(status=409, text="Conflict")
    if resource not in self.locks:
        return aiohttp.web.Response(status=409, text="Conflict")
    lock = self.locks[resource]
    if self.is_lock_expired(lock):
        del self.locks[resource]
        return aiohttp.web.Response(status=409, text="Conflict")
    submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
    if submitted != lock["token"]:
        return aiohttp.web.Response(status=400, text="Invalid Lock Token")
    del self.locks[resource]
    return aiohttp.web.Response(status=204)

async def generate_lock_response(self, lock_dict):
    """Serialize ``lock_dict`` as a ``{DAV:}prop`` / ``lockdiscovery`` document.

    Returns:
        The XML as a string, including the XML declaration.
    """
    dav = "{DAV:}"
    root = etree.Element(dav + "prop", nsmap={"D": "DAV:"})
    active = etree.SubElement(
        etree.SubElement(root, dav + "lockdiscovery"), dav + "activelock"
    )
    etree.SubElement(
        etree.SubElement(active, dav + "locktype"), f"{{DAV:}}{lock_dict['type']}"
    )
    etree.SubElement(
        etree.SubElement(active, dav + "lockscope"), f"{{DAV:}}{lock_dict['scope']}"
    )
    depth_node = etree.SubElement(active, dav + "depth")
    depth_node.text = lock_dict['depth'].capitalize()
    # The owner is kept as serialized XML and re-parsed for embedding.
    if lock_dict['owner']:
        active.append(etree.fromstring(lock_dict['owner']))
    if lock_dict['timeout'] is None:
        timeout_text = "Infinite"
    else:
        timeout_text = f"Second-{lock_dict['timeout']}"
    timeout_node = etree.SubElement(active, dav + "timeout")
    timeout_node.text = timeout_text
    token_href = etree.SubElement(
        etree.SubElement(active, dav + "locktoken"), dav + "href"
    )
    token_href.text = f"opaquelocktoken:{lock_dict['token']}"
    serialized = etree.tostring(root, encoding="utf-8", xml_declaration=True)
    return serialized.decode()

def get_last_modified(self, path):
    """Return the modification time of ``path`` as an HTTP date string.

    Returns ``None`` when the path does not exist.
    """
    if path.exists():
        mtime = path.stat().st_mtime
        moment = datetime.datetime.fromtimestamp(mtime, datetime.timezone.utc)
        return moment.strftime("%a, %d %b %Y %H:%M:%S GMT")
    return None

def get_props_file_path(self, resource_path):
    """Return the JSON sidecar path that holds a resource's custom properties.

    Directories keep it inside themselves as ``.webdav_props.json``; any
    other path uses ``.<name>.webdav_props.json`` next to the resource.
    """
    if not resource_path.is_dir():
        sidecar_name = f".{resource_path.name}.webdav_props.json"
        return resource_path.parent / sidecar_name
    return resource_path / ".webdav_props.json"

async def load_properties(self, resource_path):
    """Load the custom properties stored for ``resource_path``.

    Returns:
        The decoded JSON content of the sidecar file, or an empty dict when
        no sidecar exists.
    """
    sidecar = self.get_props_file_path(resource_path)
    if not sidecar.exists():
        return {}
    async with aiofiles.open(sidecar, "r") as handle:
        raw = await handle.read()
    return json.loads(raw)

async def save_properties(self, resource_path, properties):
    """Write ``properties`` as indented JSON to the resource's sidecar file."""
    target = self.get_props_file_path(resource_path)
    payload = json.dumps(properties, indent=2)
    async with aiofiles.open(target, "w") as handle:
        await handle.write(payload)

async def delete_properties_file(self, resource_path):
    """Remove the resource's property sidecar file if it exists."""
    sidecar = self.get_props_file_path(resource_path)
    if not sidecar.exists():
        return
    sidecar.unlink()

async def handle_propget(self, request):
    """Return stored custom properties of a resource (non-standard PROPGET).

    With a body naming properties inside ``{DAV:}prop`` only those that
    exist are returned; without one (or with a malformed one) all stored
    properties are returned. Responds 401 without valid credentials, 403
    for paths outside the home folder, 404 for missing resources and 207
    otherwise.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="Resource not found")
    properties = await self.load_properties(abs_path)
    body = await request.read()
    requested_props = []
    if body:
        # The body is untrusted: do not expand entities or touch the network.
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        try:
            root = etree.fromstring(body, parser)
        except etree.XMLSyntaxError as exc:
            logging.warning("PROPGET body ignored, not well-formed XML: %s", exc)
            root = None
        prop_elem = root.find(".//{DAV:}prop") if root is not None else None
        if prop_elem is not None:
            for child in prop_elem:
                if not isinstance(child.tag, str):
                    continue
                # Use the element's own namespace (Clark notation), which is
                # how PROPPATCH stores names; un-namespaced tags map to DAV:.
                tag = child.tag
                requested_props.append(tag if tag.startswith("{") else f"{{DAV:}}{tag}")
    nsmap = {"D": "DAV:"}
    response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
    response = etree.SubElement(response_xml, "{DAV:}response")
    href = etree.SubElement(response, "{DAV:}href")
    relative_path = abs_path.relative_to(home).as_posix()
    href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/")
    href.text = href_path
    propstat = etree.SubElement(response, "{DAV:}propstat")
    prop = etree.SubElement(propstat, "{DAV:}prop")
    if requested_props:
        for prop_name in requested_props:
            if prop_name in properties:
                elem = etree.SubElement(prop, prop_name)
                elem.text = str(properties[prop_name])
    else:
        for prop_name, prop_value in properties.items():
            elem = etree.SubElement(prop, prop_name)
            elem.text = str(prop_value)
    etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
    xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode()
    return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml")

async def handle_propset(self, request):
    """Store custom properties on a resource (non-standard PROPSET).

    Each child of the body's ``{DAV:}prop`` element is saved with its text
    as value. Responds 401 without valid credentials, 403 for paths outside
    the home folder, 404 for missing resources, 423 when locked without the
    matching ``Lock-Token``, 400 for an empty or malformed body or one
    without ``{DAV:}prop``, and 207 listing the stored names otherwise.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="Resource not found")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    body = await request.read()
    if not body:
        return aiohttp.web.Response(status=400, text="Bad Request")
    # Values are stored and can be read back, so entity expansion on this
    # untrusted body would leak local files.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        root = etree.fromstring(body, parser)
    except etree.XMLSyntaxError as e:
        logging.error(f"PROPSET error: {e}")
        return aiohttp.web.Response(status=400, text="Bad Request")
    prop_elem = root.find(".//{DAV:}prop")
    if prop_elem is None:
        return aiohttp.web.Response(status=400, text="Bad Request")
    properties = await self.load_properties(abs_path)
    stored_names = []
    for child in prop_elem:
        if not isinstance(child.tag, str):
            # Comments / processing instructions are not properties.
            continue
        # Key by the element's own namespace (Clark notation), matching
        # PROPPATCH/PROPFIND; un-namespaced tags map to DAV:.
        prop_name = child.tag if child.tag.startswith("{") else f"{{DAV:}}{child.tag}"
        properties[prop_name] = child.text or ""
        stored_names.append(prop_name)
    await self.save_properties(abs_path, properties)
    nsmap = {"D": "DAV:"}
    response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
    response = etree.SubElement(response_xml, "{DAV:}response")
    href = etree.SubElement(response, "{DAV:}href")
    relative_path = abs_path.relative_to(home).as_posix()
    href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/")
    href.text = href_path
    propstat = etree.SubElement(response, "{DAV:}propstat")
    prop = etree.SubElement(propstat, "{DAV:}prop")
    for prop_name in stored_names:
        etree.SubElement(prop, prop_name)
    etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
    xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode()
    return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml")

async def handle_propdel(self, request):
    """Delete custom properties of a resource (non-standard PROPDEL).

    An empty body removes the whole property sidecar (204). Otherwise each
    child of the body's ``{DAV:}prop`` element is removed if present.
    Responds 401 without valid credentials, 403 for paths outside the home
    folder, 404 for missing resources, 423 when locked without the matching
    ``Lock-Token``, 400 for a malformed body or one without ``{DAV:}prop``,
    and 207 listing the addressed names otherwise.
    """
    if not await self.authenticate(request):
        return aiohttp.web.Response(
            status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
        )
    home = request["home"]
    abs_path = home / request.match_info.get("filename", "")
    # Reject absolute paths and ".." segments that leave the home folder.
    try:
        abs_path.relative_to(home)
        abs_path.resolve().relative_to(home.resolve())
    except ValueError:
        return aiohttp.web.Response(status=403, text="Forbidden")
    if not abs_path.exists():
        return aiohttp.web.Response(status=404, text="Resource not found")
    lock = await self.is_locked(abs_path, request)
    if lock:
        submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "")
        if submitted != lock["token"]:
            return aiohttp.web.Response(status=423, text="Locked")
    body = await request.read()
    if not body:
        await self.delete_properties_file(abs_path)
        return aiohttp.web.Response(status=204)
    # The body is untrusted: do not expand entities or touch the network.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        root = etree.fromstring(body, parser)
    except etree.XMLSyntaxError as e:
        logging.error(f"PROPDEL error: {e}")
        return aiohttp.web.Response(status=400, text="Bad Request")
    prop_elem = root.find(".//{DAV:}prop")
    if prop_elem is None:
        return aiohttp.web.Response(status=400, text="Bad Request")
    properties = await self.load_properties(abs_path)
    addressed_names = []
    for child in prop_elem:
        if not isinstance(child.tag, str):
            # Comments / processing instructions are not properties.
            continue
        # Key by the element's own namespace (Clark notation), matching
        # PROPPATCH/PROPFIND; un-namespaced tags map to DAV:.
        prop_name = child.tag if child.tag.startswith("{") else f"{{DAV:}}{child.tag}"
        properties.pop(prop_name, None)
        addressed_names.append(prop_name)
    if properties:
        await self.save_properties(abs_path, properties)
    else:
        await self.delete_properties_file(abs_path)
    nsmap = {"D": "DAV:"}
    response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
    response = etree.SubElement(response_xml, "{DAV:}response")
    href = etree.SubElement(response, "{DAV:}href")
    relative_path = abs_path.relative_to(home).as_posix()
    href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/")
    href.text = href_path
    propstat = etree.SubElement(response, "{DAV:}propstat")
    prop = etree.SubElement(propstat, "{DAV:}prop")
    for prop_name in addressed_names:
        etree.SubElement(prop, prop_name)
    etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
    xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode()
    return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml")