#!/usr/bin/env python3
"""Simple IMAP email downloader - downloads all emails to EML files."""

import argparse
import base64
import email
import email.header
import email.utils
import imaplib
import io
import json
import os
import re
import ssl
import sys
import zipfile
from datetime import datetime

STATE_FILE = '.imapdown_state.json'

# One line of an IMAP LIST response: (flags) "delimiter"|NIL name
_LIST_LINE_RE = re.compile(
    r'\((?P<flags>[^)]*)\)\s+'
    r'(?:"(?P<delimiter>(?:[^"\\]|\\.)*)"|NIL)\s+'
    r'(?P<name>.*)'
)

# Characters that are not allowed in file names on common filesystems.
_UNSAFE_FILENAME_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Download all emails from an IMAP server to EML files"
    )
    parser.add_argument("--server", required=True, help="IMAP server hostname")
    parser.add_argument("--email", required=True, help="Email address")
    parser.add_argument("--user", required=True, help="Username for authentication")
    parser.add_argument("--password", required=True, help="Password for authentication")
    security = parser.add_mutually_exclusive_group()
    security.add_argument("--ssl", action="store_true",
                          help="Use implicit SSL/TLS (default port 993)")
    security.add_argument("--starttls", action="store_true",
                          help="Use STARTTLS (default port 143)")
    parser.add_argument("--insecure", action="store_true",
                        help="Skip TLS certificate verification (e.g. self-signed certificates)")
    parser.add_argument("--port", type=int,
                        help="Custom port (default: 993 for SSL, 143 otherwise)")
    parser.add_argument("--limit", type=int,
                        help="Limit number of emails to download (for debugging)")
    parser.add_argument("--full", action="store_true",
                        help="Download all emails (default: only new emails since last run)")
    return parser.parse_args()


def decode_modified_utf7(s):
    """Decode an IMAP modified UTF-7 folder name (RFC 3501, section 5.1.3).

    ``&-`` decodes to a literal ``&``; ``&<base64>-`` decodes to the UTF-16BE
    text it encodes. A shift sequence that cannot be decoded, or an ``&``
    without a closing ``-``, is kept verbatim.
    """
    result = []
    i = 0
    while i < len(s):
        if s[i] != '&':
            result.append(s[i])
            i += 1
            continue
        end = s.find('-', i + 1)
        if end == -1:
            # Unterminated shift sequence: keep the remainder as-is.
            result.append(s[i:])
            break
        encoded = s[i + 1:end]
        if not encoded:
            result.append('&')
        else:
            # Modified base64 uses ',' instead of '/' and omits the padding.
            padded = encoded.replace(',', '/')
            padded += '=' * (-len(padded) % 4)
            try:
                result.append(base64.b64decode(padded).decode('utf-16-be'))
            except (ValueError, UnicodeDecodeError):
                result.append(s[i:end + 1])
        i = end + 1
    return ''.join(result)


def encode_modified_utf7(s):
    """Encode a folder name as IMAP modified UTF-7 (inverse of the decoder).

    Printable ASCII is passed through (``&`` becomes ``&-``); every other run
    of characters becomes ``&<modified base64 of UTF-16BE>-``.
    """
    result = []
    pending = []

    def flush():
        if pending:
            raw = ''.join(pending).encode('utf-16-be')
            b64 = base64.b64encode(raw).decode('ascii').rstrip('=')
            result.append('&' + b64.replace('/', ',') + '-')
            del pending[:]

    for ch in s:
        if 0x20 <= ord(ch) <= 0x7e:
            flush()
            result.append('&-' if ch == '&' else ch)
        else:
            pending.append(ch)
    flush()
    return ''.join(result)


def parse_folder_list(response):
    """Parse an IMAP LIST response and return the selectable folder names.

    Args:
        response: The data list returned by ``IMAP4.list()``. Items are
            normally bytes; a name sent as an IMAP literal arrives as a
            ``(header, literal)`` tuple and an empty listing as ``None``.

    Returns:
        Folder names decoded from modified UTF-7. Folders flagged
        ``\\Noselect`` are omitted because they cannot hold messages.
        Backslash escapes inside a quoted name are kept as sent, because
        download_folder() wraps the name in double quotes verbatim.
    """
    folders = []
    for item in response:
        literal = None
        if isinstance(item, tuple) and len(item) >= 2:
            item, literal = item[0], item[1]
        if isinstance(item, bytes):
            item = item.decode('utf-8', errors='replace')
        if not isinstance(item, str):
            continue  # e.g. None when the server lists nothing
        match = _LIST_LINE_RE.match(item)
        if not match:
            continue
        if '\\noselect' in match.group('flags').lower():
            continue
        name = match.group('name')
        if literal is not None:
            if isinstance(literal, bytes):
                literal = literal.decode('utf-8', errors='replace')
            # Bring the literal into the same escaped form as a quoted name.
            name = literal.replace('\\', '\\\\').replace('"', '\\"')
        elif len(name) >= 2 and name.startswith('"') and name.endswith('"'):
            name = name[1:-1]
        folders.append(decode_modified_utf7(name))
    return folders


