jpayne@68
|
1 # Copyright (C) 2001-2010 Python Software Foundation
|
jpayne@68
|
2 # Author: Barry Warsaw
|
jpayne@68
|
3 # Contact: email-sig@python.org
|
jpayne@68
|
4
|
jpayne@68
|
5 """Miscellaneous utilities."""
|
jpayne@68
|
6
|
jpayne@68
|
7 __all__ = [
|
jpayne@68
|
8 'collapse_rfc2231_value',
|
jpayne@68
|
9 'decode_params',
|
jpayne@68
|
10 'decode_rfc2231',
|
jpayne@68
|
11 'encode_rfc2231',
|
jpayne@68
|
12 'formataddr',
|
jpayne@68
|
13 'formatdate',
|
jpayne@68
|
14 'format_datetime',
|
jpayne@68
|
15 'getaddresses',
|
jpayne@68
|
16 'make_msgid',
|
jpayne@68
|
17 'mktime_tz',
|
jpayne@68
|
18 'parseaddr',
|
jpayne@68
|
19 'parsedate',
|
jpayne@68
|
20 'parsedate_tz',
|
jpayne@68
|
21 'parsedate_to_datetime',
|
jpayne@68
|
22 'unquote',
|
jpayne@68
|
23 ]
|
jpayne@68
|
24
|
jpayne@68
|
25 import os
|
jpayne@68
|
26 import re
|
jpayne@68
|
27 import time
|
jpayne@68
|
28 import random
|
jpayne@68
|
29 import socket
|
jpayne@68
|
30 import datetime
|
jpayne@68
|
31 import urllib.parse
|
jpayne@68
|
32
|
jpayne@68
|
33 from email._parseaddr import quote
|
jpayne@68
|
34 from email._parseaddr import AddressList as _AddressList
|
jpayne@68
|
35 from email._parseaddr import mktime_tz
|
jpayne@68
|
36
|
jpayne@68
|
37 from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
|
jpayne@68
|
38
|
jpayne@68
|
39 # Intrapackage imports
|
jpayne@68
|
40 from email.charset import Charset
|
jpayne@68
|
41
|
jpayne@68
|
# Separator used when several header field values are joined into one string.
COMMASPACE = ', '
EMPTYSTRING = ''
UEMPTYSTRING = ''
CRLF = '\r\n'
# Apostrophe; delimits charset, language and text in RFC 2231 values.
TICK = "'"

# Characters whose presence forces a display name to be double-quoted.
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Characters that get a backslash escape inside a display name.
escapesre = re.compile(r'[\\"]')
|
jpayne@68
|
50
|
jpayne@68
|
51 def _has_surrogates(s):
|
jpayne@68
|
52 """Return True if s contains surrogate-escaped binary data."""
|
jpayne@68
|
53 # This check is based on the fact that unless there are surrogates, utf8
|
jpayne@68
|
54 # (Python's default encoding) can encode any string. This is the fastest
|
jpayne@68
|
55 # way to check for surrogates, see issue 11454 for timings.
|
jpayne@68
|
56 try:
|
jpayne@68
|
57 s.encode()
|
jpayne@68
|
58 return False
|
jpayne@68
|
59 except UnicodeEncodeError:
|
jpayne@68
|
60 return True
|
jpayne@68
|
61
|
jpayne@68
|
62 # How to deal with a string containing bytes before handing it to the
|
jpayne@68
|
63 # application through the 'normal' interface.
|
jpayne@68
|
64 def _sanitize(string):
|
jpayne@68
|
65 # Turn any escaped bytes into unicode 'unknown' char. If the escaped
|
jpayne@68
|
66 # bytes happen to be utf-8 they will instead get decoded, even if they
|
jpayne@68
|
67 # were invalid in the charset the source was supposed to be in. This
|
jpayne@68
|
68 # seems like it is not a bad thing; a defect was still registered.
|
jpayne@68
|
69 original_bytes = string.encode('utf-8', 'surrogateescape')
|
jpayne@68
|
70 return original_bytes.decode('utf-8', 'replace')
|
jpayne@68
|
71
|
jpayne@68
|
72
|
jpayne@68
|
73
|
jpayne@68
|
74 # Helpers
|
jpayne@68
|
75
|
jpayne@68
|
def formataddr(pair, charset='utf-8'):
    """Build an RFC 2822 address string from a (realname, email_address) pair.

    This is the inverse of parseaddr(); the result is suitable for a From,
    To or Cc header.

    When the realname is false, the email address is returned unmodified.

    charset selects the character set used to encode a realname that is not
    ASCII safe.  It may be a str or a Charset-like object that provides a
    header_encode method.  The default is 'utf-8'.

    A UnicodeError is raised if the email address is not ASCII.
    """
    realname, email_address = pair
    # The address MUST (per RFC) be ascii; the encode call raises otherwise.
    email_address.encode('ascii')
    if not realname:
        return email_address
    try:
        realname.encode('ascii')
    except UnicodeEncodeError:
        # Non-ASCII display name: emit it as an encoded word.
        codec = Charset(charset) if isinstance(charset, str) else charset
        return "%s <%s>" % (codec.header_encode(realname), email_address)
    # ASCII display name: quote it when it contains specials and
    # backslash-escape any backslash or double quote.
    wrapper = '"' if specialsre.search(realname) else ''
    escaped = escapesre.sub(r'\\\g<0>', realname)
    return '%s%s%s <%s>' % (wrapper, escaped, wrapper, email_address)
