# Copyright (C) 2001-2010 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org

"""Miscellaneous utilities."""

__all__ = [
    'collapse_rfc2231_value',
    'decode_params',
    'decode_rfc2231',
    'encode_rfc2231',
    'formataddr',
    'formatdate',
    'format_datetime',
    'getaddresses',
    'make_msgid',
    'mktime_tz',
    'parseaddr',
    'parsedate',
    'parsedate_tz',
    'parsedate_to_datetime',
    'unquote',
    ]

import os
import re
import time
import random
import socket
import datetime
import urllib.parse

from email._parseaddr import quote
from email._parseaddr import AddressList as _AddressList
from email._parseaddr import mktime_tz

from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz

# Intrapackage imports
from email.charset import Charset

COMMASPACE = ', '
EMPTYSTRING = ''
UEMPTYSTRING = ''
CRLF = '\r\n'
TICK = "'"

specialsre = re.compile(r'[][\\()<>@,:;".]')
escapesre = re.compile(r'[\\"]')


def _has_surrogates(s):
    """Return True if s contains surrogate-escaped binary data."""
    # This check is based on the fact that unless there are surrogates, utf8
    # (Python's default encoding) can encode any string.  This is the fastest
    # way to check for surrogates, see issue 11454 for timings.
    try:
        s.encode()
        return False
    except UnicodeEncodeError:
        return True


# How to deal with a string containing bytes before handing it to the
# application through the 'normal' interface.
def _sanitize(string):
    """Return string with surrogate-escaped bytes re-decoded as UTF-8.

    Bytes that do not form valid UTF-8 are replaced with the Unicode
    replacement character.
    """
    # Turn any escaped bytes into unicode 'unknown' char.  If the escaped
    # bytes happen to be utf-8 they will instead get decoded, even if they
    # were invalid in the charset the source was supposed to be in.  This
    # seems like it is not a bad thing; a defect was still registered.
    original_bytes = string.encode('utf-8', 'surrogateescape')
    return original_bytes.decode('utf-8', 'replace')


# Helpers

def formataddr(pair, charset='utf-8'):
    """The inverse of parseaddr(), this takes a 2-tuple of the form
    (realname, email_address) and returns the string value suitable
    for an RFC 2822 From, To or Cc header.

    If the first element of pair is false, then the second element is
    returned unmodified.

    Optional charset if given is the character set that is used to encode
    realname in case realname is not ASCII safe.  Can be an instance of str or
    a Charset-like object which has a header_encode method.  Default is
    'utf-8'.

    Raises UnicodeEncodeError if email_address is not pure ASCII.
    """
    name, address = pair
    # The address MUST (per RFC) be ascii, so raise a UnicodeError if it isn't.
    address.encode('ascii')
    if name:
        try:
            name.encode('ascii')
        except UnicodeEncodeError:
            if isinstance(charset, str):
                charset = Charset(charset)
            encoded_name = charset.header_encode(name)
            return "%s <%s>" % (encoded_name, address)
        else:
            # Quote the display name only when it contains RFC "specials";
            # backslashes and double quotes are always backslash-escaped.
            quotes = ''
            if specialsre.search(name):
                quotes = '"'
            name = escapesre.sub(r'\\\g<0>', name)
            return '%s%s%s <%s>' % (quotes, name, quotes, address)
    return address


def getaddresses(fieldvalues):
    """Return a list of (REALNAME, EMAIL) for each fieldvalue."""
    # All header values are joined into one comma-separated address list and
    # parsed in a single pass.
    joined = COMMASPACE.join(fieldvalues)
    return _AddressList(joined).addresslist


def _format_timetuple_and_zone(timetuple, zone):
    """Render a time tuple plus a preformatted zone string as an RFC 2822 date.

    timetuple is indexed like a struct_time (year, month, day, hour, minute,
    second, weekday, ...); zone is inserted verbatim as the last field.
    """
    return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
        ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
        timetuple[2],
        ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
         'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
        timetuple[0], timetuple[3], timetuple[4], timetuple[5],
        zone)