def sanitize_filename(name, max_length=50):
    """Sanitize a string for use as a filename.

    Unsafe characters become ``_``, leading/trailing dots and spaces are
    removed, and the result is cut to ``max_length`` characters. Returns
    ``"untitled"`` when nothing usable remains.
    """
    if not name:
        return "untitled"
    name = _UNSAFE_FILENAME_CHARS_RE.sub('_', name)
    name = name.strip('. ')
    name = name[:max_length]
    # Truncation may expose a new trailing dot or space.
    name = name.strip('. ')
    return name or "untitled"


def sanitize_folder_path(folder_name):
    """Turn an IMAP folder name into a relative filesystem path.

    Both ``/`` and ``.`` are treated as hierarchy delimiters; every component
    is passed through sanitize_filename(). Returns ``"INBOX"`` when no
    component remains.
    """
    parts = folder_name.replace('/', os.sep).replace('.', os.sep).split(os.sep)
    sanitized = [sanitize_filename(p, max_length=100) for p in parts if p]
    return os.path.join(*sanitized) if sanitized else "INBOX"


def get_message_date(msg):
    """Return the message's Date header as ``YYYYMMDD_HHMMSS``.

    Falls back to the current local time when the header is missing or
    cannot be parsed.
    """
    date_str = msg.get('Date')
    if date_str:
        try:
            parsed = email.utils.parsedate_to_datetime(str(date_str))
            return parsed.strftime('%Y%m%d_%H%M%S')
        except (TypeError, ValueError, OverflowError):
            pass  # unparseable date: use the fallback below
    return datetime.now().strftime('%Y%m%d_%H%M%S')


def _decode_header_value(value):
    """Decode an RFC 2047 encoded header value into a plain string."""
    chunks = []
    for part, charset in email.header.decode_header(value):
        if isinstance(part, bytes):
            try:
                chunks.append(part.decode(charset or 'utf-8', errors='replace'))
            except LookupError:
                # Unknown charset label: fall back to UTF-8.
                chunks.append(part.decode('utf-8', errors='replace'))
        else:
            chunks.append(part)
    return ''.join(chunks)


def get_message_subject(msg):
    """Extract and decode the subject; returns ``'no_subject'`` if absent."""
    subject = msg.get('Subject', '')
    if not subject:
        return 'no_subject'
    try:
        return _decode_header_value(subject)
    except Exception:
        # Best effort: a malformed subject must not abort the download.
        return str(subject)


def extract_attachments(msg, eml_filepath):
    """Save the message's attachments into a zip next to the EML file.

    Parts whose Content-Disposition is ``attachment`` or ``inline`` and that
    carry a filename and a non-empty payload are stored in
    ``<eml path without extension>.zip``. No zip is written when there are
    no such parts.

    Returns:
        The number of attachments written.
    """
    attachments = []
    for part in msg.walk():
        # get_content_disposition() lower-cases the value, so "Attachment"
        # is recognised as well.
        if part.get_content_disposition() not in ('attachment', 'inline'):
            continue
        filename = part.get_filename()
        if not filename:
            continue
        try:
            filename = _decode_header_value(filename)
        except Exception:
            pass  # keep the raw filename; it is sanitized below anyway
        payload = part.get_payload(decode=True)
        if payload:
            attachments.append((sanitize_filename(filename, max_length=100), payload))

    if not attachments:
        return 0

    zip_path = os.path.splitext(eml_filepath)[0] + '.zip'
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        used_names = set()
        for filename, data in attachments:
            stem, ext = os.path.splitext(filename)
            candidate = filename
            counter = 0
            # Suffix _1, _2, ... until the member name is unused.
            while candidate in used_names:
                counter += 1
                candidate = f"{stem}_{counter}{ext}"
            used_names.add(candidate)
            zf.writestr(candidate, data)
    return len(attachments)


def load_state(base_dir):
    """Load the per-folder "last downloaded UID" state.

    Returns an empty dict when the state file is missing, unreadable, or
    does not contain a JSON object.
    """
    state_path = os.path.join(base_dir, STATE_FILE)
    try:
        with open(state_path, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except (OSError, ValueError):
        return {}
    return state if isinstance(state, dict) else {}


def save_state(base_dir, state):
    """Save the state file atomically.

    The data is written to a temporary sibling file and then renamed, so an
    interrupted run cannot leave a truncated state file behind.
    """
    state_path = os.path.join(base_dir, STATE_FILE)
    tmp_path = state_path + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, state_path)