|
jpayne@68
|
107
|
jpayne@68
|
108
|
jpayne@68
|
109
|
jpayne@68
|
def getaddresses(fieldvalues):
    """Return a list of (REALNAME, EMAIL) for each fieldvalue.

    fieldvalues is an iterable of header field values.  Each value is
    converted with str() before parsing, so objects such as
    email.header.Header are accepted alongside plain strings.
    """
    # Join all values into a single comma-separated address list so the
    # parser sees them as one header.
    joined = COMMASPACE.join(str(value) for value in fieldvalues)
    return _AddressList(joined).addresslist
|
jpayne@68
|
115
|
jpayne@68
|
116
|
jpayne@68
|
117 def _format_timetuple_and_zone(timetuple, zone):
|
jpayne@68
|
118 return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
|
jpayne@68
|
119 ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
|
jpayne@68
|
120 timetuple[2],
|
jpayne@68
|
121 ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
|
jpayne@68
|
122 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
|
jpayne@68
|
123 timetuple[0], timetuple[3], timetuple[4], timetuple[5],
|
jpayne@68
|
124 zone)
|
jpayne@68
|
125
|
jpayne@68
|
def formatdate(timeval=None, localtime=False, usegmt=False):
    """Returns a date string as specified by RFC 2822, e.g.:

    Fri, 09 Nov 2001 01:08:47 -0000

    Optional timeval if given is a floating point time value as accepted by
    gmtime() and localtime(), otherwise the current time is used.

    Optional localtime is a flag that when True, interprets timeval, and
    returns a date relative to the local timezone instead of UTC, properly
    taking daylight savings time into account.

    Optional argument usegmt means that the timezone is written out as
    an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    is needed for HTTP, and is only used when localtime==False.
    """
    # Note: we cannot use strftime() because that honors the locale and RFC
    # 2822 requires that day and month names be the English abbreviations.
    if timeval is None:
        timeval = time.time()
    # Always start from an aware UTC datetime; this avoids the deprecated
    # datetime.utcfromtimestamp().
    dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
    if localtime:
        dt = dt.astimezone()
        # A local offset is always rendered numerically.
        usegmt = False
    elif not usegmt:
        # A naive datetime makes format_datetime() emit the '-0000' zone.
        dt = dt.replace(tzinfo=None)
    return format_datetime(dt, usegmt)
|
jpayne@68
|
154
|
jpayne@68
|
def format_datetime(dt, usegmt=False):
    """Turn a datetime into a date string as specified in RFC 2822.

    A naive datetime is rendered with the '-0000' zone; an aware one with
    its numeric offset.

    If usegmt is True, dt must be an aware datetime whose tzinfo compares
    equal to datetime.timezone.utc, and 'GMT' is rendered instead of the
    normal +0000 required by RFC2822.  This is to support HTTP headers
    involving date stamps.  Otherwise ValueError is raised.
    """
    fields = dt.timetuple()
    if usegmt:
        if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc:
            raise ValueError("usegmt option requires a UTC datetime")
        return _format_timetuple_and_zone(fields, 'GMT')
    if dt.tzinfo is None:
        return _format_timetuple_and_zone(fields, '-0000')
    return _format_timetuple_and_zone(fields, dt.strftime("%z"))
|
jpayne@68
|
172
|
jpayne@68
|
173
|
jpayne@68
|
def make_msgid(idstring=None, domain=None):
    """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:

    <142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>

    The id is built from the current time in hundredths of a second, the
    process id and 64 random bits.  Optional idstring, if given, is appended
    after a '.' to strengthen the uniqueness of the message id.  Optional
    domain, if given, provides the portion of the message id after the '@';
    it defaults to the locally defined hostname.
    """
    centiseconds = int(time.time()*100)
    process_id = os.getpid()
    entropy = random.getrandbits(64)
    suffix = '' if idstring is None else '.' + idstring
    # Only ask the resolver for a hostname when the caller gave no domain.
    host = socket.getfqdn() if domain is None else domain
    return '<%d.%d.%d%s@%s>' % (centiseconds, process_id, entropy, suffix, host)
