1 """
2 Manage pools of connections so that we can limit the number of requests per site and reuse
3 connections.
4 @since: 1.6
5 """
6
7
8
9
10 import sys
11
12 if sys.version_info[0] > 2:
13 from urllib import parse as urlparse
14 else:
15 import urlparse
16
17 from collections import defaultdict
18 import threading
19
20 from zeroinstall import logger
21 from zeroinstall.support import tasks
22 from zeroinstall.injector import download
23
24 default_port = {
25 'http': 80,
26 'https': 443,
27 }
33
35 """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
36 @since: 1.6"""
38 self._sites = defaultdict(lambda: Site())
39
40 @tasks.async
42 """@type dl: L{zeroinstall.injector.download.Download}"""
43
44
45 current_url = dl.url
46
47 redirections_remaining = 10
48
49 original_exception = None
50
51
52
53
54 while True:
55 location_parts = urlparse.urlparse(current_url)
56
57 site_key = (location_parts.scheme,
58 location_parts.hostname,
59 location_parts.port or default_port.get(location_parts.scheme, None))
60
61 step = DownloadStep()
62 step.dl = dl
63 step.url = current_url
64 blocker = self._sites[site_key].download(step)
65 yield blocker
66
67 try:
68 tasks.check(blocker)
69 except download.DownloadError as ex:
70 if original_exception is None:
71 original_exception = ex
72 else:
73 logger.warning("%s (while trying mirror)", ex)
74 mirror_url = step.dl.get_next_mirror_url()
75 if mirror_url is None:
76 raise original_exception
77
78
79
80
81
82
83 logger.warning("%s: trying archive mirror at %s", ex, mirror_url)
84 step.redirect = mirror_url
85 redirections_remaining = 10
86
87 if not step.redirect:
88 break
89
90 current_url = step.redirect
91
92 if redirections_remaining == 0:
93 raise download.DownloadError("Too many redirections {url} -> {current}".format(
94 url = dl.url,
95 current = current_url))
96 redirections_remaining -= 1
97
98
99 MAX_DOWNLOADS_PER_SITE = 5
114 tasks.loop.call_soon_threadsafe(wake_up_main)
115 child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done))
116 child.daemon = True
117 child.start()
118
119 return thread_blocker
120
122 """Represents a service accepting download requests. All requests with the same scheme, host and port are
123 handled by the same Site object, allowing it to do connection pooling and queuing, although the current
124 implementation doesn't do either."""
126 self.queue = []
127 self.active = 0
128
129 @tasks.async
165
172