| Home | Trees | Indices | Help |
|
|---|
|
|
1 """
2 Manage pools of connections so that we can limit the number of requests per site and reuse
3 connections.
4 @since: 1.6
5 """
6
7 # Copyright (C) 2011, Thomas Leonard
8 # See the README file for details, or visit http://0install.net.
9
10 import sys
11
12 if sys.version_info[0] > 2:
13 from urllib import parse as urlparse # Python 3
14 else:
15 import urlparse
16
17 from collections import defaultdict
18 import threading
19
20 from zeroinstall import logger
21 from zeroinstall.support import tasks
22 from zeroinstall.injector import download
23
24 default_port = {
25 'http': 80,
26 'https': 443,
27 }
33
35 """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
36 @since: 1.6"""
38 self._sites = defaultdict(lambda: Site()) # (scheme://host:port) -> Site
39
40 @tasks.async
42 """@type dl: L{zeroinstall.injector.download.Download}"""
43
44 # (changed if we get redirected)
45 current_url = dl.url
46
47 redirections_remaining = 10
48
49 original_exception = None
50
51 # Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
52 # reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
53 # and limit by the target site, not treat everything as going to a single site (the proxy).
54 while True:
55 location_parts = urlparse.urlparse(current_url)
56
57 site_key = (location_parts.scheme,
58 location_parts.hostname,
59 location_parts.port or default_port.get(location_parts.scheme, None))
60
61 step = DownloadStep()
62 step.dl = dl
63 step.url = current_url
64 blocker = self._sites[site_key].download(step)
65 yield blocker
66
67 try:
68 tasks.check(blocker)
69 except download.DownloadError as ex:
70 if original_exception is None:
71 original_exception = ex
72 else:
73 logger.warning("%s (while trying mirror)", ex)
74 mirror_url = step.dl.get_next_mirror_url()
75 if mirror_url is None:
76 raise original_exception
77
78 # Try the mirror.
79 # There are actually two places where we try to use the mirror: this one
80 # looks to see if we have an exact copy of same file somewhere else. If this
81 # fails, Fetcher will also look for a different archive that would generate
82 # the required implementation.
83 logger.warning("%s: trying archive mirror at %s", ex, mirror_url)
84 step.redirect = mirror_url
85 redirections_remaining = 10
86
87 if not step.redirect:
88 break
89
90 current_url = step.redirect
91
92 if redirections_remaining == 0:
93 raise download.DownloadError("Too many redirections {url} -> {current}".format(
94 url = dl.url,
95 current = current_url))
96 redirections_remaining -= 1
97 # (else go around the loop again)
98
99 MAX_DOWNLOADS_PER_SITE = 5
102 """@type step: L{DownloadStep}
103 @rtype: L{zeroinstall.support.tasks.Blocker}"""
104 from ._download_child import download_in_thread
105
106 thread_blocker = tasks.Blocker("wait for thread " + step.url)
107 def notify_done(status, ex = None, redirect = None):
108 step.status = status
109 step.redirect = redirect
110 def wake_up_main():
111 child.join()
112 thread_blocker.trigger(ex)
113 return False
114 tasks.loop.call_soon_threadsafe(wake_up_main)
115 child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done))
116 child.daemon = True
117 child.start()
118
119 return thread_blocker
120
122 """Represents a service accepting download requests. All requests with the same scheme, host and port are
123 handled by the same Site object, allowing it to do connection pooling and queuing, although the current
124 implementation doesn't do either."""
128
129 @tasks.async
131 """@type step: L{DownloadStep}"""
132 if self.active == MAX_DOWNLOADS_PER_SITE:
133 # Too busy to start a new download now. Queue this one and wait.
134 ticket = tasks.Blocker('queued download for ' + step.url)
135 self.queue.append(ticket)
136 yield ticket, step.dl._aborted
137 if step.dl._aborted.happened:
138 raise download.DownloadAborted()
139
140 # Start a new thread for the download
141 thread_blocker = _spawn_thread(step)
142
143 self.active += 1
144
145 # Wait for thread to complete download.
146 yield thread_blocker, step.dl._aborted
147
148 self.active -= 1
149 if self.active < MAX_DOWNLOADS_PER_SITE:
150 self.process_next() # Start next queued download, if any
151
152 if step.dl._aborted.happened:
153 # Don't wait for child to finish (might be stuck doing IO)
154 raise download.DownloadAborted()
155
156 tasks.check(thread_blocker)
157
158 if step.status == download.RESULT_REDIRECT:
159 assert step.redirect
160 return # DownloadScheduler will handle it
161
162 assert not step.redirect, step.redirect
163
164 step.dl._finish(step.status)
165
167 assert self.active < MAX_DOWNLOADS_PER_SITE
168
169 if self.queue:
170 nxt = self.queue.pop()
171 nxt.trigger()
172
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Tue Mar 26 18:14:10 2013 | http://epydoc.sourceforge.net |