1 """
2 Python interface to GnuPG.
3
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
5
6 @see: L{iface_cache.PendingFeed}
7 """
8
9
10
11
12 from zeroinstall import _, logger
13 import subprocess
14 import base64, re
15 import os
16 import tempfile
17
18 from zeroinstall.support import find_in_path, basedir
19 from zeroinstall.injector.trust import trust_db
20 from zeroinstall.injector.model import SafeException
21
22 _gnupg_options = None
36
38 """Abstract base class for signature check results.
39 @ivar status: the raw data returned by GPG
40 @ivar messages: any messages printed by GPG which may be relevant to this signature
41 """
42 status = None
43 messages = None
44
48
50 """Whether this signature is trusted by the user.
51 @rtype: bool"""
52 return False
53
55 """Returns the ID of the key that must be downloaded to check this signature."""
56 return None
57
59 """A valid signature check result."""
60 FINGERPRINT = 0
61 TIMESTAMP = 2
62
64 """@rtype: str"""
65 return "Valid signature from " + self.status[self.FINGERPRINT]
66
72
74 """Get the time this signature was made.
75 @rtype: int"""
76 return int(self.status[self.TIMESTAMP])
77
78 fingerprint = property(lambda self: self.status[self.FINGERPRINT])
79
81 """Call 'gpg --list-keys' and return the results split into lines and columns.
82 @rtype: [[str]]"""
83
84 child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint], stdout = subprocess.PIPE)
85 cout, unused = child.communicate()
86 if child.returncode:
87 logger.info(_("GPG exited with code %d") % child.returncode)
88 details = []
89 for line in cout.split('\n'):
90 details.append(line.split(':'))
91 return details
92
94 """A bad signature (doesn't match the message)."""
95 KEYID = 0
96
98 """@rtype: str"""
99 return _("BAD signature by %s (the message has been tampered with)") \
100 % self.status[self.KEYID]
101
103 """Error while checking a signature."""
104 KEYID = 0
105 ALG = 1
106 RC = -1
107
109 """@rtype: str"""
110 msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
111 rc = int(self.status[self.RC])
112 if rc == 4:
113 msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
114 elif rc == 9:
115 msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
116 else:
117 msg += _("Unknown reason code %d") % rc
118 return msg
119
121 """@rtype: str | None"""
122 rc = int(self.status[self.RC])
123 if rc == 9:
124 return self.status[self.KEYID]
125 return None
126
128 """A GPG key.
129 @since: 0.27
130 @ivar fingerprint: the fingerprint of the key
131 @type fingerprint: str
132 @ivar name: a short name for the key, extracted from the full name
133 @type name: str
134 """
139
141 return self.name.split(' (', 1)[0].split(' <', 1)[0]
142
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.

    A single C{gpg --list-keys} is run for all the fingerprints and its
    colon-separated output is parsed. A L{Key} is created for every requested
    fingerprint before gpg runs; its C{name} is filled in only when gpg
    reports a matching user ID.
    @param fingerprints: the fingerprints of the keys to look up
    @type fingerprints: [str]
    @return: the loaded keys, as a dictionary indexed by fingerprint
    @rtype: {str: L{Key}}
    @since: 0.27"""
    import codecs

    keys = {}

    # With no key arguments "gpg --list-keys" lists the whole keyring, so
    # don't run it at all when there is nothing to look up.
    if not fingerprints:
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
                      '--with-fingerprint', '--with-fingerprint'] + list(fingerprints),
                     stdout=subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # A new primary key starts: forget the previous key's state.
                current_fpr = None
                current_uid = None
            if line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This fingerprint arrived after a uid record, so give the
                    # requested key the uid that was already seen.
                    try:
                        keys[current_fpr].name = codecs.decode(current_uid, 'utf-8')
                    except Exception:
                        # Fall back to the raw value. "except Exception" (not a
                        # bare except) lets KeyboardInterrupt/SystemExit through.
                        logger.warning("Not UTF-8: %s", current_uid)
                        keys[current_fpr].name = current_uid
            if line.startswith('uid:'):
                assert current_fpr is not None
                # Only the first uid record of each key is used.
                if current_uid:
                    continue
                parts = line.split(':')
                current_uid = parts[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        # Always release the pipe and reap gpg, even if parsing failed.
        child.stdout.close()
        child.wait()

    if child.returncode:
        logger.warning(_("gpg --list-keys failed with exit code %d") % child.returncode)

    return keys
196
def load_key(fingerprint):
    """Query gpg for information about this key.

    This is a convenience wrapper around L{load_keys} for a single fingerprint.
    @param fingerprint: the fingerprint of the key to look up
    @type fingerprint: str
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    return load_keys([fingerprint])[fingerprint]
203
205 """Run C{gpg --import} with this stream as stdin.
206 @type stream: file"""
207 with tempfile.TemporaryFile(mode = 'w+t') as errors:
208 child = _run_gpg(['--quiet', '--import', '--batch'],
209 stdin = stream, stderr = errors)
210
211 status = child.wait()
212
213 errors.seek(0)
214 error_messages = errors.read().strip()
215
216 if status != 0:
217 if error_messages:
218 raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
219 else:
220 raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
221 elif error_messages:
222 logger.warning(_("Warnings from 'gpg --import':\n%s") % error_messages)
223
def _check_xml_stream(stream):
    """Verify the Base64 signature stored in the last XML comment of stream.

    Everything up to and including the newline before the final
    C{<!-- Base64 Signature} comment is the signed data. The comment body is
    the Base64-encoded detached signature, which is handed to
    C{gpg --verify} through a temporary file.
    @param stream: the signed XML document, opened in binary mode; it is
    rewound to the start once gpg has been run
    @type stream: file
    @return: the rewound stream and the signatures reported by gpg
    @rtype: (file, [L{Signature}])
    @raise SafeException: if the signature block is missing or malformed, or
    if gpg reports no signatures"""
    xml_comment_start = b'<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind(b'\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1  # The newline itself belongs to the signed data

    # Validate and decode the signature first, so that malformed input is
    # rejected before any temporary file is created.
    sig_lines = data_to_check[last_comment:].split(b'\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException(_('Bad signature block: extra data on comment line'))
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != b'-->':
        raise SafeException(_('Bad signature block: last line is not end-of-comment'))
    sig_data = b'\n'.join(sig_lines[1:-1])

    if re.match(b'^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException(_("Invalid characters found in base 64 encoded signature"))
    try:
        # decodestring is the older spelling, for interpreters lacking decodebytes
        if hasattr(base64, 'decodebytes'):
            sig_data = base64.decodebytes(sig_data)
        else:
            sig_data = base64.decodestring(sig_data)
    except Exception as ex:
        raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

    with tempfile.TemporaryFile(mode='w+b') as data:
        data.write(data_to_check[:last_comment])
        data.flush()
        # Rewind the descriptor itself: gpg reads the signed data from it as stdin
        os.lseek(data.fileno(), 0, 0)

        with tempfile.TemporaryFile('w+t') as errors:
            sig_file = tempfile.NamedTemporaryFile(prefix='injector-sig-', mode='wb', delete=False)
            try:
                # Writing happens inside the try so the file is removed even
                # if the write fails. It is closed before gpg opens it by name.
                with sig_file:
                    sig_file.write(sig_data)

                child = _run_gpg([
                    '--batch',
                    # Status messages go to stdout, where they are parsed below
                    '--status-fd', '1',
                    '--keyserver-options', 'no-auto-key-retrieve',
                    '--verify', sig_file.name, '-'],
                    stdin=data,
                    stdout=subprocess.PIPE,
                    stderr=errors)

                try:
                    sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
                finally:
                    try:
                        # Reap gpg first, so that a failure while rewinding
                        # the caller's stream cannot leave it un-waited.
                        child.stdout.close()
                        child.wait()
                    finally:
                        os.lseek(stream.fileno(), 0, 0)
                        stream.seek(0)
            finally:
                os.unlink(sig_file.name)
    return (stream, sigs)
292
294 """Verify the GPG signature at the end of stream.
295 stream must be seekable.
296 @type stream: file
297 @return: (stream, [Signatures])
298 @rtype: (file, [L{Signature}])"""
299
300 stream.seek(0)
301
302 start = stream.read(6)
303 stream.seek(0)
304 if start == b"<?xml ":
305 return _check_xml_stream(stream)
306 elif start == b'-----B':
307 raise SafeException(_("Plain GPG-signed feeds no longer supported"))
308 else:
309 raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
310
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).

    This function never waits for 'child'; reaping it is left to the caller.
    @param status_r: the GnuPG status output, iterated line by line
    @type status_r: file
    @param child: the gpg process (not used here; kept so existing callers
    keep working)
    @type child: L{subprocess.Popen}
    @param errors: file collecting gpg's error output; it is rewound and read
    in full after the status lines have been consumed
    @type errors: file
    @return: one object per VALIDSIG, BADSIG or ERRSIG status line, in order
    @rtype: [L{Signature}]
    @raise SafeException: if no signature status line was seen"""
    prefix = '[GNUPG:] '
    sigs = []

    for line in status_r:
        if not line.startswith(prefix):
            logger.warning("Invalid output from GnuPG: %r", line)
            continue

        # Drop the prefix and the line terminator. A final line with no
        # terminator (truncated output) is kept whole instead of losing its
        # last character.
        split_line = line[len(prefix):].rstrip('\n').split(' ')
        code = split_line[0]
        args = split_line[1:]
        if code == 'VALIDSIG':
            sigs.append(ValidSig(args))
        elif code == 'BADSIG':
            sigs.append(BadSig(args))
        elif code == 'ERRSIG':
            sigs.append(ErrSig(args))

    errors.seek(0)

    error_messages = errors.read().strip()

    if not sigs:
        if error_messages:
            raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
        else:
            raise SafeException(_("No signatures found. No error messages from GPG."))
    elif error_messages:
        # gpg's messages cannot be attributed to one signature, so every
        # signature gets the full text.
        for s in sigs:
            s.messages = error_messages

    return sigs
359