Package zeroinstall :: Package injector :: Module trust
[frames] | no frames]

Source Code for Module zeroinstall.injector.trust

  1  """ 
  2  Records who we trust to sign feeds. 
  3   
  4  Trust is divided up into domains, so that it is possible to trust a key 
  5  in some cases and not others. 
  6   
  7  @var trust_db: Singleton trust database instance. 
  8  """ 
  9   
 10  # Copyright (C) 2009, Thomas Leonard 
 11  # See the README file for details, or visit http://0install.net. 
 12   
 13  from zeroinstall import _, SafeException, logger 
 14  import os 
 15   
 16  from zeroinstall import support 
 17  from zeroinstall.support import basedir, tasks 
 18  from .namespaces import config_site, config_prog, XMLNS_TRUST 
 19   
 20  KEY_INFO_TIMEOUT = 10   # Maximum time to wait for response from key-info-server 
21 22 -class TrustDB(object):
23 """A database of trusted keys. 24 @ivar keys: maps trusted key fingerprints to a set of domains for which where it is trusted 25 @type keys: {str: set(str)} 26 @ivar watchers: callbacks invoked by L{notify} 27 @see: L{trust_db} - the singleton instance of this class""" 28 __slots__ = ['keys', 'watchers', '_dry_run'] 29
30 - def __init__(self):
31 self.keys = None 32 self.watchers = [] 33 self._dry_run = False
34
35 - def is_trusted(self, fingerprint, domain = None):
36 """@type fingerprint: str 37 @type domain: str | None 38 @rtype: bool""" 39 self.ensure_uptodate() 40 41 domains = self.keys.get(fingerprint, None) 42 if not domains: return False # Unknown key 43 44 if domain is None: 45 return True # Deprecated 46 47 return domain in domains or '*' in domains
48
49 - def get_trust_domains(self, fingerprint):
50 """Return the set of domains in which this key is trusted. 51 If the list includes '*' then the key is trusted everywhere. 52 @type fingerprint: str 53 @rtype: {str} 54 @since: 0.27""" 55 self.ensure_uptodate() 56 return self.keys.get(fingerprint, set())
57
58 - def get_keys_for_domain(self, domain):
59 """Return the set of keys trusted for this domain. 60 @type domain: str 61 @rtype: {str} 62 @since: 0.27""" 63 self.ensure_uptodate() 64 return set([fp for fp in self.keys 65 if domain in self.keys[fp]])
66
67 - def trust_key(self, fingerprint, domain = '*'):
68 """Add key to the list of trusted fingerprints. 69 @param fingerprint: base 16 fingerprint without any spaces 70 @type fingerprint: str 71 @param domain: domain in which key is to be trusted 72 @type domain: str 73 @note: call L{notify} after trusting one or more new keys""" 74 if self.is_trusted(fingerprint, domain): return 75 76 if self._dry_run: 77 print(_("[dry-run] would trust key {key} for {domain}").format(key = fingerprint, domain = domain)) 78 79 int(fingerprint, 16) # Ensure fingerprint is valid 80 81 if fingerprint not in self.keys: 82 self.keys[fingerprint] = set() 83 84 #if domain == '*': 85 # warn("Calling trust_key() without a domain is deprecated") 86 87 self.keys[fingerprint].add(domain) 88 self.save()
89
90 - def untrust_key(self, key, domain = '*'):
91 """@type key: str 92 @type domain: str""" 93 if self._dry_run: 94 print(_("[dry-run] would untrust key {key} for {domain}").format(key = key, domain = domain)) 95 self.ensure_uptodate() 96 self.keys[key].remove(domain) 97 98 if not self.keys[key]: 99 # No more domains for this key 100 del self.keys[key] 101 102 self.save()
103
104 - def save(self):
105 d = basedir.save_config_path(config_site, config_prog) 106 db_file = os.path.join(d, 'trustdb.xml') 107 if self._dry_run: 108 print(_("[dry-run] would update trust database {file}").format(file = db_file)) 109 return 110 from xml.dom import minidom 111 import tempfile 112 113 doc = minidom.Document() 114 root = doc.createElementNS(XMLNS_TRUST, 'trusted-keys') 115 root.setAttribute('xmlns', XMLNS_TRUST) 116 doc.appendChild(root) 117 118 for fingerprint in self.keys: 119 keyelem = doc.createElementNS(XMLNS_TRUST, 'key') 120 root.appendChild(keyelem) 121 keyelem.setAttribute('fingerprint', fingerprint) 122 for domain in self.keys[fingerprint]: 123 domainelem = doc.createElementNS(XMLNS_TRUST, 'domain') 124 domainelem.setAttribute('value', domain) 125 keyelem.appendChild(domainelem) 126 127 with tempfile.NamedTemporaryFile(dir = d, prefix = 'trust-', delete = False, mode = 'wt') as tmp: 128 doc.writexml(tmp, indent = "", addindent = " ", newl = "\n", encoding = 'utf-8') 129 support.portable_rename(tmp.name, db_file)
130
131 - def notify(self):
132 """Call all watcher callbacks. 133 This should be called after trusting or untrusting one or more new keys. 134 @since: 0.25""" 135 for w in self.watchers: w()
136
137 - def ensure_uptodate(self):
138 if self._dry_run: 139 if self.keys is None: self.keys = {} 140 return 141 from xml.dom import minidom 142 143 # This is a bit inefficient... (could cache things) 144 self.keys = {} 145 146 trust = basedir.load_first_config(config_site, config_prog, 'trustdb.xml') 147 if trust: 148 keys = minidom.parse(trust).documentElement 149 for key in keys.getElementsByTagNameNS(XMLNS_TRUST, 'key'): 150 domains = set() 151 self.keys[key.getAttribute('fingerprint')] = domains 152 for domain in key.getElementsByTagNameNS(XMLNS_TRUST, 'domain'): 153 domains.add(domain.getAttribute('value')) 154 else: 155 # Convert old database to XML format 156 trust = basedir.load_first_config(config_site, config_prog, 'trust') 157 if trust: 158 #print "Loading trust from", trust_db 159 with open(trust, 'rt') as stream: 160 for key in stream: 161 if key: 162 self.keys[key] = set(['*'])
163
164 -def domain_from_url(url):
165 """Extract the trust domain for a URL. 166 @param url: the feed's URL 167 @type url: str 168 @return: the trust domain 169 @rtype: str 170 @since: 0.27 171 @raise SafeException: the URL can't be parsed""" 172 try: 173 import urlparse 174 except ImportError: 175 from urllib import parse as urlparse # Python 3 176 177 if os.path.isabs(url): 178 raise SafeException(_("Can't get domain from a local path: '%s'") % url) 179 domain = urlparse.urlparse(url)[1] 180 if domain and domain != '*': 181 return domain 182 raise SafeException(_("Can't extract domain from URL '%s'") % url)
183 184 trust_db = TrustDB()
185 186 -class TrustMgr(object):
187 """A TrustMgr handles the process of deciding whether to trust new keys 188 (contacting the key information server, prompting the user, accepting automatically, etc) 189 @since: 0.53""" 190 191 __slots__ = ['config', '_current_confirm'] 192
193 - def __init__(self, config):
194 """@type config: L{zeroinstall.injector.config.Config}""" 195 self.config = config 196 self._current_confirm = None # (a lock to prevent asking the user multiple questions at once)
197 198 @tasks.async
199 - def confirm_keys(self, pending):
200 """We don't trust any of the signatures yet. Collect information about them and add the keys to the 201 trusted list, possibly after confirming with the user (via config.handler). 202 Updates the L{trust} database, and then calls L{trust.TrustDB.notify}. 203 @param pending: an object holding details of the updated feed 204 @type pending: L{PendingFeed} 205 @return: A blocker that triggers when the user has chosen, or None if already done. 206 @rtype: None | L{Blocker} 207 @since: 0.53""" 208 209 assert pending.sigs 210 211 from zeroinstall.injector import gpg 212 valid_sigs = [s for s in pending.sigs if isinstance(s, gpg.ValidSig)] 213 if not valid_sigs: 214 def format_sig(sig): 215 msg = str(sig) 216 if sig.messages: 217 msg += "\nMessages from GPG:\n" + sig.messages 218 return msg
219 raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') % 220 {'url': pending.url, 'signatures': ''.join(['\n- ' + format_sig(s) for s in pending.sigs])}) 221 222 # Start downloading information about the keys... 223 fetcher = self.config.fetcher 224 kfs = {} 225 for sig in valid_sigs: 226 kfs[sig] = fetcher.fetch_key_info(sig.fingerprint) 227 228 # Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog 229 # box update while the user is looking at it, and may allow it to be skipped completely in some 230 # cases. 231 timeout = tasks.TimeoutBlocker(KEY_INFO_TIMEOUT, "key info timeout") 232 while True: 233 key_info_blockers = [sig_info.blocker for sig_info in kfs.values() if sig_info.blocker is not None] 234 if not key_info_blockers: 235 break 236 logger.info("Waiting for response from key-info server: %s", key_info_blockers) 237 yield [timeout] + key_info_blockers 238 if timeout.happened: 239 logger.info("Timeout waiting for key info response") 240 break 241 242 # If we're already confirming something else, wait for that to finish... 243 while self._current_confirm is not None: 244 logger.info("Waiting for previous key confirmations to finish") 245 yield self._current_confirm 246 247 domain = domain_from_url(pending.url) 248 249 if self.config.auto_approve_keys: 250 existing_feed = self.config.iface_cache.get_feed(pending.url) 251 if not existing_feed: 252 changes = False 253 trust_db._dry_run = self.config.handler.dry_run 254 for sig, kf in kfs.items(): 255 for key_info in kf.info: 256 if key_info.getAttribute("vote") == "good": 257 logger.info(_("Automatically approving key for new feed %s based on response from key info server"), pending.url) 258 trust_db.trust_key(sig.fingerprint, domain) 259 changes = True 260 if changes: 261 trust_db.notify() 262 263 # Check whether we still need to confirm. The user may have 264 # already approved one of the keys while dealing with another 265 # feed, or we may have just auto-approved it. 266 for sig in kfs: 267 is_trusted = trust_db.is_trusted(sig.fingerprint, domain) 268 if is_trusted: 269 return 270 271 # Take the lock and confirm this feed 272 self._current_confirm = lock = tasks.Blocker('confirm key lock') 273 try: 274 done = self.config.handler.confirm_import_feed(pending, kfs) 275 if done is not None: 276 yield done 277 tasks.check(done) 278 finally: 279 self._current_confirm = None 280 lock.trigger()
281