Files
imapdown/imapdown.py
T

400 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""Simple IMAP email downloader - downloads all emails to EML files."""
import argparse
import email
import email.utils
import imaplib
import io
import json
import os
import re
import sys
import zipfile
from datetime import datetime
def parse_args():
    """Build the command-line interface and parse the process arguments.

    Returns:
        The ``argparse.Namespace`` produced by ``ArgumentParser.parse_args()``
        with the attributes ``server``, ``email``, ``user``, ``password``,
        ``ssl``, ``starttls``, ``port``, ``limit``, ``full`` and ``store``.
    """
    cli = argparse.ArgumentParser(
        description="Download all emails from an IMAP server to EML files"
    )

    # Mandatory connection / credential options, all plain required strings.
    required_options = (
        ("--server", "IMAP server hostname"),
        ("--email", "Email address"),
        ("--user", "Username for authentication"),
        ("--password", "Password for authentication"),
    )
    for flag, description in required_options:
        cli.add_argument(flag, required=True, help=description)

    # --ssl and --starttls may not be given together.
    transport = cli.add_mutually_exclusive_group()
    transport.add_argument(
        "--ssl",
        action="store_true",
        help="Use implicit SSL/TLS (default port 993)",
    )
    transport.add_argument(
        "--starttls",
        action="store_true",
        help="Use STARTTLS (default port 143)",
    )

    # Optional tuning knobs.
    cli.add_argument(
        "--port",
        type=int,
        help="Custom port (default: 993 for SSL, 143 otherwise)",
    )
    cli.add_argument(
        "--limit",
        type=int,
        help="Limit number of emails to download (for debugging)",
    )
    cli.add_argument(
        "--full",
        action="store_true",
        help="Download all emails (default: only new emails since last run)",
    )
    cli.add_argument(
        "--store",
        type=str,
        help="Directory to store downloaded emails (default: ./download)",
    )
    return cli.parse_args()
def decode_modified_utf7(s):
    """Decode an IMAP modified UTF-7 folder name into a plain string.

    ``&-`` becomes a literal ``&``; ``&<base64>-`` (with ``,`` standing in for
    ``/``) is decoded as UTF-16-BE. A shifted sequence that cannot be decoded
    is copied through unchanged, and an ``&`` with no closing ``-`` causes the
    remainder of the string to be copied through unchanged.
    """
    import base64

    pieces = []
    pos = 0
    total = len(s)
    while pos < total:
        amp = s.find('&', pos)
        if amp == -1:
            # No more shift characters: the rest is literal text.
            pieces.append(s[pos:])
            break

        # Literal run preceding the ampersand.
        pieces.append(s[pos:amp])

        if s.startswith('-', amp + 1):
            # "&-" is the escape for a literal ampersand.
            pieces.append('&')
            pos = amp + 2
            continue

        dash = s.find('-', amp + 1)
        if dash == -1:
            # Unterminated shift sequence: keep the tail verbatim.
            pieces.append(s[amp:])
            break

        chunk = s[amp + 1:dash]
        if chunk:
            b64 = chunk.replace(',', '/')
            b64 += '=' * (-len(b64) % 4)  # restore the stripped padding
            try:
                pieces.append(base64.b64decode(b64).decode('utf-16-be'))
            except Exception:
                pieces.append(s[amp:dash + 1])
        pos = dash + 1
    return ''.join(pieces)
def parse_folder_list(response):
    """Parse the data part of an IMAP LIST response into folder names.

    Args:
        response: The list returned as the second element of
            ``IMAP4.list()``. Items may be ``bytes``/``str`` lines,
            ``(header, literal)`` tuples (when the server sends the mailbox
            name as a literal), or ``None`` (when there is no data at all).

    Returns:
        A list of folder names with IMAP modified UTF-7 decoded. Items that
        are not LIST lines are skipped. Quoted names have their surrounding
        quotes removed; backslash escapes inside them are left untouched.
    """
    # The hierarchy delimiter is either NIL or a quoted string. Matching the
    # quoted string precisely (instead of a greedy ".*") keeps a name that
    # itself contains '" ' from being split in the wrong place.
    pattern = re.compile(
        r'\((?P<flags>.*?)\) '
        r'(?:(?i:NIL)|"(?P<delimiter>(?:\\.|[^"\\])*)") '
        r'(?P<name>.*)'
    )
    folders = []
    for item in response:
        header = item
        literal_name = None
        if isinstance(item, tuple):
            # (b'(\\flags) "/" {N}', b'<N bytes of name>')
            if len(item) < 2:
                continue
            header, literal_name = item[0], item[1]
        if isinstance(header, bytes):
            header = header.decode('utf-8', errors='replace')
        if not isinstance(header, str):
            # e.g. None when the server returned no mailboxes.
            continue

        match = pattern.match(header)
        if not match:
            continue

        if literal_name is not None:
            name = literal_name
            if isinstance(name, bytes):
                name = name.decode('utf-8', errors='replace')
        else:
            name = match.group('name')
            if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
                name = name[1:-1]
        folders.append(decode_modified_utf7(name))
    return folders
def sanitize_filename(name, max_length=50):
    """Turn an arbitrary string into a safe file name component.

    Characters ``< > : " / \\ | ? *`` and ASCII control characters are
    replaced by ``_``; leading/trailing dots and spaces are removed both
    before and after truncating to ``max_length`` characters. An empty
    input or an empty result yields ``"untitled"``.
    """
    fallback = "untitled"
    if not name:
        return fallback

    cleaned = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', name).strip('. ')
    # Truncation can expose a new trailing dot/space, so strip once more.
    cleaned = cleaned[:max_length].strip('. ')
    return cleaned if cleaned else fallback
def sanitize_folder_path(folder_name):
    """Map an IMAP folder name onto a relative filesystem path.

    ``/`` and ``.`` are both treated as hierarchy separators. Each non-empty
    component is passed through ``sanitize_filename`` (max 100 characters)
    and the components are joined with the platform separator. When no
    component remains the result is ``"INBOX"``.
    """
    normalized = folder_name
    for separator in ('/', '.'):
        normalized = normalized.replace(separator, os.sep)

    components = []
    for piece in normalized.split(os.sep):
        if piece:
            components.append(sanitize_filename(piece, max_length=100))

    if not components:
        return "INBOX"
    return os.path.join(*components)
def get_message_date(msg):
    """Return a ``YYYYMMDD_HHMMSS`` timestamp for a message.

    The value comes from the ``Date`` header when it is present and can be
    parsed; otherwise the current local time is used.
    """
    stamp_format = '%Y%m%d_%H%M%S'
    raw_date = msg.get('Date')
    if not raw_date:
        return datetime.now().strftime(stamp_format)
    try:
        return email.utils.parsedate_to_datetime(raw_date).strftime(stamp_format)
    except Exception:
        # Unparseable header: fall back to "now" just like a missing one.
        return datetime.now().strftime(stamp_format)
def get_message_subject(msg):
    """Return the message subject with RFC 2047 encoded-words decoded.

    Args:
        msg: An object with a mapping-style ``get`` (for example an
            ``email.message.Message``).

    Returns:
        ``'no_subject'`` when the header is missing or empty, the decoded
        subject otherwise. A part whose declared charset cannot be used is
        decoded as UTF-8 with replacement characters. If decoding fails
        altogether the raw header is returned via ``str()``; this function
        never raises for a bad header.
    """
    # ``import email`` / ``import email.utils`` do not guarantee that the
    # ``email.header`` submodule is loaded, so import it explicitly instead of
    # relying on another call having pulled it in as a side effect.
    import email.header

    subject = msg.get('Subject', '')
    if not subject:
        return 'no_subject'
    try:
        decoded = []
        for part, charset in email.header.decode_header(subject):
            if isinstance(part, bytes):
                try:
                    decoded.append(part.decode(charset or 'utf-8', errors='replace'))
                except Exception:
                    # Unknown or bogus charset name: best-effort UTF-8.
                    decoded.append(part.decode('utf-8', errors='replace'))
            else:
                decoded.append(part)
        return ''.join(decoded)
    except Exception:
        # Best effort only: the subject is used for naming files, so a
        # malformed header must not abort the download.
        return str(subject)
def extract_attachments(msg, eml_filepath):
    """Save the named attachments of a message into a zip next to its EML.

    Every MIME part whose Content-Disposition mentions ``attachment`` or
    ``inline`` (case-insensitively), that has a filename and a non-empty
    decoded payload, is stored in ``<eml_filepath without extension>.zip``.

    Args:
        msg: The parsed ``email.message.Message``.
        eml_filepath: Path of the saved ``.eml`` file; only used to derive
            the zip path.

    Returns:
        The number of attachments written (0 when none; no zip is created
        in that case).
    """
    # Explicit import: ``import email`` alone does not guarantee that the
    # ``email.header`` submodule is loaded.
    import email.header

    attachments = []
    for part in msg.walk():
        # str() guards against Header objects; lower() because disposition
        # types are case-insensitive ("Attachment" is common in the wild).
        disposition = str(part.get('Content-Disposition', '')).lower()
        if 'attachment' not in disposition and 'inline' not in disposition:
            continue
        filename = part.get_filename()
        if not filename:
            continue
        try:
            pieces = []
            for data, charset in email.header.decode_header(filename):
                if isinstance(data, bytes):
                    try:
                        pieces.append(data.decode(charset or 'utf-8', errors='replace'))
                    except LookupError:
                        # Unknown charset: keep the part, decoded as UTF-8.
                        pieces.append(data.decode('utf-8', errors='replace'))
                else:
                    pieces.append(data)
            filename = ''.join(pieces)
        except Exception:
            # Keep the undecoded filename rather than dropping the attachment.
            pass
        payload = part.get_payload(decode=True)
        if payload:
            attachments.append((sanitize_filename(filename, max_length=100), payload))

    if not attachments:
        return 0

    zip_path = os.path.splitext(eml_filepath)[0] + '.zip'
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        used_names = set()
        for filename, data in attachments:
            stem, ext = os.path.splitext(filename)
            candidate = filename
            suffix = 0
            # Keep counting until the name is really unused, so "a_1.txt"
            # generated for a duplicate cannot clash with a real "a_1.txt".
            while candidate in used_names:
                suffix += 1
                candidate = f"{stem}_{suffix}{ext}"
            used_names.add(candidate)
            zf.writestr(candidate, data)
    return len(attachments)
# Name of the JSON state file kept inside an account's download directory.
# It maps each folder name to the highest UID downloaded so far.
STATE_FILE = '.imapdown_state.json'
def load_state(base_dir):
    """Load the per-folder download state for an account directory.

    Args:
        base_dir: Directory that contains the state file ``STATE_FILE``.

    Returns:
        The dict stored in the state file, or an empty dict when the file
        does not exist, cannot be read or parsed, or does not contain a JSON
        object. Problems other than a missing file are reported on stderr
        because they make the next run behave like a first run.
    """
    state_path = os.path.join(base_dir, STATE_FILE)
    try:
        with open(state_path, 'r') as f:
            state = json.load(f)
    except FileNotFoundError:
        # First run for this account: nothing to resume from.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: ignoring unreadable state file {state_path}: {e}", file=sys.stderr)
        return {}
    if not isinstance(state, dict):
        # Callers index the state by folder name, so anything else is unusable.
        print(f"Warning: ignoring malformed state file {state_path}", file=sys.stderr)
        return {}
    return state
def save_state(base_dir, state):
    """Write the download state to ``STATE_FILE`` inside ``base_dir``.

    The JSON is written to a temporary sibling file first and then moved
    over the real state file, so an interrupted write cannot leave a
    truncated state file behind.

    Args:
        base_dir: Directory that holds the state file.
        state: JSON-serialisable mapping to persist.

    Raises:
        OSError: If the file cannot be written or replaced.
        TypeError: If ``state`` is not JSON-serialisable.
    """
    state_path = os.path.join(base_dir, STATE_FILE)
    tmp_path = state_path + '.tmp'
    try:
        with open(tmp_path, 'w') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, state_path)
    finally:
        # After a successful replace the temporary file is gone; after a
        # failure, remove the partial file so it does not accumulate.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def connect_imap(server, port, use_ssl, use_starttls):
    """Open a connection to the IMAP server and return the client object.

    With ``use_ssl`` an ``IMAP4_SSL`` connection is made (port 993 unless
    ``port`` is given). Otherwise a plain ``IMAP4`` connection is made (port
    143 unless ``port`` is given) and, when ``use_starttls`` is set, upgraded
    with STARTTLS before being returned.
    """
    if not use_ssl:
        plain_port = port or 143
        print(f"Connecting to {server}:{plain_port}...")
        client = imaplib.IMAP4(server, plain_port)
        if use_starttls:
            print("Upgrading to TLS with STARTTLS...")
            client.starttls()
        return client

    tls_port = port or 993
    print(f"Connecting to {server}:{tls_port} with SSL...")
    return imaplib.IMAP4_SSL(server, tls_port)
def download_folder(conn, folder_name, base_dir, limit=None, total_so_far=0, update_mode=False, last_uid=None):
    """Download the messages of one folder as ``.eml`` files.

    Args:
        conn: Logged-in ``imaplib`` connection.
        folder_name: Folder name as a decoded (Unicode) string.
        base_dir: Account directory; the folder is stored in a sanitized
            sub-path of it.
        limit: Overall cap on downloaded messages, or ``None`` for no cap.
        total_so_far: Messages already downloaded from earlier folders,
            counted against ``limit``.
        update_mode: When true and ``last_uid`` is set, only messages with a
            UID greater than ``last_uid`` are fetched.
        last_uid: Highest UID downloaded in a previous run, or ``None``.

    Returns:
        ``(downloaded_count, highest_uid)`` where ``highest_uid`` is the
        largest UID successfully saved (or ``last_uid`` if nothing was saved).
    """
    local_path = os.path.join(base_dir, sanitize_folder_path(folder_name))
    os.makedirs(local_path, exist_ok=True)
    try:
        # folder_name has been decoded for display and filesystem use. On the
        # wire, mailbox names must be modified UTF-7 again: imaplib encodes
        # commands as ASCII, so a raw non-ASCII name can never be selected.
        mailbox = _encode_modified_utf7(folder_name)
        status, _ = conn.select(f'"{mailbox}"', readonly=True)
        if status != 'OK':
            print(f"  Could not select folder: {folder_name}")
            return 0, last_uid
    except Exception as e:
        print(f"  Error selecting folder {folder_name}: {e}")
        return 0, last_uid

    if update_mode and last_uid is not None:
        status, data = conn.uid('SEARCH', None, f'UID {last_uid + 1}:*')
    else:
        status, data = conn.uid('SEARCH', None, 'ALL')
    if status != 'OK':
        print(f"  Could not search folder: {folder_name}")
        return 0, last_uid

    # imaplib yields [None] when the server sent no SEARCH data at all.
    raw_uids = data[0].split() if data and data[0] else []
    uid_list = [uid for uid in raw_uids if uid.isdigit()]
    # Filter out UIDs <= last_uid (some servers return highest UID even when searching for higher)
    if update_mode and last_uid is not None:
        uid_list = [uid for uid in uid_list if int(uid) > last_uid]
    if not uid_list:
        print(f"  {folder_name}: no new messages")
        return 0, last_uid

    # Ascending order makes the --limit slice take the lowest UIDs first, so
    # the recorded highest UID never jumps over messages not yet fetched.
    uid_list.sort(key=int)
    if limit is not None:
        remaining = limit - total_so_far
        if remaining <= 0:
            return 0, last_uid
        uid_list = uid_list[:remaining]

    print(f"  {folder_name}: {len(uid_list)} messages to download")
    downloaded = 0
    highest_uid = last_uid
    for uid in uid_list:
        try:
            uid_int = int(uid)
            status, data = conn.uid('FETCH', uid, '(RFC822)')
            if status != 'OK':
                continue
            # The message body is the second element of the first tuple item.
            raw_email = None
            for part in data:
                if isinstance(part, tuple):
                    raw_email = part[1]
                    break
            if raw_email is None:
                continue

            msg = email.message_from_bytes(raw_email)
            date_str = get_message_date(msg)
            subject = sanitize_filename(get_message_subject(msg))
            filename = f"{uid_int}_{date_str}_{subject}.eml"
            filepath = os.path.join(local_path, filename)

            # Never overwrite an existing file: append _1, _2, ... instead.
            counter = 1
            base_filepath = filepath
            while os.path.exists(filepath):
                name, ext = os.path.splitext(base_filepath)
                filepath = f"{name}_{counter}{ext}"
                counter += 1

            with open(filepath, 'wb') as f:
                f.write(raw_email)
            extract_attachments(msg, filepath)
            downloaded += 1
            # NOTE(review): a message that fails here is not retried by a
            # later incremental run once a higher UID has been saved.
            if highest_uid is None or uid_int > highest_uid:
                highest_uid = uid_int
        except Exception as e:
            print(f"  Error downloading UID {uid}: {e}")
    return downloaded, highest_uid


def _encode_modified_utf7(name):
    """Encode a Unicode mailbox name as IMAP modified UTF-7 (RFC 3501 5.1.3)."""
    import base64

    encoded = []
    pending = []  # current run of characters that need base64 encoding

    def flush_pending():
        if not pending:
            return
        raw = ''.join(pending).encode('utf-16-be')
        b64 = base64.b64encode(raw).decode('ascii').rstrip('=').replace('/', ',')
        encoded.append('&' + b64 + '-')
        del pending[:]

    for ch in name:
        if ch == '&':
            flush_pending()
            encoded.append('&-')
        elif 0x20 <= ord(ch) <= 0x7e:
            # Printable ASCII represents itself.
            flush_pending()
            encoded.append(ch)
        else:
            pending.append(ch)
    flush_pending()
    return ''.join(encoded)
def main():
    """Entry point: download mail from every folder of the account.

    Parses the command line, refuses ``--full`` when the target directory
    already holds ``.eml`` files, connects and logs in, then downloads each
    listed folder. The per-folder highest UID is written to the state file
    even if the run is interrupted, so finished folders are not fetched again
    by the next incremental run. Exits with status 1 on connection,
    authentication or listing failures.
    """
    args = parse_args()
    email_folder = sanitize_filename(args.email, max_length=100)
    if args.store:
        base_dir = os.path.join(args.store, email_folder)
    else:
        base_dir = os.path.join(os.getcwd(), 'download', email_folder)
    os.makedirs(base_dir, exist_ok=True)

    if args.full:
        # A full download into a populated directory would duplicate files.
        has_emails = any(
            f.endswith('.eml')
            for _root, _dirs, files in os.walk(base_dir)
            for f in files
        )
        if has_emails:
            print(f"Error: --full specified but {base_dir} already contains emails.", file=sys.stderr)
            print("Delete the folder first to do a full re-download, or run without --full for incremental update.", file=sys.stderr)
            sys.exit(1)

    try:
        conn = connect_imap(args.server, args.port, args.ssl, args.starttls)
    except Exception as e:
        print(f"Connection failed: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        status, _ = conn.login(args.user, args.password)
        if status != 'OK':
            print("Authentication failed", file=sys.stderr)
            sys.exit(1)
        print("Logged in successfully")
    except Exception as e:
        print(f"Authentication failed: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        status, folder_data = conn.list()
        if status != 'OK':
            print("Could not list folders", file=sys.stderr)
            sys.exit(1)
        folders = parse_folder_list(folder_data)
        print(f"Found {len(folders)} folders")

        update_mode = not args.full
        state = load_state(base_dir) if update_mode else {}
        if args.full:
            print("Full download mode: downloading all emails")
        else:
            print("Incremental mode: only downloading new emails (use --full to download all)")

        total_downloaded = 0
        try:
            for folder in folders:
                last_uid = None
                if update_mode and folder in state:
                    try:
                        last_uid = int(state[folder])
                    except (ValueError, TypeError):
                        pass
                downloaded, highest_uid = download_folder(
                    conn, folder, base_dir, args.limit, total_downloaded,
                    update_mode=update_mode, last_uid=last_uid
                )
                total_downloaded += downloaded
                if highest_uid is not None:
                    state[folder] = highest_uid
                # "is not None" so that --limit 0 is honoured here the same
                # way download_folder honours it.
                if args.limit is not None and total_downloaded >= args.limit:
                    print(f"  Reached limit of {args.limit} emails")
                    break
        finally:
            # Persist progress even when a folder fails or the user hits
            # Ctrl-C, otherwise everything fetched so far is fetched again.
            save_state(base_dir, state)
        print(f"\nDownloaded {total_downloaded} emails to {base_dir}")
    finally:
        try:
            conn.logout()
        except Exception:
            pass
# Run the downloader only when this file is executed as a script.
if __name__ == '__main__':
    main()