#!/usr/bin/python
"""Command-line tool to list, manage and archive mailboxes on an IMAP server."""

import getopt
import os
import re
import sys
from enum import Enum

import imaplib2
import yaml


# Untagged LIST/LSUB response line: (flags) "delimiter"|NIL mailbox-name
_LIST_RESPONSE = re.compile(
    r'\((?P<flags>[^)]*)\) (?P<delimiter>"(?:\\.|[^"\\])*"|NIL) (?P<name>.*)'
)


class imap_action(Enum):
    """Actions that can be requested on the command line."""

    MAILBOX_LIST = 1
    MAILBOX_LIST_SUB = 2
    MAILBOX_STATS = 3
    MAILBOX_CREATE = 4
    MAILBOX_DELETE = 5
    MAILBOX_SUBSCRIBE = 6
    MAILBOX_UNSUBSCRIBE = 7
    MAILBOX_RENAME = 8
    MAILBOX_ARCHIVE = 9


class _ActionFailed(Exception):
    """Abort the current action; the reason has already been reported."""


class imap_tool():
    """Parse the command line and run one mailbox action over IMAP."""

    host = "localhost"
    port = 143
    user = "user"
    password = "password"
    # Number of messages copied/deleted per round-trip when archiving.
    archive_batch = 50
    ssl = False
    action = imap_action.MAILBOX_LIST
    # When False, status messages are suppressed and only raw values printed.
    interactive = True
    # Target mailbox name; a two-item [source, destination] list for rename.
    mailbox = None
    # Year whose messages are moved by the archive action.
    archive_year = None

    usage = (
        "Usage: " + os.path.basename(sys.argv[0]) + " [options] [actions]\n"
        "\n"
        "Options:\n"
        " -h [host]\n"
        " -u [user]\n"
        " -p [password]\n"
        " -i [port]\n"
        " -s (use ssl)\n"
        " -n (non-interactive mode)\n"
        "\n"
        "Actions:\n"
        " mailbox-list\n"
        " mailbox-list-sub\n"
        " mailbox-stats [mailbox]\n"
        " mailbox-create [mailbox]\n"
        " mailbox-delete [mailbox]\n"
        " mailbox-subscribe [mailbox]\n"
        " mailbox-unsubscribe [mailbox]\n"
        " mailbox-rename [mailbox] [mailbox]\n"
        " mailbox-archive [mailbox] [year]\n"
    )

    # Command-line action name -> (action, number of required arguments).
    _ACTIONS = {
        "mailbox-list": (imap_action.MAILBOX_LIST, 0),
        "mailbox-list-sub": (imap_action.MAILBOX_LIST_SUB, 0),
        "mailbox-stats": (imap_action.MAILBOX_STATS, 1),
        "mailbox-create": (imap_action.MAILBOX_CREATE, 1),
        "mailbox-delete": (imap_action.MAILBOX_DELETE, 1),
        "mailbox-subscribe": (imap_action.MAILBOX_SUBSCRIBE, 1),
        "mailbox-unsubscribe": (imap_action.MAILBOX_UNSUBSCRIBE, 1),
        "mailbox-rename": (imap_action.MAILBOX_RENAME, 2),
        "mailbox-archive": (imap_action.MAILBOX_ARCHIVE, 2),
    }

    # Single-mailbox commands:
    # action -> (connection method name, error prefix, success prefix).
    _SIMPLE_COMMANDS = {
        imap_action.MAILBOX_CREATE:
            ("create", "Error creating mailbox ", "Created mailbox "),
        imap_action.MAILBOX_DELETE:
            ("delete", "Error deleting mailbox ", "Deleted mailbox "),
        imap_action.MAILBOX_SUBSCRIBE:
            ("subscribe", "Error subscribing to mailbox ",
             "Subscribed to mailbox "),
        imap_action.MAILBOX_UNSUBSCRIBE:
            ("unsubscribe", "Error unsubscribing from mailbox ",
             "Unsubscribed from mailbox "),
    }

    # utils

    def print_message(self, message):
        """Print a status message, unless running in non-interactive mode."""
        if self.interactive:
            print(message)

    def error_decode(self, data):
        """Return the first element of an IMAP response as text.

        Returns an empty string when the response carries no data, so that
        building an error message can never fail itself.
        """
        if not data or data[0] is None:
            return ""
        detail = data[0]
        if isinstance(detail, bytes):
            return detail.decode(errors="replace")
        return str(detail)

    def _fail(self, message):
        """Report an error message and abort the current action."""
        self.print_message(message)
        raise _ActionFailed(message)

    @staticmethod
    def _mailbox_name(entry):
        """Extract the mailbox name from one LIST/LSUB response entry.

        The name is returned exactly as the server sent it (quotes included),
        so names containing spaces are no longer cut at the last space.
        """
        if isinstance(entry, tuple):
            # NOTE(review): per the imaplib response format a tuple is
            # (header, literal); presumably the literal is the mailbox name
            # here - confirm against imaplib2.
            return entry[1].decode(errors="replace")
        text = entry.decode(errors="replace")
        match = _LIST_RESPONSE.match(text)
        if match:
            return match.group("name")
        # Unrecognized layout: fall back to the last space-separated token.
        return text.split(" ")[-1]

    @staticmethod
    def _logout(imap):
        """Close the connection; failures while closing are not actionable."""
        try:
            imap.logout()
        except (OSError, imaplib2.IMAP4.error):
            pass

    # imap-tool

    def _mailbox_list(self, response, title):
        """Print the mailbox names contained in a LIST/LSUB response."""
        ret, data = response
        if ret != "OK":
            self._fail("Error listing mailboxes: " + self.error_decode(data))
        self.print_message(title)
        for entry in data:
            if not entry:
                # No mailbox matched: the response holds an empty placeholder.
                continue
            name = self._mailbox_name(entry)
            print("- " + name if self.interactive else name)

    def _mailbox_stats(self, imap):
        """Print the number of messages in the selected mailbox."""
        ret, data = imap.examine(self.mailbox)
        if ret != "OK":
            self._fail("Error getting stats for mailbox " + self.mailbox)
        count = int(data[0])
        if self.interactive:
            print("Mailbox " + self.mailbox + " has " + str(count) + " entries")
        else:
            # Like the list actions, emit the bare value for scripts.
            print(count)

    def _mailbox_simple(self, imap):
        """Run a create/delete/subscribe/unsubscribe command on the mailbox."""
        method, error_prefix, success_prefix = \
            self._SIMPLE_COMMANDS[self.action]
        ret, data = getattr(imap, method)(self.mailbox)
        if ret != "OK":
            self._fail(error_prefix + self.mailbox + ": " +
                       self.error_decode(data))
        self.print_message(success_prefix + self.mailbox)

    def _mailbox_rename(self, imap):
        """Rename mailbox[0] to mailbox[1]."""
        source, destination = self.mailbox[0], self.mailbox[1]
        ret, data = imap.rename(source, destination)
        if ret != "OK":
            self._fail("Error renaming mailbox " + source + " to " +
                       destination + ": " + self.error_decode(data))
        self.print_message("Renamed mailbox " + source + " to " + destination)

    def _mailbox_archive(self, imap):
        """Move one year of messages to archives/<mailbox>/<year>."""
        # NOTE(review): "/" is hard-coded as hierarchy separator; servers
        # using another delimiter would need a different path.
        mailbox_archive = ("archives/" + self.mailbox + "/" +
                           str(self.archive_year))
        year_start = self.archive_year
        year_stop = self.archive_year + 1

        ret, data = imap.select(self.mailbox)
        if ret != "OK":
            self._fail("Error selecting mailbox " + self.mailbox + ": " +
                       self.error_decode(data))

        # Two ways to search:
        # - SINCE/BEFORE that use native dating (often fs-backed), a bit faster
        # - SENTSINCE/SENTBEFORE that use message dating, a bit slower
        criteria = ("(SENTSINCE \"01-Jan-" + str(year_start) +
                    "\" SENTBEFORE \"01-Jan-" + str(year_stop) + "\")")
        ret, data = imap.uid("SEARCH", None, criteria)
        if ret != "OK":
            self._fail("Error searching mailbox " + self.mailbox + ": " +
                       self.error_decode(data))

        # An empty result may be reported as missing data rather than b"".
        found = data[0] if data and data[0] else b""
        uids = found.decode("ascii").split()
        self.print_message("Found " + str(len(uids)) +
                           " messages from mailbox " + self.mailbox +
                           " for year " + str(self.archive_year))
        if not uids:
            # Nothing to archive is reported as a failure exit status.
            raise _ActionFailed("no messages found")

        ret, data = imap.create(mailbox_archive)
        if ret == "OK":
            self.print_message("Created mailbox " + mailbox_archive)
        elif not self.error_decode(data).startswith("[ALREADYEXISTS]"):
            self._fail("Error creating mailbox " + mailbox_archive + ": " +
                       self.error_decode(data))

        # A non-positive batch size means "everything in a single batch".
        batch_size = self.archive_batch if self.archive_batch > 0 else len(uids)
        for start in range(0, len(uids), batch_size):
            batch = uids[start:start + batch_size]
            uid_set = ",".join(batch)

            ret, data = imap.uid("COPY", uid_set, mailbox_archive)
            if ret != "OK":
                self._fail("Error copying " + str(len(batch)) +
                           " messages to mailbox " + mailbox_archive)

            ret, data = imap.uid("STORE", uid_set, "+FLAGS", "\\Deleted")
            if ret != "OK":
                self._fail("Error deleting " + str(len(batch)) +
                           " messages from mailbox " + self.mailbox)

            # NOTE: EXPUNGE removes every message flagged \Deleted in the
            # selected mailbox, not only the ones flagged just above.
            imap.expunge()
            self.print_message("Archived " + str(len(batch)) +
                               " messages to mailbox " + mailbox_archive)

    def _action_dispatch(self, imap):
        """Run the configured action on an authenticated connection."""
        action = self.action
        if action == imap_action.MAILBOX_LIST:
            self._mailbox_list(imap.list(), "Mailbox list:")
        elif action == imap_action.MAILBOX_LIST_SUB:
            self._mailbox_list(imap.lsub(), "Subscribed mailbox list:")
        elif action == imap_action.MAILBOX_STATS:
            self._mailbox_stats(imap)
        elif action in self._SIMPLE_COMMANDS:
            self._mailbox_simple(imap)
        elif action == imap_action.MAILBOX_RENAME:
            self._mailbox_rename(imap)
        elif action == imap_action.MAILBOX_ARCHIVE:
            self._mailbox_archive(imap)

    def action_execute(self):
        """Connect, authenticate and run the configured action.

        Returns 0 on success and 1 on any failure. The connection is always
        logged out once it has been established.
        """
        self.print_message("Connecting to " + self.host)
        try:
            if self.ssl:
                imap = imaplib2.IMAP4_SSL(host=self.host, port=self.port)
            else:
                imap = imaplib2.IMAP4(host=self.host, port=self.port)
        except (OSError, imaplib2.IMAP4.error) as error:
            self.print_message("Unable to connect to " + self.host + ": " +
                               str(error))
            return 1

        try:
            if not self.ssl:
                # Never send credentials over a plain-text connection.
                if "STARTTLS" not in imap.capabilities:
                    self.print_message(
                        "Unable to create a secure TLS connection")
                    return 1
                imap.starttls()

            self.print_message("Authenticating with user " + self.user)
            imap.login(self.user, self.password)
            self.print_message("")

            self._action_dispatch(imap)
        except _ActionFailed:
            # The reason was already reported where the failure occurred.
            return 1
        except Exception as error:
            # Top-level boundary: report instead of failing silently.
            self.print_message("Error: " + str(error))
            return 1
        finally:
            self._logout(imap)

        return 0

    def main(self):
        """Parse sys.argv, configure the tool and run the requested action.

        Returns the process exit status: 0 on success, 1 on usage errors or
        when the action fails.
        """
        try:
            options, arguments = getopt.getopt(sys.argv[1:], "h:u:p:i:sn")
        except getopt.GetoptError as error:
            self.print_message(str(error))
            self.print_message(self.usage)
            return 1

        # Apply -n first so that it also silences option-parsing errors.
        if any(key == "-n" for key, _ in options):
            self.interactive = False

        for key, value in options:
            if key == "-h":
                self.host = value
            elif key == "-u":
                self.user = value
            elif key == "-p":
                self.password = value
            elif key == "-i":
                try:
                    self.port = int(value)
                except ValueError:
                    self.print_message("Invalid port specified")
                    return 1
            elif key == "-s":
                self.ssl = True

        if not arguments:
            self.print_message(self.usage)
            return 1

        action_option = arguments[0]
        known = self._ACTIONS.get(action_option)
        if known is None:
            self.print_message("Invalid action specified")
            return 1

        action, required = known
        parameters = arguments[1:]
        if len(parameters) < required:
            self.print_message("Missing argument for action " + action_option)
            self.print_message(self.usage)
            return 1

        self.action = action
        if action == imap_action.MAILBOX_RENAME:
            self.mailbox = [parameters[0], parameters[1]]
        elif action == imap_action.MAILBOX_ARCHIVE:
            self.mailbox = parameters[0]
            try:
                self.archive_year = int(parameters[1])
            except ValueError:
                self.print_message("Invalid year specified")
                return 1
        elif required == 1:
            self.mailbox = parameters[0]

        return self.action_execute()


if __name__ == "__main__":
    sys.exit(imap_tool().main())