def formatdate(timeval=None, localtime=False, usegmt=False):
    """Returns a date string as specified by RFC 2822, e.g.:

    Fri, 09 Nov 2001 01:08:47 -0000

    Optional timeval if given is a floating point time value as accepted by
    gmtime() and localtime(), otherwise the current time is used.

    Optional localtime is a flag that when True, interprets timeval, and
    returns a date relative to the local timezone instead of UTC, properly
    taking daylight savings time into account.

    Optional argument usegmt means that the timezone is written out as
    an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    is needed for HTTP, and is only used when localtime==False.
    """
    # Note: we cannot use strftime() because that honors the locale and RFC
    # 2822 requires that day and month names be the English abbreviations.
    if timeval is None:
        timeval = time.time()
    # Always start from an aware UTC datetime; this avoids the deprecated
    # datetime.utcfromtimestamp() while producing the same field values.
    dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)

    if localtime:
        dt = dt.astimezone()
        usegmt = False
    elif not usegmt:
        # A naive datetime makes format_datetime() emit the '-0000' zone.
        dt = dt.replace(tzinfo=None)
    return format_datetime(dt, usegmt)


def format_datetime(dt, usegmt=False):
    """Turn a datetime into a date string as specified in RFC 2822.

    If usegmt is True, dt must be an aware datetime with an offset of zero.  In
    this case 'GMT' will be rendered instead of the normal +0000 required by
    RFC2822.  This is to support HTTP headers involving date stamps.

    Raises ValueError if usegmt is True and dt is not an aware UTC datetime.
    """
    now = dt.timetuple()
    if usegmt:
        if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc:
            raise ValueError("usegmt option requires a UTC datetime")
        zone = 'GMT'
    elif dt.tzinfo is None:
        zone = '-0000'
    else:
        zone = dt.strftime("%z")
    return _format_timetuple_and_zone(now, zone)


def make_msgid(idstring=None, domain=None):
    """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:

    <142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>

    Optional idstring if given is a string used to strengthen the
    uniqueness of the message id.  Optional domain if given provides the
    portion of the message id after the '@'.  It defaults to the locally
    defined hostname.
    """
    timeval = int(time.time()*100)
    pid = os.getpid()
    randint = random.getrandbits(64)
    if idstring is None:
        idstring = ''
    else:
        idstring = '.' + idstring
    if domain is None:
        domain = socket.getfqdn()
    msgid = '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain)
    return msgid


def parsedate_to_datetime(data):
    """Parse an RFC 2822 style date string into a datetime.

    The result is naive when the parser reports no zone offset (None), and
    otherwise aware with a fixed-offset timezone of that many seconds.

    NOTE(review): the parse result is unpacked directly, so if
    _parsedate_tz() returns None for unparseable input this raises
    TypeError -- confirm before relying on a specific exception type.
    """
    *dtuple, tz = _parsedate_tz(data)
    if tz is None:
        return datetime.datetime(*dtuple[:6])
    return datetime.datetime(*dtuple[:6],
            tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))


def parseaddr(addr):
    """
    Parse addr into its constituent realname and email address parts.

    Return a tuple of realname and email address, unless the parse fails, in
    which case return a 2-tuple of ('', '').
    """
    addrs = _AddressList(addr).addresslist
    if not addrs:
        return '', ''
    return addrs[0]


# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
    """Remove quotes from a string.

    A string wrapped in double quotes loses them and has its backslash
    escapes for backslash and double quote undone; a string wrapped in angle
    brackets just loses the brackets.  Anything else is returned unchanged.
    """
    if len(str) > 1:
        if str.startswith('"') and str.endswith('"'):
            return str[1:-1].replace('\\\\', '\\').replace('\\"', '"')
        if str.startswith('<') and str.endswith('>'):
            return str[1:-1]
    return str


# RFC2231-related functions - parameter encoding and decoding
def decode_rfc2231(s):
    """Decode string according to RFC 2231.

    Return the [charset, language, text] parts when s contains at least two
    tick (') separators, otherwise the tuple (None, None, s).
    """
    parts = s.split(TICK, 2)
    if len(parts) <= 2:
        return None, None, s
    return parts


def encode_rfc2231(s, charset=None, language=None):
    """Encode string according to RFC 2231.

    If neither charset nor language is given, then s is returned as-is.  If
    charset is given but not language, the string is encoded using the empty
    string for language.
    """
    s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    if charset is None and language is None:
        return s
    if language is None:
        language = ''
    return "%s'%s'%s" % (charset, language, s)


# Matches "name*", "name*N" and "name*N*"; decode_params() reads the 'name'
# and 'num' groups.
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)


