Python IMAP/SMTP usage

Two examples of how to use the builtin IMAP/SMTP libraries.

These examples require other source code i've written to work.

It requires:

  • AsyncioPath
  • env.py (for converting base64 encoded environment variable to a string containing a [secret] (password))

Reading messages

import os
import base64
import env

if not env.secret:
    raise Exception("Environment issue.")

import imaplib
import email
from email.header import decode_header

imap_server = "mail.molodetz.nl" 
email_user = "retoor@molodetz.nl"
email_password = env.secret

try:
    mail = imaplib.IMAP4_SSL(imap_server)
    mail.login(email_user, email_password)
    mail.select("inbox")

    while True:
        status, messages = mail.search(None, "ALL")
        email_ids = messages[0].split()

        for email_id in email_ids[-1:]:
            status, msg_data = mail.fetch(email_id, "(RFC822)")

            for response_part in msg_data:
                if isinstance(response_part, tuple):
                    msg = email.message_from_bytes(response_part[1])
                    subject, encoding = decode_header(msg["Subject"])[0]
                    if isinstance(subject, bytes):
                        subject = subject.decode(encoding or "utf-8")
                    print(f"Subject: {subject}")

                    from_ = msg.get("From")
                    print(f"From: {from_}")
                    
                    if from_ == "retoor@molodetz.nl":
                        mail.store(email_id, '+FLAGS', '\\Deleted')
                        mail.expunge()

                    if msg.is_multipart():
                        for part in msg.walk():
                            content_type = part.get_content_type()
                            content_disposition = str(part.get("Content-Disposition"))
                            if content_type == "text/plain" and "attachment" not in content_disposition:
                                body = part.get_payload(decode=True).decode()
                                print(f"Body: {body}")
                    else:
                        body = msg.get_payload(decode=True).decode()
                        print(f"Body: {body}")

    mail.logout()

except Exception as e:
    print(f"Error: {e}")

Sending email (requires my AsyncioPath made in other gist):

from env import secret 
import asyncio 

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
server = None

logged_in = False

class SMTP(AsyncioPatch):

    def __init__(self, smtp_server, port, sender_email, password):
        self.smtp_server = smtp_server 
        self.port = port
        self.sender_email = sender_email
        self.password = password
        self.server = None

    def connect(self):
        self.server = smtplib.SMTP(self.smtp_server, self.port)
        print("Connected to:",self.smtp_server)
        self.server.starttls()
        self.server.login(self.sender_email, self.password)  
        print("Logged in as:", self.sender_email)
    
    def close(self):
        self.server.quit()  
        self.server = None
        print("Server disconnected")


    def __enter__(self):
        self.connect()
        return self 

    def __exit__(self, *args, **kwargs):
        self.close()

    def send_mail(self, recipient_email, subject, body):
        msg = MIMEMultipart()
        msg["From"] = self.sender_email
        msg["To"] = recipient_email
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        try:
            self.server.sendmail(self.sender_email, recipient_email, msg.as_string())
            print("Email sent successfully!")

        except Exception as e:
            print(f"Error: {e}")

        finally:
            pass


# Email configuration
server = SMTP(smtp_server = "mail.molodetz.nl",  # Replace with your SMTP server
        port = 587,                        # Usually 587 for TLS, 465 for SSL
        sender_email = "retoor@molodetz.nl",
        password = secret)

with server:
    for x in range(10):
        server.send_mail(
            recipient_email = "retoor@molodetz.nl",
            subject="Email nr {}".format(x),
            body="Email body nr{}".format(x)
        )

async def test_async():
    async with server:
        for x in range(10):
            await server.asend_mail(
                recipient_email = "retoor@molodetz.nl",
                subject="Email nr {}".format(x),
                body="Email body nr{}".format(x)
            )

asyncio.run(test_async())