Package zeroinstall :: Package injector :: Module gpg
[frames | no frames]

Source Code for Module zeroinstall.injector.gpg

  1  """ 
  2  Python interface to GnuPG. 
  3   
  4  This module is used to invoke GnuPG to check the digital signatures on interfaces. 
  5   
  6  @see: L{iface_cache.PendingFeed} 
  7  """ 
  8   
  9  # Copyright (C) 2009, Thomas Leonard 
 10  # See the README file for details, or visit http://0install.net. 
 11   
 12  from zeroinstall import _, logger 
 13  import subprocess 
 14  import base64, re 
 15  import os 
 16  import tempfile 
 17   
 18  from zeroinstall.support import find_in_path, basedir 
 19  from zeroinstall.injector.trust import trust_db 
 20  from zeroinstall.injector.model import SafeException 
 21   
# Cached command prefix for every GnuPG invocation: the gpg executable path
# followed by the base options. Stays None until _run_gpg() fills it in on
# its first call.
_gnupg_options = None
def _run_gpg(args, **kwargs):
    """Start GnuPG with the given arguments and return the running process.

    The executable path and the base options are worked out on the first call
    and cached in the module-level C{_gnupg_options} list for later calls.
    @param args: extra command-line arguments, appended after the base options
    @type args: [str]
    @param kwargs: passed straight through to L{subprocess.Popen}
    @rtype: subprocess.Popen"""
    global _gnupg_options

    if _gnupg_options is None:
        # $ZEROINSTALL_GPG takes priority over searching the path; a bare
        # 'gpg' is the last resort if neither gpg nor gpg2 is found.
        executable = (os.environ.get('ZEROINSTALL_GPG')
                      or find_in_path('gpg')
                      or find_in_path('gpg2')
                      or 'gpg')
        _gnupg_options = [executable, '--no-secmem-warning']

        running_as_root = hasattr(os, 'geteuid') and os.geteuid() == 0
        if running_as_root and 'GNUPGHOME' not in os.environ:
            gnupg_home = os.path.join(basedir.home, '.gnupg')
            _gnupg_options.extend(['--homedir', gnupg_home])
            logger.info(_("Running as root, so setting GnuPG home to %s"), gnupg_home)

    command = _gnupg_options + args
    return subprocess.Popen(command, universal_newlines = True, **kwargs)
36
class Signature(object):
    """Base class for the outcome of checking a single signature.

    This base implementation is never trusted and never asks for a key;
    subclasses override those answers where appropriate.
    @ivar status: the raw data returned by GPG
    @ivar messages: any messages printed by GPG which may be relevant to this signature
    """
    status = None
    messages = None

    def __init__(self, status):
        """Remember the raw status fields for this signature.
        @param status: the raw data returned by GPG
        @type status: [str]"""
        self.status = status

    def is_trusted(self, domain = None):
        """Report whether the user trusts this signature.
        The base class always answers no.
        @rtype: bool"""
        return False

    def need_key(self):
        """Return the ID of the key that has to be downloaded before this
        signature can be checked. The base class never needs one."""
        return None