|
jpayne@68
|
195
|
jpayne@68
|
196
|
jpayne@68
|
def parsedate_to_datetime(data):
    """Convert an RFC 2822 date string into a datetime.

    The result is a naive datetime when the parsed value carries no
    timezone offset, and otherwise an aware datetime with a fixed-offset
    timezone.

    Raises ValueError if data cannot be parsed as a date.
    """
    parsed = _parsedate_tz(data)
    if parsed is None:
        # Without this check the unpacking below fails with an opaque
        # TypeError about a non-iterable None.
        raise ValueError('Invalid date value or format "%s"' % str(data))
    *dtuple, tz = parsed
    if tz is None:
        return datetime.datetime(*dtuple[:6])
    return datetime.datetime(*dtuple[:6],
            tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))
|
jpayne@68
|
203
|
jpayne@68
|
204
|
jpayne@68
|
def parseaddr(addr):
    """
    Split addr into its realname and email address parts.

    Returns the first (realname, email address) pair found in addr.  If
    parsing yields no address at all, the 2-tuple ('', '') is returned.
    """
    parsed = _AddressList(addr).addresslist
    return parsed[0] if parsed else ('', '')
|
jpayne@68
|
216
|
jpayne@68
|
217
|
jpayne@68
|
218 # rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
|
jpayne@68
|
def unquote(str):
    """Strip one layer of surrounding quotes or angle brackets from a string.

    A double-quoted string loses its quotes and has the escapes for
    backslash and double quote undone.  A string wrapped in <...> loses the
    brackets.  Anything else, including strings shorter than two characters,
    is returned unchanged.
    """
    if len(str) <= 1:
        return str
    if str.startswith('"') and str.endswith('"'):
        inner = str[1:-1]
        # Collapse doubled backslashes first, then unescape quotes.
        inner = inner.replace('\\\\', '\\')
        return inner.replace('\\"', '"')
    if str.startswith('<') and str.endswith('>'):
        return str[1:-1]
    return str
|
jpayne@68
|
227
|
jpayne@68
|
228
|
jpayne@68
|
229
|
jpayne@68
|
230 # RFC2231-related functions - parameter encoding and decoding
|
jpayne@68
|
def decode_rfc2231(s):
    """Split an RFC 2231 value into charset, language and text.

    When s contains at least two apostrophes, the three pieces produced by
    splitting on the first two are returned as a list.  Otherwise the
    tuple (None, None, s) is returned.
    """
    pieces = s.split(TICK, 2)
    if len(pieces) > 2:
        return pieces
    return None, None, s
|
jpayne@68
|
237
|
jpayne@68
|
238
|
jpayne@68
|
def encode_rfc2231(s, charset=None, language=None):
    """Encode string according to RFC 2231.

    s is always percent-quoted, using charset (or 'ascii' when charset is
    not given) to turn characters into octets.  If neither charset nor
    language is given, the quoted string alone is returned.  Otherwise the
    result has the form charset'language'quoted, with the empty string
    standing in for a missing language.
    """
    quoted = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    if charset is None and language is None:
        return quoted
    lang = '' if language is None else language
    return "%s'%s'%s" % (charset, lang, quoted)
|
jpayne@68
|
252
|
jpayne@68
|
253
|
jpayne@68
|
# Matches RFC 2231 style parameter names of the form "name*", "name*N" or
# "name*N*", capturing the base name and the optional section number N.
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)
|
jpayne@68
|
256
|
jpayne@68
|
def decode_params(params):
    """Decode parameters list according to RFC 2231.

    params is a sequence of 2-tuples containing (param name, string value).
    It is not modified.

    Returns a new list of 2-tuples.  The first pair is passed through as
    given.  Ordinary parameters come back with their value re-quoted.
    Parameters split into RFC 2231 continuations are reassembled into a
    single entry; if any segment was %-encoded the value is a 3-tuple of
    (charset, language, quoted text), otherwise it is the quoted text.  An
    empty params yields an empty list.
    """
    if not params:
        return []
    # The leading pair is never treated as an RFC 2231 parameter.
    name, value = params[0]
    new_params = [(name, value)]
    # Map parameter's name to a list of continuations.  The values are a
    # 3-tuple of the continuation number, the string value, and a flag
    # specifying whether a particular segment is %-encoded.
    rfc2231_params = {}
    # Iterating over a slice keeps the caller's list intact and avoids the
    # quadratic cost of repeatedly popping from the front.
    for name, value in params[1:]:
        encoded = name.endswith('*')
        value = unquote(value)
        mo = rfc2231_continuation.match(name)
        if mo:
            name, num = mo.group('name', 'num')
            if num is not None:
                num = int(num)
            rfc2231_params.setdefault(name, []).append((num, value, encoded))
        else:
            new_params.append((name, '"%s"' % quote(value)))
    for name, continuations in rfc2231_params.items():
        segments = []
        extended = False
        # Sort by number.  An unnumbered segment ("name*") has num None,
        # which cannot be compared with an int; order it before section 0
        # so malformed input mixing both forms does not raise TypeError.
        continuations.sort(
            key=lambda item: (-1 if item[0] is None else item[0],
                              item[1], item[2]))
        # And now append all values in numerical order, converting
        # %-encodings for the encoded segments.  If any of the
        # continuation names ends in a *, then the entire string, after
        # decoding segments and concatenating, must have the charset and
        # language specifiers at the beginning of the string.
        for num, s, encoded in continuations:
            if encoded:
                # Decode as "latin-1", so the characters in s directly
                # represent the percent-encoded octet values.
                # collapse_rfc2231_value treats this as an octet sequence.
                s = urllib.parse.unquote(s, encoding="latin-1")
                extended = True
            segments.append(s)
        value = quote(EMPTYSTRING.join(segments))
        if extended:
            charset, language, value = decode_rfc2231(value)
            new_params.append((name, (charset, language, '"%s"' % value)))
        else:
            new_params.append((name, '"%s"' % value))
    return new_params
