Back to project.

Raw source file available here .

# Python IMAP/SMTP usage

Two examples of how to use the builtin IMAP/SMTP libraries.

These examples require other source code i've written to work.

It requires:

  • AsyncioPath
  • env.py (for converting base64 encoded environment variable to a string containing a [secret] (password))

Reading messages

import os
import base64
import env

if not env.secret:
raise Exception("Environment issue.")

import imaplib
import email
from email.header import decode_header

imap_server = "mail.molodetz.nl"
email_user = "retoor@molodetz.nl"
email_password = env.secret

try:
mail = imaplib.IMAP4_SSL(imap_server)
mail.login(email_user, email_password)
mail.select("inbox")

while True:
status, messages = mail.search(None, "ALL")
email_ids = messages[0].split()

for email_id in email_ids[-1:]:
status, msg_data = mail.fetch(email_id, "(RFC822)")

for response_part in msg_data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding or "utf-8")
print(f"Subject: {subject}")

from_ = msg.get("From")
print(f"From: {from_}")

if from_ == "retoor@molodetz.nl":
mail.store(email_id, '+FLAGS', '\\Deleted')
mail.expunge()

if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
body = part.get_payload(decode=True).decode()
print(f"Body: {body}")
else:
body = msg.get_payload(decode=True).decode()
print(f"Body: {body}")

mail.logout()

except Exception as e:
print(f"Error: {e}")

Sending email (requires my AsyncioPath made in other gist):

from env import secret 
import asyncio

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
server = None

logged_in = False

class SMTP(AsyncioPatch):

def __init__(self, smtp_server, port, sender_email, password):
self.smtp_server = smtp_server
self.port = port
self.sender_email = sender_email
self.password = password
self.server = None

def connect(self):
self.server = smtplib.SMTP(self.smtp_server, self.port)
print("Connected to:",self.smtp_server)
self.server.starttls()
self.server.login(self.sender_email, self.password)
print("Logged in as:", self.sender_email)

def close(self):
self.server.quit()
self.server = None
print("Server disconnected")


def __enter__(self):
self.connect()
return self

def __exit__(self, *args, **kwargs):
self.close()

def send_mail(self, recipient_email, subject, body):
msg = MIMEMultipart()
msg["From"] = self.sender_email
msg["To"] = recipient_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))

try:
self.server.sendmail(self.sender_email, recipient_email, msg.as_string())
print("Email sent successfully!")

except Exception as e:
print(f"Error: {e}")

finally:
pass


# Email configuration
server = SMTP(smtp_server = "mail.molodetz.nl", # Replace with your SMTP server
port = 587, # Usually 587 for TLS, 465 for SSL
sender_email = "retoor@molodetz.nl",
password = secret)

with server:
for x in range(10):
server.send_mail(
recipient_email = "retoor@molodetz.nl",
subject="Email nr {}".format(x),
body="Email body nr{}".format(x)
)

async def test_async():
async with server:
for x in range(10):
await server.asend_mail(
recipient_email = "retoor@molodetz.nl",
subject="Email nr {}".format(x),
body="Email body nr{}".format(x)
)

asyncio.run(test_async())