57
class ValidSig(Signature):
    """The result of checking a signature that turned out to be valid."""
    # Positions of the fields of interest within self.status
    FINGERPRINT = 0
    TIMESTAMP = 2

    def __str__(self):
        """@rtype: str"""
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain = None):
        """Look the signing key's fingerprint up in the L{trust.trust_db}.
        @type domain: str | None
        @rtype: bool"""
        signer = self.status[self.FINGERPRINT]
        return trust_db.is_trusted(signer, domain)

    def get_timestamp(self):
        """Return the time at which this signature was made.
        @rtype: int"""
        return int(self.status[self.TIMESTAMP])

    @property
    def fingerprint(self):
        """The fingerprint field of the status data."""
        return self.status[self.FINGERPRINT]

    def get_details(self):
        """Run 'gpg --list-keys' for the signing key and return its output,
        one list of colon-separated columns per output line.
        @rtype: [[str]]"""
        # Note: GnuPG 2 always uses --fixed-list-mode
        listing_args = ['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint]
        child = _run_gpg(listing_args, stdout = subprocess.PIPE)
        output, _ignored = child.communicate()

        exit_code = child.returncode
        if exit_code:
            logger.info(_("GPG exited with code %d") % exit_code)

        return [row.split(':') for row in output.split('\n')]
92
class BadSig(Signature):
    """The result for a signature that does not match the message."""
    # Position of the key ID within self.status
    KEYID = 0

    def __str__(self):
        """@rtype: str"""
        template = _("BAD signature by %s (the message has been tampered with)")
        return template % self.status[self.KEYID]
101
class ErrSig(Signature):
    """Error while checking a signature.

    C{status} holds the arguments of a GnuPG C{ERRSIG} status line: key ID,
    public-key algorithm, hash algorithm, signature class, time and reason
    code. Newer GnuPG releases append the key fingerprint (or '-') after the
    reason code, so the code is looked up by position rather than being
    assumed to be the last field.
    """
    KEYID = 0
    ALG = 1
    RC = 5

    def _get_rc(self):
        """Return the numeric reason code GnuPG gave for the failure.
        If the status has too few fields for C{RC} to index, the last field
        is used instead.
        @rtype: int"""
        fields = self.status
        position = self.RC if len(fields) > self.RC else -1
        return int(fields[position])

    def __str__(self):
        """@rtype: str"""
        msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
        rc = self._get_rc()
        if rc == 4:
            msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
        elif rc == 9:
            msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
        else:
            msg += _("Unknown reason code %d") % rc
        return msg

    def need_key(self):
        """Return the ID of the key to download, if the check failed only
        because the key is unknown (reason code 9).
        @rtype: str | None"""
        if self._get_rc() == 9:
            return self.status[self.KEYID]
        return None
126
class Key(object):
    """Information about one GPG key.
    @since: 0.27
    @ivar fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: a short name for the key, extracted from the full name
    @type name: str
    """
    def __init__(self, fingerprint):
        """Create a key whose name is not yet known.
        @type fingerprint: str"""
        self.fingerprint = fingerprint
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name with everything from the first ' (' onwards, and
        then everything from the first ' <' onwards, cut off."""
        before_paren = self.name.partition(' (')[0]
        return before_paren.partition(' <')[0]
142
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    @param fingerprints: the fingerprints of the keys to look up
    @type fingerprints: [str]
    @return: a dictionary with one L{Key} per requested fingerprint; keys
    that GnuPG reports nothing for keep the name '(unknown)'
    @rtype: {str: L{Key}}
    @since: 0.27"""
    import codecs

    # Accept any iterable, but keep a list: it is appended to the gpg
    # arguments below.
    fingerprints = list(fingerprints)

    keys = {}

    # Otherwise GnuPG returns everything...
    if not fingerprints:
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    def decode_uid(raw_uid):
        """Return the user ID as text where possible."""
        if not isinstance(raw_uid, bytes):
            # Already text (Python 3 with universal_newlines); nothing to decode.
            return raw_uid
        try:
            return codecs.decode(raw_uid, 'utf-8')
        except UnicodeDecodeError:
            logger.warning("Not UTF-8: %s", raw_uid)
            return raw_uid

    current_fpr = None
    current_uid = None

    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
                      '--with-fingerprint', '--with-fingerprint'] + fingerprints,
                     stdout = subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # A new primary key starts here; forget the previous one.
                current_fpr = None
                current_uid = None
            elif line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    keys[current_fpr].name = decode_uid(current_uid)
            elif line.startswith('uid:'):
                assert current_fpr is not None
                # Only take primary UID
                if not current_uid:
                    current_uid = line.split(':')[9]
                    if current_fpr in keys:
                        keys[current_fpr].name = current_uid
    finally:
        # Always reap the child, even if parsing its output failed.
        child.stdout.close()
        exit_status = child.wait()

    if exit_status:
        logger.warning(_("gpg --list-keys failed with exit code %d") % child.returncode)

    return keys