def connect_imap(server, port, use_ssl, use_starttls, verify_cert=True):
    """Connect to the IMAP server with the requested transport security.

    Args:
        server: IMAP server hostname.
        port: Port number, or None for the default (993 for SSL, else 143).
        use_ssl: Use implicit SSL/TLS.
        use_starttls: Upgrade a plain connection with STARTTLS.
        verify_cert: Verify the server certificate and hostname. imaplib's
            own default context performs no verification, so a verifying
            context is passed explicitly.

    Returns:
        A connected, not yet authenticated imaplib client.
    """
    context = None
    if use_ssl or use_starttls:
        context = ssl.create_default_context()
        if not verify_cert:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

    if use_ssl:
        port = port or 993
        print(f"Connecting to {server}:{port} with SSL...")
        return imaplib.IMAP4_SSL(server, port, ssl_context=context)

    port = port or 143
    print(f"Connecting to {server}:{port}...")
    conn = imaplib.IMAP4(server, port)
    if use_starttls:
        print("Upgrading to TLS with STARTTLS...")
        try:
            conn.starttls(ssl_context=context)
        except Exception:
            conn.shutdown()  # do not leak the plain socket
            raise
    return conn


def _select_folder(conn, folder_name):
    """Select ``folder_name`` read-only; return True on success.

    The decoded name is re-encoded to modified UTF-7 for the wire. If that
    fails and the plain name is ASCII, the plain name is tried as well for
    servers that do not follow the encoding rules.
    """
    candidates = [encode_modified_utf7(folder_name)]
    if folder_name != candidates[0]:
        try:
            folder_name.encode('ascii')
        except UnicodeEncodeError:
            pass  # imaplib can only send ASCII command arguments
        else:
            candidates.append(folder_name)

    error = None
    for mailbox in candidates:
        try:
            status, _ = conn.select(f'"{mailbox}"', readonly=True)
        except (imaplib.IMAP4.error, OSError) as e:
            error = e
            continue
        if status == 'OK':
            return True
        error = None

    if error is not None:
        print(f" Error selecting folder {folder_name}: {error}")
    else:
        print(f" Could not select folder: {folder_name}")
    return False


def _fetch_raw_message(conn, uid):
    """Fetch the raw RFC 822 bytes of one message, or None if unavailable."""
    status, data = conn.uid('FETCH', str(uid), '(RFC822)')
    if status != 'OK':
        return None
    for part in data or ():
        if isinstance(part, tuple):
            return part[1]
    return None


def _file_has_content(path, data):
    """Return True if the file at ``path`` contains exactly ``data``."""
    try:
        if os.path.getsize(path) != len(data):
            return False
        with open(path, 'rb') as f:
            return f.read() == data
    except OSError:
        return False


def _unique_eml_path(directory, filename, raw_email):
    """Pick the path for a message; return ``(path, already_saved)``.

    If ``filename`` is taken, ``_1``, ``_2``, ... is appended before the
    extension. When an existing candidate already holds exactly
    ``raw_email``, that path is returned with ``already_saved=True``.
    """
    stem, ext = os.path.splitext(filename)
    candidate = os.path.join(directory, filename)
    counter = 0
    while os.path.exists(candidate):
        if _file_has_content(candidate, raw_email):
            return candidate, True
        counter += 1
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
    return candidate, False