|
jpayne@68
|
312
|
jpayne@68
|
def collapse_rfc2231_value(value, errors='replace',
                           fallback_charset='us-ascii'):
    """Reduce a parameter value to a plain string.

    If value is not a 3-tuple it is simply unquoted.  Otherwise it is taken
    to be (charset, language, text): text is reinterpreted as raw octets and
    decoded with charset, using errors as the decoding error handler.
    fallback_charset is used when charset is None.  If the charset is not a
    known codec, the unquoted text is returned instead.
    """
    is_triple = isinstance(value, tuple) and len(value) == 3
    if not is_triple:
        return unquote(value)
    charset, language, text = value
    # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
    # the value, so use the fallback_charset.
    codec = fallback_charset if charset is None else charset
    # The text arrives as a unicode string whose characters stand for
    # bytes; recover those bytes directly rather than utf-8 encoding them.
    octets = bytes(text, 'raw-unicode-escape')
    try:
        return str(octets, codec, errors)
    except LookupError:
        # charset is not a known codec.
        return unquote(text)
|
jpayne@68
|
331
|
jpayne@68
|
332
|
jpayne@68
|
333 #
|
jpayne@68
|
334 # datetime doesn't provide a localtime function yet, so provide one. Code
|
jpayne@68
|
335 # adapted from the patch in issue 9527. This may not be perfect, but it is
|
jpayne@68
|
336 # better than not having it.
|
jpayne@68
|
337 #
|
jpayne@68
|
338
|
jpayne@68
|
def localtime(dt=None, isdst=-1):
    """Return local time as an aware datetime object.

    With no arguments the current time is returned.  Otherwise *dt* should
    be a datetime instance.  An aware *dt* is converted to the local time
    zone according to the system time zone database.  A naive *dt* (that
    is, dt.tzinfo is None) is assumed to already be in local time and gets
    a fixed-offset tzinfo attached.

    For a naive *dt*, *isdst* is passed to the system mktime as a hint: a
    positive or zero value makes it presume initially that summer time (for
    example, Daylight Saving Time) is or is not (respectively) in effect
    for the specified time, while a negative value asks it to work that out
    itself.

    """
    if dt is None:
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        return utc_now.astimezone()
    if dt.tzinfo is not None:
        return dt.astimezone()
    # Naive datetime: build a (localtime) timetuple carrying the isdst hint
    # and let the system mktime turn it into seconds since the epoch.
    fields = dt.timetuple()[:-1] + (isdst,)
    epoch_seconds = time.mktime(fields)
    local_struct = time.localtime(epoch_seconds)
    try:
        offset = datetime.timedelta(seconds=local_struct.tm_gmtoff)
        zone = datetime.timezone(offset, local_struct.tm_zone)
    except AttributeError:
        # No tm_gmtoff/tm_zone available.  Compute the UTC offset and
        # compare it with the value implied by tm_isdst; if they match, use
        # the zone name implied by tm_isdst.
        utc_naive = datetime.datetime(*time.gmtime(epoch_seconds)[:6])
        offset = dt - utc_naive
        in_dst = time.daylight and local_struct.tm_isdst > 0
        implied = -(time.altzone if in_dst else time.timezone)
        if offset == datetime.timedelta(seconds=implied):
            zone = datetime.timezone(offset, time.tzname[in_dst])
        else:
            zone = datetime.timezone(offset)
    return dt.replace(tzinfo=zone)
|