def decode_params(params):
    """Decode parameters list according to RFC 2231.

    params is a sequence of 2-tuples containing (param name, string value).
    The first pair is passed through untouched.  The input is not modified;
    an empty sequence yields an empty list.

    Plain parameters come back as (name, quoted_value); RFC 2231 parameters
    come back after all plain ones, as (name, quoted_value) or, when any
    segment was %-encoded, (name, (charset, language, quoted_value)).
    """
    if not params:
        return []
    name, value = params[0]
    new_params = [(name, value)]
    # Map parameter's name to a list of continuations.  The values are a
    # 3-tuple of the continuation number, the string value, and a flag
    # specifying whether a particular segment is %-encoded.
    rfc2231_params = {}
    for name, value in params[1:]:
        encoded = name.endswith('*')
        value = unquote(value)
        mo = rfc2231_continuation.match(name)
        if mo:
            name, num = mo.group('name', 'num')
            if num is not None:
                num = int(num)
            rfc2231_params.setdefault(name, []).append((num, value, encoded))
        else:
            new_params.append((name, '"%s"' % quote(value)))
    for name, continuations in rfc2231_params.items():
        value = []
        extended = False
        # Sort by number
        continuations.sort()
        # And now append all values in numerical order, converting
        # %-encodings for the encoded segments.  If any of the
        # continuation names ends in a *, then the entire string, after
        # decoding segments and concatenating, must have the charset and
        # language specifiers at the beginning of the string.
        for num, s, encoded in continuations:
            if encoded:
                # Decode as "latin-1", so the characters in s directly
                # represent the percent-encoded octet values.
                # collapse_rfc2231_value treats this as an octet sequence.
                s = urllib.parse.unquote(s, encoding="latin-1")
                extended = True
            value.append(s)
        value = quote(EMPTYSTRING.join(value))
        if extended:
            charset, language, value = decode_rfc2231(value)
            new_params.append((name, (charset, language, '"%s"' % value)))
        else:
            new_params.append((name, '"%s"' % value))
    return new_params


def collapse_rfc2231_value(value, errors='replace',
                           fallback_charset='us-ascii'):
    """Collapse a decoded RFC 2231 parameter value into a plain string.

    If value is not a 3-tuple it is simply unquoted and returned.  Otherwise
    it is taken as (charset, language, text): text is turned back into bytes
    and decoded with charset, using errors as the decoding error handler.
    fallback_charset is used when charset is None, and if the charset names
    no known codec the unquoted text is returned instead.
    """
    if not isinstance(value, tuple) or len(value) != 3:
        return unquote(value)
    # While value comes to us as a unicode string, we need it to be a bytes
    # object.  We do not want bytes() normal utf-8 decoder, we want a straight
    # interpretation of the string as character bytes.
    charset, language, text = value
    if charset is None:
        # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
        # the value, so use the fallback_charset.
        charset = fallback_charset
    rawbytes = bytes(text, 'raw-unicode-escape')
    try:
        return str(rawbytes, charset, errors)
    except LookupError:
        # charset is not a known codec.
        return unquote(text)


#
# datetime doesn't provide a localtime function yet, so provide one.  Code
# adapted from the patch in issue 9527.  This may not be perfect, but it is
# better than not having it.
#

def localtime(dt=None, isdst=-1):
    """Return local time as an aware datetime object.

    If called without arguments, return current time.  Otherwise *dt*
    argument should be a datetime instance, and it is converted to the
    local time zone according to the system time zone database.  If *dt* is
    naive (that is, dt.tzinfo is None), it is assumed to be in local time.
    In this case, a positive or zero value for *isdst* causes localtime to
    presume initially that summer time (for example, Daylight Saving Time)
    is or is not (respectively) in effect for the specified time.  A
    negative value for *isdst* causes the localtime() function to attempt
    to divine whether summer time is in effect for the specified time.

    """
    if dt is None:
        return datetime.datetime.now(datetime.timezone.utc).astimezone()
    if dt.tzinfo is not None:
        return dt.astimezone()
    # We have a naive datetime.  Convert to a (localtime) timetuple and pass to
    # system mktime together with the isdst hint.  System mktime will return
    # seconds since epoch.
    tm = dt.timetuple()[:-1] + (isdst,)
    seconds = time.mktime(tm)
    localtm = time.localtime(seconds)
    try:
        delta = datetime.timedelta(seconds=localtm.tm_gmtoff)
        tz = datetime.timezone(delta, localtm.tm_zone)
    except AttributeError:
        # Compute UTC offset and compare with the value implied by tm_isdst.
        # If the values match, use the zone name implied by tm_isdst.
        delta = dt - datetime.datetime(*time.gmtime(seconds)[:6])
        dst = time.daylight and localtm.tm_isdst > 0
        gmtoff = -(time.altzone if dst else time.timezone)
        if delta == datetime.timedelta(seconds=gmtoff):
            tz = datetime.timezone(delta, time.tzname[dst])
        else:
            tz = datetime.timezone(delta)
    return dt.replace(tzinfo=tz)