# Copyright (C) 2002-2007 Python Software Foundation
# Contact: email-sig@python.org

"""Email address parsing code.

Lifted directly from rfc822.py.  This should eventually be rewritten.
"""

__all__ = [
    'mktime_tz',
    'parsedate',
    'parsedate_tz',
    'quote',
    ]

import calendar
import time

SPACE = ' '
EMPTYSTRING = ''
COMMASPACE = ', '

# Parse a date field
# Abbreviated names come first, full names second, so that
# (index % 12) + 1 is the month number for either spelling.
_monthnames = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul',
               'aug', 'sep', 'oct', 'nov', 'dec',
               'january', 'february', 'march', 'april', 'may', 'june', 'july',
               'august', 'september', 'october', 'november', 'december']

_daynames = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']

# The timezone table does not include the military time zones defined
# in RFC822, other than Z.  According to RFC1123, the description in
# RFC822 gets the signs wrong, so we can't rely on any such time
# zones.  RFC1123 recommends that numeric timezone indicators be used
# instead of timezone names.

# Values are in the +-HHMM notation (e.g. -500 means five hours west of UTC).
_timezones = {'UT':0, 'UTC':0, 'GMT':0, 'Z':0,
              'AST': -400, 'ADT': -300,  # Atlantic (used in Canada)
              'EST': -500, 'EDT': -400,  # Eastern
              'CST': -600, 'CDT': -500,  # Central
              'MST': -700, 'MDT': -600,  # Mountain
              'PST': -800, 'PDT': -700   # Pacific
              }


def parsedate_tz(data):
    """Convert a date string to a 10-item time tuple.

    The first nine items follow the layout built by _parsedate_tz()
    (year, month, day, hour, minute, second, 0, 1, -1); the tenth is the
    timezone offset from UTC in seconds.  A missing, unrecognized or
    ``-0000`` zone is reported as 0.

    Of the single-letter military zones only ``Z`` is recognized; see the
    comment above the ``_timezones`` table.

    Returns None when the string cannot be parsed.
    """
    res = _parsedate_tz(data)
    if not res:
        return
    if res[9] is None:
        res[9] = 0
    return tuple(res)

