A Python library for code common to TripleO CLI and TripleO UI.
Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

443 wiersze
15KB

  1. # Copyright 2015 Red Hat, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. #
  15. import abc
  16. import json
  17. import logging
  18. import multiprocessing
  19. import netifaces
  20. import os
  21. import requests
  22. import six
  23. from six.moves import urllib
  24. import subprocess
  25. import tenacity
  26. import docker
  27. try:
  28. from docker import APIClient as Client
  29. except ImportError:
  30. from docker import Client
  31. from oslo_concurrency import processutils
  32. from tripleo_common.image.base import BaseImageManager
  33. from tripleo_common.image.exception import ImageNotFoundException
  34. from tripleo_common.image.exception import ImageUploaderException
  35. LOG = logging.getLogger(__name__)
  36. SECURE_REGISTRIES = (
  37. 'trunk.registry.rdoproject.org',
  38. 'docker.io',
  39. )
  40. class ImageUploadManager(BaseImageManager):
  41. """Manage the uploading of image files
  42. Manage the uploading of images from a config file specified in YAML
  43. syntax. Multiple config files can be specified. They will be merged.
  44. """
  45. def __init__(self, config_files=None, verbose=False, debug=False):
  46. if config_files is None:
  47. config_files = []
  48. super(ImageUploadManager, self).__init__(config_files)
  49. self.uploaders = {}
  50. def discover_image_tag(self, image, tag_from_label=None):
  51. uploader = self.uploader('docker')
  52. return uploader.discover_image_tag(
  53. image, tag_from_label=tag_from_label)
  54. def uploader(self, uploader):
  55. if uploader not in self.uploaders:
  56. self.uploaders[uploader] = ImageUploader.get_uploader(uploader)
  57. return self.uploaders[uploader]
  58. def upload(self):
  59. """Start the upload process"""
  60. LOG.info('Using config files: %s' % self.config_files)
  61. uploads = self.load_config_files(self.UPLOADS) or []
  62. container_images = self.load_config_files(self.CONTAINER_IMAGES) or []
  63. upload_images = uploads + container_images
  64. default_push_destination = self.get_ctrl_plane_ip() + ':8787'
  65. for item in upload_images:
  66. image_name = item.get('imagename')
  67. uploader = item.get('uploader', 'docker')
  68. pull_source = item.get('pull_source')
  69. push_destination = item.get('push_destination',
  70. default_push_destination)
  71. # This updates the parsed upload_images dict with real values
  72. item['push_destination'] = push_destination
  73. self.uploader(uploader).add_upload_task(
  74. image_name, pull_source, push_destination)
  75. for uploader in self.uploaders.values():
  76. uploader.run_tasks()
  77. return upload_images # simply to make test validation easier
  78. def get_ctrl_plane_ip(self):
  79. addr = 'localhost'
  80. if 'br-ctlplane' in netifaces.interfaces():
  81. addrs = netifaces.ifaddresses('br-ctlplane')
  82. if netifaces.AF_INET in addrs and addrs[netifaces.AF_INET]:
  83. addr = addrs[netifaces.AF_INET][0].get('addr', 'localhost')
  84. return addr
  85. @six.add_metaclass(abc.ABCMeta)
  86. class ImageUploader(object):
  87. """Base representation of an image uploading method"""
  88. @staticmethod
  89. def get_uploader(uploader):
  90. if uploader == 'docker':
  91. return DockerImageUploader()
  92. raise ImageUploaderException('Unknown image uploader type')
  93. @abc.abstractmethod
  94. def run_tasks(self):
  95. """Run all tasks"""
  96. pass
  97. @abc.abstractmethod
  98. def add_upload_task(self, image_name, pull_source, push_destination):
  99. """Add an upload task to be executed later"""
  100. pass
  101. @abc.abstractmethod
  102. def discover_image_tag(self, image, tag_from_label=None):
  103. """Discover a versioned tag for an image"""
  104. pass
  105. @abc.abstractmethod
  106. def cleanup(self):
  107. """Remove unused images or temporary files from upload"""
  108. pass
  109. @abc.abstractmethod
  110. def is_insecure_registry(self, registry_host):
  111. """Detect whether a registry host is not configured with TLS"""
  112. pass
  113. class DockerImageUploader(ImageUploader):
  114. """Upload images using docker push"""
  115. def __init__(self):
  116. self.upload_tasks = []
  117. self.secure_registries = set()
  118. self.insecure_registries = set()
  119. @staticmethod
  120. def upload_image(image_name, pull_source, push_destination,
  121. insecure_registries):
  122. LOG.info('imagename: %s' % image_name)
  123. dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
  124. if ':' in image_name:
  125. image = image_name.rpartition(':')[0]
  126. tag = image_name.rpartition(':')[2]
  127. else:
  128. image = image_name
  129. tag = 'latest'
  130. if pull_source:
  131. repo = pull_source + '/' + image
  132. else:
  133. repo = image
  134. full_image = repo + ':' + tag
  135. new_repo = push_destination + '/' + repo.partition('/')[2]
  136. full_new_repo = new_repo + ':' + tag
  137. if DockerImageUploader._images_match(full_image, full_new_repo,
  138. insecure_registries):
  139. LOG.info('Skipping upload for image %s' % image_name)
  140. return []
  141. DockerImageUploader._pull(dockerc, repo, tag=tag)
  142. response = dockerc.tag(image=full_image, repository=new_repo,
  143. tag=tag, force=True)
  144. LOG.debug(response)
  145. DockerImageUploader._push(dockerc, new_repo, tag=tag)
  146. LOG.info('Completed upload for image %s' % image_name)
  147. return full_image, full_new_repo
  148. @staticmethod
  149. @tenacity.retry( # Retry up to 5 times with jittered exponential backoff
  150. reraise=True,
  151. wait=tenacity.wait_random_exponential(multiplier=1, max=10),
  152. stop=tenacity.stop_after_attempt(5)
  153. )
  154. def _pull(dockerc, image, tag=None):
  155. LOG.debug('Pulling %s' % image)
  156. for line in dockerc.pull(image, tag=tag, stream=True):
  157. status = json.loads(line)
  158. if 'error' in status:
  159. LOG.warning('docker pull failed: %s' % status['error'])
  160. raise ImageUploaderException('Could not pull image %s' % image)
  161. @staticmethod
  162. @tenacity.retry( # Retry up to 5 times with jittered exponential backoff
  163. reraise=True,
  164. wait=tenacity.wait_random_exponential(multiplier=1, max=10),
  165. stop=tenacity.stop_after_attempt(5)
  166. )
  167. def _push(dockerc, image, tag=None):
  168. LOG.debug('Pushing %s' % image)
  169. for line in dockerc.push(image, tag=tag, stream=True):
  170. status = json.loads(line)
  171. if 'error' in status:
  172. LOG.warning('docker push failed: %s' % status['error'])
  173. raise ImageUploaderException('Could not push image %s' % image)
  174. @staticmethod
  175. def _images_match(image1, image2, insecure_registries):
  176. try:
  177. image1_digest = DockerImageUploader._image_digest(
  178. image1, insecure_registries)
  179. except Exception:
  180. return False
  181. try:
  182. image2_digest = DockerImageUploader._image_digest(
  183. image2, insecure_registries)
  184. except Exception:
  185. return False
  186. # missing digest, no way to know if they match
  187. if not image1_digest or not image2_digest:
  188. return False
  189. return image1_digest == image2_digest
  190. @staticmethod
  191. def _image_digest(image, insecure_registries):
  192. image_url = DockerImageUploader._image_to_url(image)
  193. insecure = image_url.netloc in insecure_registries
  194. i = DockerImageUploader._inspect(image_url.geturl(), insecure)
  195. return i.get('Digest')
  196. @staticmethod
  197. @tenacity.retry( # Retry up to 5 times with jittered exponential backoff
  198. reraise=True,
  199. retry=tenacity.retry_if_exception_type(ImageUploaderException),
  200. wait=tenacity.wait_random_exponential(multiplier=1, max=10),
  201. stop=tenacity.stop_after_attempt(5)
  202. )
  203. def _inspect(image, insecure=False):
  204. cmd = ['skopeo', 'inspect']
  205. if insecure:
  206. cmd.append('--tls-verify=false')
  207. cmd.append(image)
  208. LOG.info('Running %s' % ' '.join(cmd))
  209. env = os.environ.copy()
  210. try:
  211. process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
  212. stderr=subprocess.PIPE)
  213. out, err = process.communicate()
  214. if process.returncode != 0:
  215. not_found_msgs = (
  216. 'manifest unknown',
  217. # returned by docker.io
  218. 'requested access to the resource is denied'
  219. )
  220. if any(n in err for n in not_found_msgs):
  221. raise ImageNotFoundException('Not found image: %s\n%s' %
  222. (image, err))
  223. raise ImageUploaderException('Error inspecting image: %s\n%s' %
  224. (image, err))
  225. except KeyboardInterrupt:
  226. raise Exception('Action interrupted with ctrl+c')
  227. return json.loads(out)
  228. @staticmethod
  229. def _image_to_url(image):
  230. if '://' not in image:
  231. image = 'docker://' + image
  232. return urllib.parse.urlparse(image)
  233. @staticmethod
  234. def _discover_tag_from_inspect(i, image, tag_from_label=None,
  235. fallback_tag=None):
  236. labels = i.get('Labels', {})
  237. label_keys = ', '.join(labels.keys())
  238. if not tag_from_label:
  239. raise ImageUploaderException(
  240. 'No label specified. Available labels: %s' % label_keys
  241. )
  242. if "{" in tag_from_label:
  243. try:
  244. tag_label = tag_from_label.format(**labels)
  245. except ValueError as e:
  246. raise ImageUploaderException(e)
  247. except KeyError as e:
  248. if fallback_tag:
  249. tag_label = fallback_tag
  250. else:
  251. raise ImageUploaderException(
  252. 'Image %s %s. Available labels: %s' %
  253. (image, e, label_keys)
  254. )
  255. else:
  256. tag_label = labels.get(tag_from_label)
  257. if tag_label is None:
  258. if fallback_tag:
  259. tag_label = fallback_tag
  260. else:
  261. raise ImageUploaderException(
  262. 'Image %s has no label %s. Available labels: %s' %
  263. (image, tag_from_label, label_keys)
  264. )
  265. # confirm the tag exists by checking for an entry in RepoTags
  266. repo_tags = i.get('RepoTags', [])
  267. if tag_label not in repo_tags:
  268. raise ImageUploaderException(
  269. 'Image %s has no tag %s.\nAvailable tags: %s' %
  270. (image, tag_label, ', '.join(repo_tags))
  271. )
  272. return tag_label
  273. def discover_image_tags(self, images, tag_from_label=None):
  274. image_urls = [self._image_to_url(i) for i in images]
  275. # prime self.insecure_registries by testing every image
  276. for url in image_urls:
  277. self.is_insecure_registry(url.netloc)
  278. discover_args = []
  279. for image in images:
  280. discover_args.append((image, tag_from_label,
  281. self.insecure_registries))
  282. p = multiprocessing.Pool(16)
  283. versioned_images = {}
  284. for image, versioned_image in p.map(discover_tag_from_inspect,
  285. discover_args):
  286. versioned_images[image] = versioned_image
  287. return versioned_images
  288. def discover_image_tag(self, image, tag_from_label=None,
  289. fallback_tag=None):
  290. image_url = self._image_to_url(image)
  291. insecure = self.is_insecure_registry(image_url.netloc)
  292. i = self._inspect(image_url.geturl(), insecure)
  293. return self._discover_tag_from_inspect(i, image, tag_from_label,
  294. fallback_tag)
  295. def cleanup(self, local_images):
  296. dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
  297. for image in sorted(local_images):
  298. if not image:
  299. continue
  300. LOG.info('Removing local copy of %s' % image)
  301. try:
  302. dockerc.remove_image(image)
  303. except docker.errors.APIError as e:
  304. if e.explanation:
  305. LOG.warning(e.explanation)
  306. else:
  307. LOG.warning(e)
  308. def add_upload_task(self, image_name, pull_source, push_destination):
  309. # prime self.insecure_registries
  310. if pull_source:
  311. self.is_insecure_registry(self._image_to_url(pull_source).netloc)
  312. else:
  313. self.is_insecure_registry(self._image_to_url(image_name).netloc)
  314. self.is_insecure_registry(self._image_to_url(push_destination).netloc)
  315. self.upload_tasks.append((image_name, pull_source, push_destination,
  316. self.insecure_registries))
  317. def run_tasks(self):
  318. if not self.upload_tasks:
  319. return
  320. local_images = []
  321. # Pull a single image first, to avoid duplicate pulls of the
  322. # same base layers
  323. first = self.upload_tasks.pop()
  324. result = self.upload_image(*first)
  325. local_images.extend(result)
  326. # workers will be half the CPU count, to a minimum of 2
  327. workers = max(2, processutils.get_worker_count() // 2)
  328. p = multiprocessing.Pool(workers)
  329. for result in p.map(docker_upload, self.upload_tasks):
  330. local_images.extend(result)
  331. LOG.info('result %s' % local_images)
  332. # Do cleanup after all the uploads so common layers don't get deleted
  333. # repeatedly
  334. self.cleanup(local_images)
  335. def is_insecure_registry(self, registry_host):
  336. if registry_host in self.secure_registries:
  337. return False
  338. if registry_host in self.insecure_registries:
  339. return True
  340. try:
  341. requests.get('https://%s/' % registry_host)
  342. except requests.exceptions.SSLError:
  343. self.insecure_registries.add(registry_host)
  344. return True
  345. except Exception:
  346. # for any other error assume it is a secure registry, because:
  347. # - it is secure registry
  348. # - the host is not accessible
  349. pass
  350. self.secure_registries.add(registry_host)
  351. return False
  352. def docker_upload(args):
  353. return DockerImageUploader.upload_image(*args)
  354. def discover_tag_from_inspect(args):
  355. image, tag_from_label, insecure_registries = args
  356. image_url = DockerImageUploader._image_to_url(image)
  357. insecure = image_url.netloc in insecure_registries
  358. i = DockerImageUploader._inspect(image_url.geturl(), insecure)
  359. if ':' in image_url.path:
  360. # break out the tag from the url to be the fallback tag
  361. path = image.rpartition(':')
  362. fallback_tag = path[2]
  363. image = path[0]
  364. else:
  365. fallback_tag = None
  366. return image, DockerImageUploader._discover_tag_from_inspect(
  367. i, image, tag_from_label, fallback_tag)