Back to project.

Raw source file available here .

# Written by retoor@molodetz.nl

# This script serves as a bot service that connects to a WebSocket server using `aiohttp` and `asyncio`.
# It logs in, retrieves channel information, processes incoming messages, and responds based on specified conditions. The bot can execute commands received from a specific user and respond to certain ping messages.

# Used external imports:
# - aiohttp: A library for handling asynchronous HTTP and WebSocket connections.
# - asyncio: A library for writing single-threaded concurrent code using coroutines.
# - agent from agent: Module to get an AI agent for communication.
# - RPC from rpc: Module to interact with the server using Remote Procedure Call method.

# MIT License

import asyncio
import logging
import traceback

import aiohttp

from snekbot.rpc import RPC

logger = logging.getLogger("snekbot")


class Bot:

def __init__(self, username, password, url="wss://snek.molodetz.nl/rpc.ws"):
self.url = url
self.username = username
self.password = password
self.user = None
self._channels = None
self.rpc = None
self.ws = None
self.joined = set()

async def on_init(self):
logger.debug("Bot initialized.")

async def on_join(self, channel_uid):
self.joined.add(channel_uid)
logger.debug("Joined channel: " + channel_uid)

async def on_leave(self, channel_uid):
self.joined.remove(channel_uid)
logger.debug("Left channel: " + channel_uid)

async def on_mention(self, username, user_nick, channel_uid, message):
logger.debug("Received mention from " + username + ": " + message)

async def on_idle(self):
logger.debug("Bot is idle.")

async def on_ping(self, username, user_nick, channel_uid, message):
logger.debug("Received ping from " + username + ": " + message)

async def on_own_message(self, channel_uid, message):
logger.debug("Received own message: " + message)

async def on_message(self, username, user_nick, channel_uid, message):
logger.debug("Received message from " + username + ": " + message)

async def run(self, reconnect=True):

while True:

try:
await self.run_once()
except:
traceback.print_exc()
await asyncio.sleep(1)

if not reconnect:
break

def has_joined(self, channel_uid):
return channel_uid in self.joined

async def send_message(self, channel_uid, message):
await self.rpc.send_message(channel_uid, message)
return True

async def get_channel(self, channel_uid=None, refresh=False):
for channel in await self.get_channels(refresh):
if channel["uid"] == channel_uid:
return channel
if not refresh:
return await self.get_channel(channel_uid, True)

async def get_channels(self, refresh=False):
if refresh or not self._channels:
self._channels = await self.rpc.get_channels()
return self._channels

async def run_once(self):
async with aiohttp.ClientSession() as session:
async with session.ws_connect(self.url) as ws:
is_initial = not self.ws

self.ws = ws
rpc = RPC(self.ws)
self.rpc = rpc

await rpc.login(self.username, self.password)
self.user = await rpc.get_user(None)
logger.debug("Logged in as: " + self.user["username"])

if is_initial:
await self.on_init()

for channel in await self.get_channels():
logger.debug("Found channel: " + channel["name"])
while True:

await self.on_idle()

message = None

while True:
data = await rpc.receive()
if not data:
return

event = "?"
try:
event = data.event
except AttributeError:
pass

try:
message = data.message.strip()
event = "message"
except AttributeError:
pass

if event == "?":
continue
elif event == "message":
if not data.is_final:
continue
break

try:
await getattr(self, "on_" + data.event)(**data.data)
except AttributeError:
logger.debug("Not implemented event: " + event)

if data.username == self.user["username"]:
await self.on_own_message(data.channel_uid, message)
elif message.startswith("ping"):
await self.on_ping(
data.username,
data.user_nick,
data.channel_uid,
data.message.lstrip("ping ").strip(),
)
elif any(
[
"@" + self.user["nick"] + " join" in data.message,
"@" + self.user["username"] + " join" in data.message,
]
):
await self.on_join(data.channel_uid)
elif any(
[
"@" + self.user["nick"] + " leave" in data.message,
"@" + self.user["username"] + " leave" in data.message,
]
):
await self.on_leave(data.channel_uid)
elif (
"@" + self.user["nick"] in data.message
or "@" + self.user["username"] in data.message
):
await self.on_mention(
data.username,
data.user_nick,
data.channel_uid,
data.message,
)
else:
await self.on_message(
data.username,
data.user_nick,
data.channel_uid,
data.message,
)