Package zeroinstall :: Package injector :: Module scheduler
[frames] | no frames]

Source Code for Module zeroinstall.injector.scheduler

  1  """ 
  2  Manage pools of connections so that we can limit the number of requests per site and reuse 
  3  connections. 
  4  @since: 1.6 
  5  """ 
  6   
  7  # Copyright (C) 2011, Thomas Leonard 
  8  # See the README file for details, or visit http://0install.net. 
  9   
 10  import sys 
 11   
 12  if sys.version_info[0] > 2: 
 13          from urllib import parse as urlparse    # Python 3 
 14  else: 
 15          import urlparse 
 16   
 17  from collections import defaultdict 
 18  import threading 
 19   
 20  from zeroinstall import logger 
 21  from zeroinstall.support import tasks 
 22  from zeroinstall.injector import download 
 23   
 24  default_port = { 
 25          'http': 80, 
 26          'https': 443, 
 27  } 
28 29 -class DownloadStep(object):
30 url = None 31 status = None 32 redirect = None
33
34 -class DownloadScheduler(object):
35 """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling. 36 @since: 1.6"""
37 - def __init__(self):
38 self._sites = defaultdict(lambda: Site()) # (scheme://host:port) -> Site
39 40 @tasks.async
41 - def download(self, dl):
42 """@type dl: L{zeroinstall.injector.download.Download}""" 43 44 # (changed if we get redirected) 45 current_url = dl.url 46 47 redirections_remaining = 10 48 49 original_exception = None 50 51 # Assign the Download to a Site based on its scheme, host and port. If the result is a redirect, 52 # reassign it to the appropriate new site. Note that proxy handling happens later; we want to group 53 # and limit by the target site, not treat everything as going to a single site (the proxy). 54 while True: 55 location_parts = urlparse.urlparse(current_url) 56 57 site_key = (location_parts.scheme, 58 location_parts.hostname, 59 location_parts.port or default_port.get(location_parts.scheme, None)) 60 61 step = DownloadStep() 62 step.dl = dl 63 step.url = current_url 64 blocker = self._sites[site_key].download(step) 65 yield blocker 66 67 try: 68 tasks.check(blocker) 69 except download.DownloadError as ex: 70 if original_exception is None: 71 original_exception = ex 72 else: 73 logger.warning("%s (while trying mirror)", ex) 74 mirror_url = step.dl.get_next_mirror_url() 75 if mirror_url is None: 76 raise original_exception 77 78 # Try the mirror. 79 # There are actually two places where we try to use the mirror: this one 80 # looks to see if we have an exact copy of same file somewhere else. If this 81 # fails, Fetcher will also look for a different archive that would generate 82 # the required implementation. 83 logger.warning("%s: trying archive mirror at %s", ex, mirror_url) 84 step.redirect = mirror_url 85 redirections_remaining = 10 86 87 if not step.redirect: 88 break 89 90 current_url = step.redirect 91 92 if redirections_remaining == 0: 93 raise download.DownloadError("Too many redirections {url} -> {current}".format( 94 url = dl.url, 95 current = current_url)) 96 redirections_remaining -= 1
97 # (else go around the loop again) 98 99 MAX_DOWNLOADS_PER_SITE = 5
100 101 -def _spawn_thread(step):
102 """@type step: L{DownloadStep} 103 @rtype: L{zeroinstall.support.tasks.Blocker}""" 104 from ._download_child import download_in_thread 105 106 thread_blocker = tasks.Blocker("wait for thread " + step.url) 107 def notify_done(status, ex = None, redirect = None): 108 step.status = status 109 step.redirect = redirect 110 def wake_up_main(): 111 child.join() 112 thread_blocker.trigger(ex) 113 return False
114 tasks.loop.call_soon_threadsafe(wake_up_main) 115 child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done)) 116 child.daemon = True 117 child.start() 118 119 return thread_blocker 120
121 -class Site(object):
122 """Represents a service accepting download requests. All requests with the same scheme, host and port are 123 handled by the same Site object, allowing it to do connection pooling and queuing, although the current 124 implementation doesn't do either."""
125 - def __init__(self):
126 self.queue = [] 127 self.active = 0
128 129 @tasks.async
130 - def download(self, step):
131 """@type step: L{DownloadStep}""" 132 if self.active == MAX_DOWNLOADS_PER_SITE: 133 # Too busy to start a new download now. Queue this one and wait. 134 ticket = tasks.Blocker('queued download for ' + step.url) 135 self.queue.append(ticket) 136 yield ticket, step.dl._aborted 137 if step.dl._aborted.happened: 138 raise download.DownloadAborted() 139 140 # Start a new thread for the download 141 thread_blocker = _spawn_thread(step) 142 143 self.active += 1 144 145 # Wait for thread to complete download. 146 yield thread_blocker, step.dl._aborted 147 148 self.active -= 1 149 if self.active < MAX_DOWNLOADS_PER_SITE: 150 self.process_next() # Start next queued download, if any 151 152 if step.dl._aborted.happened: 153 # Don't wait for child to finish (might be stuck doing IO) 154 raise download.DownloadAborted() 155 156 tasks.check(thread_blocker) 157 158 if step.status == download.RESULT_REDIRECT: 159 assert step.redirect 160 return # DownloadScheduler will handle it 161 162 assert not step.redirect, step.redirect 163 164 step.dl._finish(step.status)
165
166 - def process_next(self):
167 assert self.active < MAX_DOWNLOADS_PER_SITE 168 169 if self.queue: 170 nxt = self.queue.pop() 171 nxt.trigger()
172