Update socket communicaton and removed prints.
This commit is contained in:
parent
37da903936
commit
1f8ebf71d0
@ -28,7 +28,6 @@ class ChannelMemberService(BaseService):
|
||||
model["is_read_only"] = is_read_only
|
||||
model["is_muted"] = is_muted
|
||||
model["is_banned"] = is_banned
|
||||
print(model.record, flush=True)
|
||||
if await self.save(model):
|
||||
return model
|
||||
raise Exception(f"Failed to create channel member: {model.errors}.")
|
||||
@ -42,13 +41,9 @@ class ChannelMemberService(BaseService):
|
||||
channel_member = await self.get(channel_uid=channel_uid, user_uid=user_uid)
|
||||
channel = await self.services.channel.get(uid=channel_member['channel_uid'])
|
||||
if channel["tag"] != "dm":
|
||||
print("NONT!\n", flush=True)
|
||||
return None
|
||||
print("YEAHH",flush=True)
|
||||
async for model in self.services.channel_member.find(channel_uid=channel_uid):
|
||||
print("huh!!!",model['uid'],flush=True)
|
||||
if model["uid"] != channel_member['uid']:
|
||||
print("GOOOD!!",flush=True)
|
||||
return await self.services.user.get(uid=model["user_uid"])
|
||||
|
||||
async def create_dm(self,channel_uid, from_user_uid, to_user_uid):
|
||||
|
@ -38,7 +38,6 @@ class ChannelMessageService(BaseService):
|
||||
async def to_extended_dict(self, message):
|
||||
user = await self.services.user.get(uid=message["user_uid"])
|
||||
if not user:
|
||||
print("User not found!", flush=True)
|
||||
return {}
|
||||
return {
|
||||
"uid": message["uid"],
|
||||
@ -52,10 +51,11 @@ class ChannelMessageService(BaseService):
|
||||
"username": user['username']
|
||||
}
|
||||
|
||||
async def offset(self, channel_uid, offset=0):
|
||||
async def offset(self, channel_uid, page=0, page_size=30):
|
||||
results = []
|
||||
offset = page * page_size
|
||||
try:
|
||||
async for model in self.query("SELECT * FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT 60 OFFSET :offset",dict(channel_uid=channel_uid, offset=offset)):
|
||||
async for model in self.query("SELECT * FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",dict(channel_uid=channel_uid, page_size=page_size, offset=offset)):
|
||||
results.append(model)
|
||||
except:
|
||||
pass
|
||||
|
@ -1,4 +1,4 @@
|
||||
|
||||
from snek.model.user import UserModel
|
||||
|
||||
|
||||
from snek.system.service import BaseService
|
||||
@ -6,35 +6,58 @@ from snek.system.service import BaseService
|
||||
|
||||
class SocketService(BaseService):
|
||||
|
||||
def __init__(self, app):
|
||||
super().__init__(app)
|
||||
self.sockets = set()
|
||||
self.subscriptions = {}
|
||||
class Socket:
|
||||
def __init__(self, ws, user: UserModel):
|
||||
self.ws = ws
|
||||
self.is_connected = True
|
||||
self.user = user
|
||||
|
||||
async def add(self, ws):
|
||||
self.sockets.add(ws)
|
||||
|
||||
async def subscribe(self, ws, channel_uid):
|
||||
if not channel_uid in self.subscriptions:
|
||||
self.subscriptions[channel_uid] = set()
|
||||
self.subscriptions[channel_uid].add(ws)
|
||||
|
||||
async def broadcast(self, channel_uid, message):
|
||||
print("BROADCAT!",message)
|
||||
count = 0
|
||||
subscriptions = set(self.subscriptions.get(channel_uid,[]))
|
||||
for ws in subscriptions:
|
||||
async def send_json(self, data):
|
||||
if not self.is_connected:
|
||||
return False
|
||||
try:
|
||||
await ws.send_json(message)
|
||||
await self.ws.send_json(data)
|
||||
except Exception as ex:
|
||||
print(ex,flush=True)
|
||||
print("Deleting socket.",flush=True)
|
||||
self.subscriptions[channel_uid].remove(ws)
|
||||
self.is_connected = False
|
||||
return True
|
||||
|
||||
async def close(self):
|
||||
if not self.is_connected:
|
||||
return True
|
||||
|
||||
await self.ws.close()
|
||||
self.is_connected = False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def __init__(self, app):
|
||||
super().__init__(app)
|
||||
self.sockets = []
|
||||
self.subscriptions = {}
|
||||
|
||||
async def add(self, ws, user_uid):
|
||||
self.sockets.append(self.Socket(ws, await self.app.services.user.get(uid=user_uid)))
|
||||
|
||||
async def subscribe(self, ws,channel_uid, user_uid):
|
||||
if not channel_uid in self.subscriptions:
|
||||
self.subscriptions[channel_uid] = set()
|
||||
s = self.Socket(ws,await self.app.services.user.get(uid=user_uid))
|
||||
self.subscriptions[channel_uid].add(s)
|
||||
|
||||
async def broadcast(self, channel_uid, message):
|
||||
count = 0
|
||||
subscriptions = set(self.subscriptions.get(channel_uid,[]))
|
||||
for s in subscriptions:
|
||||
if not await s.send_json(message):
|
||||
self.subscriptions[channel_uid].remove(s)
|
||||
continue
|
||||
count += 1
|
||||
return count
|
||||
async def delete(self, ws):
|
||||
try:
|
||||
self.sockets.remove(ws)
|
||||
except :
|
||||
pass
|
||||
for s in self.sockets:
|
||||
if s.ws == ws:
|
||||
await s.close()
|
||||
self.sockets.remove(s)
|
||||
|
@ -86,18 +86,17 @@ async def repair_links(base_url, html_content):
|
||||
|
||||
|
||||
async def is_html_content(content: bytes):
|
||||
if not content:
|
||||
return False
|
||||
try:
|
||||
content = content.decode(errors="ignore")
|
||||
except:
|
||||
pass
|
||||
marks = ["<html", "<img", "<p", "<span", "<div"]
|
||||
try:
|
||||
content = content.lower()
|
||||
for mark in marks:
|
||||
if mark in content:
|
||||
return True
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
return False
|
||||
|
||||
|
||||
|
@ -50,9 +50,9 @@ class RPCView(BaseView):
|
||||
record = user.record
|
||||
del record['password']
|
||||
del record['deleted_at']
|
||||
await self.services.socket.add(self.ws)
|
||||
await self.services.socket.add(self.ws,self.view.request.session.get('uid'))
|
||||
async for subscription in self.services.channel_member.find(user_uid=self.view.request.session.get("uid"), deleted_at=None, is_banned=False):
|
||||
await self.services.socket.subscribe(self.ws, subscription["channel_uid"])
|
||||
await self.services.socket.subscribe(self.ws, subscription["channel_uid"], self.view.request.session.get("uid"))
|
||||
return record
|
||||
|
||||
async def search_user(self, query):
|
||||
@ -74,9 +74,7 @@ class RPCView(BaseView):
|
||||
async def get_messages(self, channel_uid, offset=0):
|
||||
self._require_login()
|
||||
messages = []
|
||||
print("Channel uid:", channel_uid, flush=True)
|
||||
for message in await self.services.channel_message.offset(channel_uid, offset):
|
||||
print(message, flush=True)
|
||||
extended_dict = await self.services.channel_message.to_extended_dict(message)
|
||||
messages.append(extended_dict)
|
||||
return messages
|
||||
@ -162,9 +160,9 @@ class RPCView(BaseView):
|
||||
ws = web.WebSocketResponse()
|
||||
await ws.prepare(self.request)
|
||||
if self.request.session.get("logged_in"):
|
||||
await self.services.socket.add(ws)
|
||||
await self.services.socket.add(ws, self.request.session.get("uid"))
|
||||
async for subscription in self.services.channel_member.find(user_uid=self.request.session.get("uid"), deleted_at=None, is_banned=False):
|
||||
await self.services.socket.subscribe(ws, subscription["channel_uid"])
|
||||
await self.services.socket.subscribe(ws, subscription["channel_uid"], self.request.session.get("uid"))
|
||||
rpc = RPCView.RPCApi(self, ws)
|
||||
async for msg in ws:
|
||||
if msg.type == web.WSMsgType.TEXT:
|
||||
|
@ -41,7 +41,6 @@ class SearchUserView(BaseFormView):
|
||||
query = self.request.query.get("query")
|
||||
if query:
|
||||
users = await self.app.services.user.search(query)
|
||||
print(users, flush=True)
|
||||
|
||||
if self.request.path.endswith(".json"):
|
||||
return await super().get()
|
||||
@ -50,6 +49,5 @@ class SearchUserView(BaseFormView):
|
||||
|
||||
async def submit(self, form):
|
||||
if await form.is_valid:
|
||||
print("YES\n")
|
||||
return {"redirect_url": "/search-user.html?query=" + form['username']}
|
||||
return {"is_valid": False}
|
@ -23,8 +23,6 @@ class UploadView(BaseView):
|
||||
async def get(self):
|
||||
uid = self.request.match_info.get("uid")
|
||||
drive_item = await self.services.drive_item.get(uid)
|
||||
|
||||
print(await drive_item.to_json(), flush=True)
|
||||
return web.FileResponse(drive_item["path"])
|
||||
|
||||
async def post(self):
|
||||
@ -37,7 +35,6 @@ class UploadView(BaseView):
|
||||
|
||||
drive = await self.services.drive.get_or_create(user_uid=self.request.session.get("uid"))
|
||||
|
||||
print(str(drive), flush=True)
|
||||
extension_types = {
|
||||
".jpg": "image",
|
||||
".gif": "image",
|
||||
@ -84,6 +81,5 @@ class UploadView(BaseView):
|
||||
await self.services.chat.send(
|
||||
self.request.session.get("uid"), channel_uid, response
|
||||
)
|
||||
print(drive_item, flush=True)
|
||||
|
||||
return web.json_response({"message": "Files uploaded successfully", "files": [str(file) for file in files], "channel_uid": channel_uid})
|
||||
|
Loading…
Reference in New Issue
Block a user