def download_folder(conn, folder_name, base_dir, limit=None, total_so_far=0,
                    update_mode=False, last_uid=None):
    """Download the emails of one folder as EML files.

    Args:
        conn: Authenticated imaplib client.
        folder_name: Folder name as returned by parse_folder_list().
        base_dir: Directory below which the folder's files are stored.
        limit: Overall maximum number of emails, or None for no limit.
        total_so_far: Emails already downloaded from earlier folders.
        update_mode: Only fetch messages with a UID above ``last_uid``.
        last_uid: Highest UID handled in a previous run, or None.

    Returns:
        ``(downloaded_count, highest_uid)``. ``highest_uid`` only advances
        over messages that were saved without a gap, so a message that
        failed is fetched again on the next incremental run.
    """
    local_path = os.path.join(base_dir, sanitize_folder_path(folder_name))
    os.makedirs(local_path, exist_ok=True)

    if not _select_folder(conn, folder_name):
        return 0, last_uid

    if update_mode and last_uid is not None:
        status, data = conn.uid('SEARCH', None, f'UID {last_uid + 1}:*')
    else:
        status, data = conn.uid('SEARCH', None, 'ALL')
    if status != 'OK':
        print(f" Could not search folder: {folder_name}")
        return 0, last_uid

    # Some servers send no untagged SEARCH data at all for an empty result.
    raw_uids = data[0] if data and data[0] else b''
    uid_list = sorted({int(token) for token in raw_uids.split()})

    # Filter out UIDs <= last_uid (some servers return the highest UID even
    # when searching for higher ones).
    if update_mode and last_uid is not None:
        uid_list = [uid for uid in uid_list if uid > last_uid]

    if not uid_list:
        print(f" {folder_name}: no new messages")
        return 0, last_uid

    if limit is not None:
        remaining = limit - total_so_far
        if remaining <= 0:
            return 0, last_uid
        uid_list = uid_list[:remaining]

    print(f" {folder_name}: {len(uid_list)} messages to download")

    downloaded = 0
    highest_uid = last_uid
    gap = False  # set once a message could not be saved
    for uid in uid_list:
        try:
            raw_email = _fetch_raw_message(conn, uid)
            if raw_email is None:
                print(f" Could not fetch UID {uid}")
                gap = True
                continue
            msg = email.message_from_bytes(raw_email)
            date_str = get_message_date(msg)
            subject = sanitize_filename(get_message_subject(msg))
            filename = f"{uid}_{date_str}_{subject}.eml"
            filepath, already_saved = _unique_eml_path(local_path, filename, raw_email)
            if not already_saved:
                with open(filepath, 'wb') as f:
                    f.write(raw_email)
        except Exception as e:
            # Per-message boundary: one bad message must not stop the folder.
            print(f" Error downloading UID {uid}: {e}")
            gap = True
            continue

        if not already_saved:
            try:
                extract_attachments(msg, filepath)
            except Exception as e:
                # The EML already holds the complete message.
                print(f" Could not extract attachments for UID {uid}: {e}")
            downloaded += 1

        if not gap and (highest_uid is None or uid > highest_uid):
            highest_uid = uid

    return downloaded, highest_uid


def _contains_eml(base_dir):
    """Return True if any ``.eml`` file exists below ``base_dir``."""
    return any(
        name.endswith('.eml')
        for _, _, files in os.walk(base_dir)
        for name in files
    )


def main():
    """Command line entry point: connect, log in and download every folder."""
    args = parse_args()

    email_folder = sanitize_filename(args.email, max_length=100)
    base_dir = os.path.join(os.getcwd(), 'download', email_folder)
    os.makedirs(base_dir, exist_ok=True)

    if args.full and _contains_eml(base_dir):
        print(f"Error: --full specified but {base_dir} already contains emails.",
              file=sys.stderr)
        print("Delete the folder first to do a full re-download, "
              "or run without --full for incremental update.", file=sys.stderr)
        sys.exit(1)

    try:
        conn = connect_imap(args.server, args.port, args.ssl, args.starttls,
                            verify_cert=not args.insecure)
    except Exception as e:
        print(f"Connection failed: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        try:
            status, _ = conn.login(args.user, args.password)
        except Exception as e:
            print(f"Authentication failed: {e}", file=sys.stderr)
            sys.exit(1)
        if status != 'OK':
            print("Authentication failed", file=sys.stderr)
            sys.exit(1)
        print("Logged in successfully")

        status, folder_data = conn.list()
        if status != 'OK':
            print("Could not list folders", file=sys.stderr)
            sys.exit(1)

        folders = parse_folder_list(folder_data)
        print(f"Found {len(folders)} folders")

        update_mode = not args.full
        state = load_state(base_dir) if update_mode else {}

        if args.full:
            print("Full download mode: downloading all emails")
        else:
            print("Incremental mode: only downloading new emails (use --full to download all)")

        total_downloaded = 0
        for folder in folders:
            last_uid = None
            if update_mode and folder in state:
                try:
                    last_uid = int(state[folder])
                except (ValueError, TypeError):
                    pass  # corrupt entry: treat the folder as never downloaded

            downloaded, highest_uid = download_folder(
                conn, folder, base_dir, args.limit, total_downloaded,
                update_mode=update_mode, last_uid=last_uid
            )
            total_downloaded += downloaded

            if highest_uid is not None:
                state[folder] = highest_uid
                # Persist after every folder so an interrupted run keeps
                # the progress made so far.
                save_state(base_dir, state)

            if args.limit is not None and total_downloaded >= args.limit:
                print(f" Reached limit of {args.limit} emails")
                break

        save_state(base_dir, state)
        print(f"\nDownloaded {total_downloaded} emails to {base_dir}")
    finally:
        try:
            conn.logout()
        except Exception:
            pass  # the connection may already be gone


if __name__ == '__main__':
    main()