def _parsedate_tz(data):
    """Convert date to extended time tuple.

    The last (additional) element is the time zone offset in seconds, except if
    the timezone was specified as -0000.  In that case the last element is
    None.  This indicates a UTC timestamp that explicitly declaims knowledge of
    the source timezone, as opposed to a +0000 timestamp that indicates the
    source timezone really was UTC.

    The result is a 10-item list, or None if `data' is empty, consists only
    of whitespace, or cannot be parsed as a date.
    """
    if not data:
        return None
    data = data.split()
    if not data:
        # Whitespace-only input: nothing to index below.
        return None
    # The FWS after the comma after the day-of-week is optional, so search and
    # adjust for this.
    if data[0].endswith(',') or data[0].lower() in _daynames:
        # There's a dayname here. Skip it
        del data[0]
    else:
        i = data[0].rfind(',')
        if i >= 0:
            data[0] = data[0][i+1:]
    if len(data) == 3: # RFC 850 date, deprecated
        stuff = data[0].split('-')
        if len(stuff) == 3:
            data = stuff + data[1:]
    if len(data) == 4:
        # The zone may be glued to the time ("10:00:00+0100"); split it off.
        s = data[3]
        i = s.find('+')
        if i == -1:
            i = s.find('-')
        if i > 0:
            data[3:] = [s[:i], s[i:]]
        else:
            data.append('') # Dummy tz
    if len(data) < 5:
        return None
    data = data[:5]
    [dd, mm, yy, tm, tz] = data
    if not (dd and mm and yy):
        # Splitting an RFC 850 date on '-' can yield empty fields
        # (e.g. "-Jan-20"); the [-1] / [0] lookups below need content.
        return None
    mm = mm.lower()
    if mm not in _monthnames:
        # Accept "Jan 1" as well as "1 Jan".
        dd, mm = mm, dd.lower()
        if mm not in _monthnames:
            return None
    mm = _monthnames.index(mm) + 1
    if mm > 12:
        mm -= 12
    if dd[-1] == ',':
        dd = dd[:-1]
    i = yy.find(':')
    if i > 0:
        # Year and time appear in swapped positions.
        yy, tm = tm, yy
    if yy[-1] == ',':
        yy = yy[:-1]
        if not yy:
            # The year field was a lone comma.
            return None
    if not yy[0].isdigit():
        # Year and zone appear in swapped positions.
        yy, tz = tz, yy
    if tm[-1] == ',':
        tm = tm[:-1]
    tm = tm.split(':')
    if len(tm) == 2:
        [thh, tmm] = tm
        tss = '0'
    elif len(tm) == 3:
        [thh, tmm, tss] = tm
    elif len(tm) == 1 and '.' in tm[0]:
        # Some non-compliant MUAs use '.' to separate time elements.
        tm = tm[0].split('.')
        if len(tm) == 2:
            [thh, tmm] = tm
            tss = '0'
        elif len(tm) == 3:
            [thh, tmm, tss] = tm
        else:
            # Neither HH.MM nor HH.MM.SS; without this the names below
            # would be unbound.
            return None
    else:
        return None
    try:
        yy = int(yy)
        dd = int(dd)
        thh = int(thh)
        tmm = int(tmm)
        tss = int(tss)
    except ValueError:
        return None
    # Check for a yy specified in two-digit format, then convert it to the
    # appropriate four-digit format, according to the POSIX standard. RFC 822
    # calls for a two-digit yy, but RFC 2822 (which obsoletes RFC 822)
    # mandates a 4-digit yy. For more information, see the documentation for
    # the time module.
    if yy < 100:
        # The year is between 1969 and 1999 (inclusive).
        if yy > 68:
            yy += 1900
        # The year is between 2000 and 2068 (inclusive).
        else:
            yy += 2000
    tzoffset = None
    tz = tz.upper()
    if tz in _timezones:
        tzoffset = _timezones[tz]
    else:
        try:
            tzoffset = int(tz)
        except ValueError:
            pass
        if tzoffset==0 and tz.startswith('-'):
            # "-0000": UTC time, source zone explicitly unknown.
            tzoffset = None
    # Convert a timezone offset into seconds ; -0500 -> -18000
    if tzoffset:
        if tzoffset < 0:
            tzsign = -1
            tzoffset = -tzoffset
        else:
            tzsign = 1
        tzoffset = tzsign * ( (tzoffset//100)*3600 + (tzoffset % 100)*60)
    # Daylight Saving Time flag is set to -1, since DST is unknown.
    return [yy, mm, dd, thh, tmm, tss, 0, 1, -1, tzoffset]


def parsedate(data):
    """Convert a time string to a time tuple.

    Returns the first nine items of the parsedate_tz() result, or None if
    the string cannot be parsed.
    """
    t = parsedate_tz(data)
    if isinstance(t, tuple):
        return t[:9]
    else:
        return t


def mktime_tz(data):
    """Turn a 10-tuple as returned by parsedate_tz() into a POSIX timestamp."""
    if data[9] is None:
        # No zone info, so localtime is better assumption than GMT
        return time.mktime(data[:8] + (-1,))
    else:
        # Interpret the fields as UTC, then remove the zone offset.
        t = calendar.timegm(data)
        return t - data[9]


def quote(str):
    """Prepare string to be used in a quoted string.

    Turns backslash and double quote characters into quoted pairs.  These
    are the only characters that need to be quoted inside a quoted string.
    Does not add the surrounding double quotes.
    """
    # Backslashes first, so the ones added for the quotes are not doubled.
    return str.replace('\\', '\\\\').replace('"', '\\"')


class AddrlistClass:
    """Address parser class by Ben Escoto.

    To understand what this class does, it helps to have a copy of RFC 2822 in
    front of you.

    Note: this class interface is deprecated and may be removed in the future.
    Use email.utils.AddressList instead.
    """

    def __init__(self, field):
        """Initialize a new instance.

        `field' is an unparsed address header field, containing
        one or more addresses.
        """
        self.specials = '()<>@,:;.\"[]'
        # Index of the next unread character in self.field.
        self.pos = 0
        self.LWS = ' \t'
        self.CR = '\r\n'
        self.FWS = self.LWS + self.CR
        self.atomends = self.specials + self.LWS + self.CR
        # Note that RFC 2822 now specifies `.' as obs-phrase, meaning that it
        # is obsolete syntax.  RFC 2822 requires that we recognize obsolete
        # syntax, so allow dots in phrases.
        self.phraseends = self.atomends.replace('.', '')
        self.field = field
        # Comments collected while parsing the current address.
        self.commentlist = []

    def gotonext(self):
        """Skip white space and extract comments.

        Returns the skipped spaces and tabs as a string; CR and LF are
        skipped but not included.  Comments are appended to
        self.commentlist.
        """
        wslist = []
        while self.pos < len(self.field):
            if self.field[self.pos] in self.LWS + '\n\r':
                if self.field[self.pos] not in '\n\r':
                    wslist.append(self.field[self.pos])
                self.pos += 1
            elif self.field[self.pos] == '(':
                self.commentlist.append(self.getcomment())
            else:
                break
        return EMPTYSTRING.join(wslist)

    def getaddrlist(self):
        """Parse all addresses.

        Returns a list containing all of the addresses.  Each entry is a
        2-tuple; a position where getaddress() yields nothing is recorded
        as ('', '').
        """
        result = []
        while self.pos < len(self.field):
            ad = self.getaddress()
            if ad:
                result += ad
            else:
                result.append(('', ''))
        return result

    def getaddress(self):
        """Parse the next address.

        Returns a list of 2-tuples (empty if nothing usable was found; a
        group can contribute several entries).  A trailing comma is consumed.
        """
        self.commentlist = []
        self.gotonext()

        # Remember where we started in case we have to re-parse as addr-spec.
        oldpos = self.pos
        oldcl = self.commentlist
        plist = self.getphraselist()

        self.gotonext()
        returnlist = []

        if self.pos >= len(self.field):
            # Bad email address technically, no domain.
            if plist:
                returnlist = [(SPACE.join(self.commentlist), plist[0])]

        elif self.field[self.pos] in '.@':
            # email address is just an addrspec
            # this isn't very efficient since we start over
            self.pos = oldpos
            self.commentlist = oldcl
            addrspec = self.getaddrspec()
            returnlist = [(SPACE.join(self.commentlist), addrspec)]

        elif self.field[self.pos] == ':':
            # address is a group
            returnlist = []

            fieldlen = len(self.field)
            self.pos += 1
            while self.pos < len(self.field):
                self.gotonext()
                if self.pos < fieldlen and self.field[self.pos] == ';':
                    self.pos += 1
                    break
                returnlist = returnlist + self.getaddress()

        elif self.field[self.pos] == '<':
            # Address is a phrase then a route addr
            routeaddr = self.getrouteaddr()

            if self.commentlist:
                returnlist = [(SPACE.join(plist) + ' (' +
                               ' '.join(self.commentlist) + ')', routeaddr)]
            else:
                returnlist = [(SPACE.join(plist), routeaddr)]

        else:
            if plist:
                returnlist = [(SPACE.join(self.commentlist), plist[0])]
            elif self.field[self.pos] in self.specials:
                # Step over a stray special so the caller's loop progresses.
                self.pos += 1

        self.gotonext()
        if self.pos < len(self.field) and self.field[self.pos] == ',':
            self.pos += 1
        return returnlist

    def getrouteaddr(self):
        """Parse a route address (Return-path value).

        This method just skips all the route stuff and returns the addrspec.
        Returns None if the current character is not '<'.
        """
        if self.field[self.pos] != '<':
            return

        expectroute = False
        self.pos += 1
        self.gotonext()
        adlist = ''
        while self.pos < len(self.field):
            if expectroute:
                # Domain of a route element; parsed only to be discarded.
                self.getdomain()
                expectroute = False
            elif self.field[self.pos] == '>':
                self.pos += 1
                break
            elif self.field[self.pos] == '@':
                self.pos += 1
                expectroute = True
            elif self.field[self.pos] == ':':
                self.pos += 1
            else:
                adlist = self.getaddrspec()
                self.pos += 1
                break
            self.gotonext()

        return adlist

    def getaddrspec(self):
        """Parse an RFC 2822 addr-spec.

        Returns the addr-spec as a string, only the local part when no '@'
        follows it, or the empty string when the domain is invalid.
        """
        aslist = []

        self.gotonext()
        while self.pos < len(self.field):
            preserve_ws = True
            if self.field[self.pos] == '.':
                # Drop whitespace recorded just before the dot.
                if aslist and not aslist[-1].strip():
                    aslist.pop()
                aslist.append('.')
                self.pos += 1
                preserve_ws = False
            elif self.field[self.pos] == '"':
                aslist.append('"%s"' % quote(self.getquote()))
            elif self.field[self.pos] in self.atomends:
                if aslist and not aslist[-1].strip():
                    aslist.pop()
                break
            else:
                aslist.append(self.getatom())
            ws = self.gotonext()
            if preserve_ws and ws:
                aslist.append(ws)

        if self.pos >= len(self.field) or self.field[self.pos] != '@':
            return EMPTYSTRING.join(aslist)

        aslist.append('@')
        self.pos += 1
        self.gotonext()
        domain = self.getdomain()
        if not domain:
            # Invalid domain, return an empty address instead of returning a
            # local part to denote failed parsing.
            return EMPTYSTRING
        return EMPTYSTRING.join(aslist) + domain

    def getdomain(self):
        """Get the complete domain name from an address.

        Returns the empty string if a second `@' is encountered.
        """
        sdlist = []
        while self.pos < len(self.field):
            if self.field[self.pos] in self.LWS:
                self.pos += 1
            elif self.field[self.pos] == '(':
                self.commentlist.append(self.getcomment())
            elif self.field[self.pos] == '[':
                sdlist.append(self.getdomainliteral())
            elif self.field[self.pos] == '.':
                self.pos += 1
                sdlist.append('.')
            elif self.field[self.pos] == '@':
                # bpo-34155: Don't parse domains with two `@` like
                # `a@malicious.org@important.com`.
                return EMPTYSTRING
            elif self.field[self.pos] in self.atomends:
                break
            else:
                sdlist.append(self.getatom())
        return EMPTYSTRING.join(sdlist)

    def getdelimited(self, beginchar, endchars, allowcomments=True):
        """Parse a header fragment delimited by special characters.

        `beginchar' is the start character for the fragment.
        If self is not looking at an instance of `beginchar' then
        getdelimited returns the empty string.

        `endchars' is a sequence of allowable end-delimiting characters.
        Parsing stops when one of these is encountered.

        If `allowcomments' is non-zero, embedded RFC 2822 comments are allowed
        within the parsed fragment.

        Returns the fragment without its delimiters; a backslash is dropped
        and the character after it is taken literally.
        """
        if self.field[self.pos] != beginchar:
            return ''

        slist = ['']
        # True while the previous character was an unconsumed backslash.
        escaped = False
        self.pos += 1
        while self.pos < len(self.field):
            if escaped:
                slist.append(self.field[self.pos])
                escaped = False
            elif self.field[self.pos] in endchars:
                self.pos += 1
                break
            elif allowcomments and self.field[self.pos] == '(':
                slist.append(self.getcomment())
                continue        # have already advanced pos from getcomment
            elif self.field[self.pos] == '\\':
                escaped = True
            else:
                slist.append(self.field[self.pos])
            self.pos += 1

        return EMPTYSTRING.join(slist)

    def getquote(self):
        """Get a quote-delimited fragment from self's field."""
        return self.getdelimited('"', '"\r', False)

    def getcomment(self):
        """Get a parenthesis-delimited fragment from self's field."""
        return self.getdelimited('(', ')\r', True)

    def getdomainliteral(self):
        """Parse an RFC 2822 domain-literal."""
        return '[%s]' % self.getdelimited('[', ']\r', False)

    def getatom(self, atomends=None):
        """Parse an RFC 2822 atom.

        Optional atomends specifies a different set of end token delimiters
        (the default is to use self.atomends).  This is used e.g. in
        getphraselist() since phrase endings must not include the `.' (which
        is legal in phrases)."""
        atomlist = ['']
        if atomends is None:
            atomends = self.atomends

        while self.pos < len(self.field):
            if self.field[self.pos] in atomends:
                break
            else:
                atomlist.append(self.field[self.pos])
            self.pos += 1

        return EMPTYSTRING.join(atomlist)

    def getphraselist(self):
        """Parse a sequence of RFC 2822 phrases.

        A phrase is a sequence of words, which are in turn either RFC 2822
        atoms or quoted-strings.  Phrases are canonicalized by squeezing all
        runs of continuous whitespace into one space.

        Returns the list of words; comments are moved to self.commentlist.
        """
        plist = []

        while self.pos < len(self.field):
            if self.field[self.pos] in self.FWS:
                self.pos += 1
            elif self.field[self.pos] == '"':
                plist.append(self.getquote())
            elif self.field[self.pos] == '(':
                self.commentlist.append(self.getcomment())
            elif self.field[self.pos] in self.phraseends:
                break
            else:
                plist.append(self.getatom(self.phraseends))

        return plist

class AddressList(AddrlistClass):
    """An AddressList encapsulates a list of parsed RFC 2822 addresses."""
    def __init__(self, field):
        """Parse `field' into self.addresslist (empty for a false field)."""
        AddrlistClass.__init__(self, field)
        if field:
            self.addresslist = self.getaddrlist()
        else:
            self.addresslist = []

    def __len__(self):
        return len(self.addresslist)

    def __add__(self, other):
        # Set union
        newaddr = AddressList(None)
        newaddr.addresslist = self.addresslist[:]
        for x in other.addresslist:
            if x not in self.addresslist:
                newaddr.addresslist.append(x)
        return newaddr

    def __iadd__(self, other):
        # Set union, in-place
        for x in other.addresslist:
            if x not in self.addresslist:
                self.addresslist.append(x)
        return self

    def __sub__(self, other):
        # Set difference
        newaddr = AddressList(None)
        for x in self.addresslist:
            if x not in other.addresslist:
                newaddr.addresslist.append(x)
        return newaddr

    def __isub__(self, other):
        # Set difference, in-place
        for x in other.addresslist:
            if x in self.addresslist:
                self.addresslist.remove(x)
        return self

    def __getitem__(self, index):
        # Make indexing, slices, and 'in' work
        return self.addresslist[index]