196
def load_key(fingerprint):
    """Ask gpg about a single key.
    This is a convenience wrapper around L{load_keys}.
    @param fingerprint: the fingerprint of the key to look up
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
203
def import_key(stream):
    """Feed this stream to C{gpg --import} on its standard input.

    Anything gpg prints on stderr is logged as a warning if the import
    succeeds, or included in the exception if it fails.
    @type stream: file
    @raise SafeException: if gpg exits with a non-zero status"""
    with tempfile.TemporaryFile(mode = 'w+t') as errors:
        child = _run_gpg(['--quiet', '--import', '--batch'],
                         stdin = stream, stderr = errors)
        status = child.wait()

        # Collect whatever gpg wrote to stderr while it ran.
        errors.seek(0)
        error_messages = errors.read().strip()

    if status == 0:
        if error_messages:
            logger.warning(_("Warnings from 'gpg --import':\n%s") % error_messages)
        return

    if error_messages:
        raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
    raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
223
def _check_xml_stream(stream):
    """Check the Base64 signature block at the end of an XML document.

    The document is split at the last line starting with
    C{<!-- Base64 Signature}. Everything before that line is the signed data;
    the comment's contents are base64-decoded and written to a temporary file,
    which is then passed to C{gpg --verify} with the signed data on stdin.
    @param stream: the document to check; it is read in full (expected to
    yield bytes) and rewound to the start before returning
    @type stream: file
    @return: the same stream, and the signatures GnuPG reported
    @rtype: (file, [L{Signature}])
    @raise SafeException: if there is no signature block, the block is
    malformed or not valid base64, or no signatures are found"""
    xml_comment_start = b'<!-- Base64 Signature'

    data_to_check = stream.read()

    # Only the LAST such comment counts as the signature block.
    last_comment = data_to_check.rfind(b'\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1   # Include new-line in data

    # Copy the file to 'data', without the signature
    # Copy the signature to 'sig'

    with tempfile.TemporaryFile(mode = 'w+b') as data:
        data.write(data_to_check[:last_comment])
        data.flush()
        # Rewind the underlying descriptor, since gpg reads 'data' as its stdin.
        os.lseek(data.fileno(), 0, 0)

        with tempfile.TemporaryFile('w+t') as errors:
            sig_lines = data_to_check[last_comment:].split(b'\n')
            # The opening line must contain nothing but the comment start.
            if sig_lines[0].strip() != xml_comment_start:
                raise SafeException(_('Bad signature block: extra data on comment line'))
            # Ignore trailing blank lines before looking for the closing '-->'.
            while sig_lines and not sig_lines[-1].strip():
                del sig_lines[-1]
            if sig_lines[-1].strip() != b'-->':
                raise SafeException(_('Bad signature block: last line is not end-of-comment'))
            # Everything between the opening and closing lines is the base64 text.
            sig_data = b'\n'.join(sig_lines[1:-1])

            # Reject anything outside the base64 alphabet (plus spaces and newlines)
            # before attempting to decode.
            if re.match(b'^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
                raise SafeException(_("Invalid characters found in base 64 encoded signature"))
            try:
                if hasattr(base64, 'decodebytes'):
                    sig_data = base64.decodebytes(sig_data) # Python 3
                else:
                    sig_data = base64.decodestring(sig_data) # Python 2
            except Exception as ex:
                raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

            # delete = False: gpg opens this file by name after we close it here;
            # it is removed explicitly in the outer 'finally' below.
            with tempfile.NamedTemporaryFile(prefix = 'injector-sig-', mode = 'wb', delete = False) as sig_file:
                sig_file.write(sig_data)

            try:
                # Note: Should ideally close status_r in the child, but we want to support Windows too
                child = _run_gpg([# Not all versions support this:
                                  #'--max-output', str(1024 * 1024),
                                  '--batch',
                                  # Windows GPG can only cope with "1" here
                                  '--status-fd', '1',
                                  # Don't try to download missing keys; we'll do that
                                  '--keyserver-options', 'no-auto-key-retrieve',
                                  '--verify', sig_file.name, '-'],
                                 stdin = data,
                                 stdout = subprocess.PIPE,
                                 stderr = errors)

                try:
                    sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
                finally:
                    # NOTE(review): this needs 'stream' to have a real file
                    # descriptor; if it raises, the cleanup steps below are
                    # skipped. stream.seek(0) further down also rewinds the
                    # stream - confirm whether this call is still required.
                    os.lseek(stream.fileno(), 0, 0)
                    errors.close()
                    child.stdout.close()
                    child.wait()
                    stream.seek(0)
            finally:
                os.unlink(sig_file.name)
            return (stream, sigs)
292
def check_stream(stream):
    """Check the GPG signature at the end of a feed.

    The stream is rewound and its first six bytes are used to decide how it is
    signed. Only XML documents with an embedded signature are accepted.
    @param stream: the feed to check; must be seekable
    @type stream: file
    @return: (stream, [Signatures])
    @rtype: (file, [L{Signature}])
    @raise SafeException: if the stream is a plain GPG-signed feed or does not
    look like XML"""
    stream.seek(0)
    magic = stream.read(6)
    stream.seek(0)

    if magic == b"<?xml ":
        return _check_xml_stream(stream)

    if magic == b'-----B':
        raise SafeException(_("Plain GPG-signed feeds no longer supported"))

    # Show the start of the file to help the user see what went wrong.
    beginning = repr(stream.read(120))
    raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % beginning)
310
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read GnuPG status lines from status_r and collect the signatures.

    VALIDSIG, BADSIG and ERRSIG lines each produce one result object; all
    other status codes are ignored. If GnuPG wrote anything to 'errors', that
    text is attached to every result as its C{messages}.
    'child' is not used here: this function does not itself wait for it.
    @type status_r: file
    @type child: L{subprocess.Popen}
    @param errors: seekable file holding GnuPG's stderr output
    @type errors: file
    @rtype: [L{Signature}]
    @raise SafeException: if no signatures were found (the message includes
    the contents of 'errors' if there are any)"""
    status_prefix = '[GNUPG:] '
    result_types = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }

    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    for line in status_r:
        assert line.endswith('\n')
        if not line.startswith(status_prefix):
            # The docs says every line starts with this, but if auto-key-retrieve
            # is on then they might not. See bug #3420548
            logger.warning("Invalid output from GnuPG: %r", line)
            continue

        # Drop the prefix and the trailing newline, then split into words.
        fields = line[len(status_prefix):-1].split(' ')
        code, args = fields[0], fields[1:]
        result_type = result_types.get(code)
        if result_type is not None:
            sigs.append(result_type(args))

    errors.seek(0)
    error_messages = errors.read().strip()

    if sigs:
        if error_messages:
            # Attach the warnings to all the signatures, in case they're useful.
            for sig in sigs:
                sig.messages = error_messages
        return sigs

    if error_messages:
        raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
    raise SafeException(_("No signatures found. No error messages from GPG."))
359