2024-12-04 20:29:39 +00:00
|
|
|
# Python IMAP/SMTP usage
|
2024-12-04 20:26:01 +00:00
|
|
|
|
2024-12-04 20:29:39 +00:00
|
|
|
Two examples of how to use the builtin IMAP/SMTP libraries.
|
|
|
|
|
|
|
|
These examples require other source code i've written to work.
|
|
|
|
|
|
|
|
It requires:
|
|
|
|
- AsyncioPath
|
|
|
|
- env.py (for converting base64 encoded environment variable to a string containing a [secret] (password))
|
2024-12-04 20:26:01 +00:00
|
|
|
|
|
|
|
## Reading messages
|
|
|
|
|
|
|
|
```
|
|
|
|
import os
|
|
|
|
import base64
|
|
|
|
import env
|
|
|
|
|
|
|
|
if not env.secret:
|
|
|
|
raise Exception("Environment issue.")
|
|
|
|
|
|
|
|
import imaplib
|
|
|
|
import email
|
|
|
|
from email.header import decode_header
|
|
|
|
|
|
|
|
imap_server = "mail.molodetz.nl"
|
|
|
|
email_user = "retoor@molodetz.nl"
|
|
|
|
email_password = env.secret
|
|
|
|
|
|
|
|
try:
|
|
|
|
mail = imaplib.IMAP4_SSL(imap_server)
|
|
|
|
mail.login(email_user, email_password)
|
|
|
|
mail.select("inbox")
|
|
|
|
|
|
|
|
while True:
|
|
|
|
status, messages = mail.search(None, "ALL")
|
|
|
|
email_ids = messages[0].split()
|
|
|
|
|
|
|
|
for email_id in email_ids[-1:]:
|
|
|
|
status, msg_data = mail.fetch(email_id, "(RFC822)")
|
|
|
|
|
|
|
|
for response_part in msg_data:
|
|
|
|
if isinstance(response_part, tuple):
|
|
|
|
msg = email.message_from_bytes(response_part[1])
|
|
|
|
subject, encoding = decode_header(msg["Subject"])[0]
|
|
|
|
if isinstance(subject, bytes):
|
|
|
|
subject = subject.decode(encoding or "utf-8")
|
|
|
|
print(f"Subject: {subject}")
|
|
|
|
|
|
|
|
from_ = msg.get("From")
|
|
|
|
print(f"From: {from_}")
|
|
|
|
|
|
|
|
if from_ == "retoor@molodetz.nl":
|
|
|
|
mail.store(email_id, '+FLAGS', '\\Deleted')
|
|
|
|
mail.expunge()
|
|
|
|
|
|
|
|
if msg.is_multipart():
|
|
|
|
for part in msg.walk():
|
|
|
|
content_type = part.get_content_type()
|
|
|
|
content_disposition = str(part.get("Content-Disposition"))
|
|
|
|
if content_type == "text/plain" and "attachment" not in content_disposition:
|
|
|
|
body = part.get_payload(decode=True).decode()
|
|
|
|
print(f"Body: {body}")
|
|
|
|
else:
|
|
|
|
body = msg.get_payload(decode=True).decode()
|
|
|
|
print(f"Body: {body}")
|
|
|
|
|
|
|
|
mail.logout()
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
print(f"Error: {e}")
|
|
|
|
|
|
|
|
```
|
|
|
|
|
2024-12-04 20:26:26 +00:00
|
|
|
## Sending email (requires my AsyncioPath made in other gist):
|
2024-12-04 20:26:01 +00:00
|
|
|
|
|
|
|
```
|
2024-12-04 20:27:22 +00:00
|
|
|
from env import secret
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
import smtplib
|
|
|
|
from email.mime.text import MIMEText
|
|
|
|
from email.mime.multipart import MIMEMultipart
|
|
|
|
server = None
|
|
|
|
|
|
|
|
logged_in = False
|
|
|
|
|
2024-12-04 20:26:01 +00:00
|
|
|
class SMTP(AsyncioPatch):
|
|
|
|
|
|
|
|
def __init__(self, smtp_server, port, sender_email, password):
|
|
|
|
self.smtp_server = smtp_server
|
|
|
|
self.port = port
|
|
|
|
self.sender_email = sender_email
|
|
|
|
self.password = password
|
|
|
|
self.server = None
|
|
|
|
|
|
|
|
def connect(self):
|
|
|
|
self.server = smtplib.SMTP(self.smtp_server, self.port)
|
|
|
|
print("Connected to:",self.smtp_server)
|
|
|
|
self.server.starttls()
|
|
|
|
self.server.login(self.sender_email, self.password)
|
|
|
|
print("Logged in as:", self.sender_email)
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
self.server.quit()
|
|
|
|
self.server = None
|
|
|
|
print("Server disconnected")
|
|
|
|
|
|
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
self.connect()
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, *args, **kwargs):
|
|
|
|
self.close()
|
|
|
|
|
|
|
|
def send_mail(self, recipient_email, subject, body):
|
|
|
|
msg = MIMEMultipart()
|
|
|
|
msg["From"] = self.sender_email
|
|
|
|
msg["To"] = recipient_email
|
|
|
|
msg["Subject"] = subject
|
|
|
|
msg.attach(MIMEText(body, "plain"))
|
|
|
|
|
|
|
|
try:
|
|
|
|
self.server.sendmail(self.sender_email, recipient_email, msg.as_string())
|
|
|
|
print("Email sent successfully!")
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
print(f"Error: {e}")
|
|
|
|
|
|
|
|
finally:
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# Email configuration
|
|
|
|
server = SMTP(smtp_server = "mail.molodetz.nl", # Replace with your SMTP server
|
|
|
|
port = 587, # Usually 587 for TLS, 465 for SSL
|
|
|
|
sender_email = "retoor@molodetz.nl",
|
|
|
|
password = secret)
|
|
|
|
|
|
|
|
with server:
|
|
|
|
for x in range(10):
|
|
|
|
server.send_mail(
|
|
|
|
recipient_email = "retoor@molodetz.nl",
|
|
|
|
subject="Email nr {}".format(x),
|
|
|
|
body="Email body nr{}".format(x)
|
|
|
|
)
|
|
|
|
|
|
|
|
async def test_async():
|
|
|
|
async with server:
|
|
|
|
for x in range(10):
|
|
|
|
await server.asend_mail(
|
|
|
|
recipient_email = "retoor@molodetz.nl",
|
|
|
|
subject="Email nr {}".format(x),
|
|
|
|
body="Email body nr{}".format(x)
|
|
|
|
)
|
|
|
|
|
|
|
|
asyncio.run(test_async())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```
|