Raw source file available here .
# Written by retoor@molodetz.nl
# This script serves as a bot service that connects to a WebSocket server using `aiohttp` and `asyncio`.
# It logs in, retrieves channel information, processes incoming messages, and responds based on specified conditions. The bot can execute commands received from a specific user and respond to certain ping messages.
# Used external imports:
# - aiohttp: A library for handling asynchronous HTTP and WebSocket connections.
# - asyncio: A library for writing single-threaded concurrent code using coroutines.
# - agent from agent: Module to get an AI agent for communication.
# - RPC from rpc: Module to interact with the server using Remote Procedure Call method.
# MIT License
import asyncio
import logging
import traceback
import aiohttp
from snekbot.rpc import RPC
logger = logging.getLogger("snekbot")
class Bot:
def __init__(self, username, password, url="wss://snek.molodetz.nl/rpc.ws"):
self.url = url
self.username = username
self.password = password
self.user = None
self._channels = None
self.rpc = None
self.ws = None
self.joined = set()
async def on_init(self):
logger.debug("Bot initialized.")
async def on_join(self, channel_uid):
self.joined.add(channel_uid)
logger.debug("Joined channel: " + channel_uid)
async def on_leave(self, channel_uid):
self.joined.remove(channel_uid)
logger.debug("Left channel: " + channel_uid)
async def on_mention(self, username, user_nick, channel_uid, message):
logger.debug("Received mention from " + username + ": " + message)
async def on_idle(self):
logger.debug("Bot is idle.")
async def on_ping(self, username, user_nick, channel_uid, message):
logger.debug("Received ping from " + username + ": " + message)
async def on_own_message(self, channel_uid, message):
logger.debug("Received own message: " + message)
async def on_message(self, username, user_nick, channel_uid, message):
logger.debug("Received message from " + username + ": " + message)
async def run(self, reconnect=True):
while True:
try:
await self.run_once()
except:
traceback.print_exc()
await asyncio.sleep(1)
if not reconnect:
break
def has_joined(self, channel_uid):
return channel_uid in self.joined
async def send_message(self, channel_uid, message):
await self.rpc.send_message(channel_uid, message)
return True
async def get_channel(self, channel_uid=None, refresh=False):
for channel in await self.get_channels(refresh):
if channel["uid"] == channel_uid:
return channel
if not refresh:
return await self.get_channel(channel_uid, True)
async def get_channels(self, refresh=False):
if refresh or not self._channels:
self._channels = await self.rpc.get_channels()
return self._channels
async def run_once(self):
async with aiohttp.ClientSession() as session:
async with session.ws_connect(self.url) as ws:
is_initial = not self.ws
self.ws = ws
rpc = RPC(self.ws)
self.rpc = rpc
await rpc.login(self.username, self.password)
self.user = await rpc.get_user(None)
logger.debug("Logged in as: " + self.user["username"])
if is_initial:
await self.on_init()
for channel in await self.get_channels():
logger.debug("Found channel: " + channel["name"])
while True:
await self.on_idle()
message = None
while True:
data = await rpc.receive()
if not data:
return
event = "?"
try:
event = data.event
except AttributeError:
pass
try:
message = data.message.strip()
event = "message"
except AttributeError:
pass
if event == "?":
continue
elif event == "message":
if not data.is_final:
continue
break
try:
await getattr(self, "on_" + data.event)(**data.data)
except AttributeError:
logger.debug("Not implemented event: " + event)
if data.username == self.user["username"]:
await self.on_own_message(data.channel_uid, message)
elif message.startswith("ping"):
await self.on_ping(
data.username,
data.user_nick,
data.channel_uid,
data.message.lstrip("ping ").strip(),
)
elif any(
[
"@" + self.user["nick"] + " join" in data.message,
"@" + self.user["username"] + " join" in data.message,
]
):
await self.on_join(data.channel_uid)
elif any(
[
"@" + self.user["nick"] + " leave" in data.message,
"@" + self.user["username"] + " leave" in data.message,
]
):
await self.on_leave(data.channel_uid)
elif (
"@" + self.user["nick"] in data.message
or "@" + self.user["username"] in data.message
):
await self.on_mention(
data.username,
data.user_nick,
data.channel_uid,
data.message,
)
else:
await self.on_message(
data.username,
data.user_nick,
data.channel_uid,
data.message,
)