Bump pool_maxsize for K8sClient to 1000

We're running kuryr-controller in highly multithreading way. Turns out
in that cases requests' default for pool_maxsize=10 is not enough and we
start to see warnings like:

 WARNING:requests.packages.urllib3.connectionpool:HttpConnectionPool is
 full, discarding connection:

This patch attempts to get rid of them by bumping pool_maxsize to 1000.

Change-Id: I7f4e150ed9f9e26db5222014b6ccff013882ecfa
This commit is contained in:
Michał Dulko 2019-11-22 15:19:14 +01:00
parent 28b27c5de2
commit f24d8242ff
2 changed files with 48 additions and 39 deletions

View File

@ -16,10 +16,12 @@ import contextlib
import itertools
import os
import ssl
from urllib import parse
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from requests import adapters
from kuryr.lib._i18n import _
from kuryr_kubernetes import config
@ -43,6 +45,12 @@ class K8sClient(object):
token_file = config.CONF.kubernetes.token_file
self.token = None
self.cert = (None, None)
# Setting higher numbers regarding connection pools as we're running
# with max of 1000 green threads.
self.session = requests.Session()
prefix = '%s://' % parse.urlparse(base_url).scheme
self.session.mount(prefix, adapters.HTTPAdapter(pool_maxsize=1000))
if token_file:
if os.path.exists(token_file):
with open(token_file, 'r') as f:
@ -76,9 +84,9 @@ class K8sClient(object):
header.update({'Authorization': 'Bearer %s' % self.token})
if headers:
header.update(headers)
response = requests.get(url, cert=self.cert,
verify=self.verify_server,
headers=header)
response = self.session.get(url, cert=self.cert,
verify=self.verify_server,
headers=header)
if response.status_code == requests.codes.not_found:
raise exc.K8sResourceNotFound(response.text)
if not response.ok:
@ -102,9 +110,9 @@ class K8sClient(object):
path = path + '/' + str(field)
content_type = 'application/merge-patch+json'
url, header = self._get_url_and_header(path, content_type)
response = requests.patch(url, json={field: data},
headers=header, cert=self.cert,
verify=self.verify_server)
response = self.session.patch(url, json={field: data},
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
return response.json().get('status')
raise exc.K8sClientException(response.text)
@ -121,9 +129,9 @@ class K8sClient(object):
LOG.debug("Patch %(path)s: %(data)s", {
'path': path, 'data': data})
response = requests.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert,
verify=self.verify_server)
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
return response.json().get('status')
raise exc.K8sClientException(response.text)
@ -138,9 +146,9 @@ class K8sClient(object):
'path': '/metadata/annotations/{}'.format(annotation_name),
'value': value}]
response = requests.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert,
verify=self.verify_server)
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
return response.json().get('status')
raise exc.K8sClientException(response.text)
@ -153,9 +161,9 @@ class K8sClient(object):
data = [{'op': 'remove',
'path': '/metadata/annotations/{}'.format(annotation_name)}]
response = requests.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert,
verify=self.verify_server)
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
return response.json().get('status')
raise exc.K8sClientException(response.text)
@ -167,8 +175,8 @@ class K8sClient(object):
if self.token:
header.update({'Authorization': 'Bearer %s' % self.token})
response = requests.post(url, json=body, cert=self.cert,
verify=self.verify_server, headers=header)
response = self.session.post(url, json=body, cert=self.cert,
verify=self.verify_server, headers=header)
if response.ok:
return response.json()
raise exc.K8sClientException(response)
@ -180,8 +188,9 @@ class K8sClient(object):
if self.token:
header.update({'Authorization': 'Bearer %s' % self.token})
response = requests.delete(url, cert=self.cert,
verify=self.verify_server, headers=header)
response = self.session.delete(url, cert=self.cert,
verify=self.verify_server,
headers=header)
if response.ok:
return response.json()
else:
@ -208,9 +217,9 @@ class K8sClient(object):
if resource_version:
metadata['resourceVersion'] = resource_version
data = jsonutils.dumps({"metadata": metadata}, sort_keys=True)
response = requests.patch(url, data=data,
headers=header, cert=self.cert,
verify=self.verify_server)
response = self.session.patch(url, data=data,
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
return response.json()['metadata']['annotations']
if response.status_code == requests.codes.conflict:
@ -256,7 +265,7 @@ class K8sClient(object):
if resource_version:
params['resourceVersion'] = resource_version
with contextlib.closing(
requests.get(
self.session.get(
url, params=params, stream=True, cert=self.cert,
verify=self.verify_server, headers=header,
timeout=timeouts)) as response:

View File

@ -68,7 +68,7 @@ class TestK8sClient(test_base.TestCase):
m_exist.return_value = True
self.assertRaises(RuntimeError, k8s_client.K8sClient, self.base_url)
@mock.patch('requests.get')
@mock.patch('requests.sessions.Session.get')
@mock.patch('kuryr_kubernetes.config.CONF')
def test_bearer_token(self, m_cfg, m_get):
token_content = (
@ -104,7 +104,7 @@ class TestK8sClient(test_base.TestCase):
finally:
os.unlink(m_cfg.kubernetes.token_file)
@mock.patch('requests.get')
@mock.patch('requests.sessions.Session.get')
def test_get(self, m_get):
path = '/test'
ret = {'test': 'value'}
@ -119,7 +119,7 @@ class TestK8sClient(test_base.TestCase):
self.base_url + path,
cert=(None, None), headers={}, verify=False)
@mock.patch('requests.get')
@mock.patch('requests.sessions.Session.get')
def test_get_exception(self, m_get):
path = '/test'
@ -130,7 +130,7 @@ class TestK8sClient(test_base.TestCase):
self.assertRaises(exc.K8sClientException, self.client.get, path)
@mock.patch('itertools.count')
@mock.patch('requests.patch')
@mock.patch('requests.sessions.Session.patch')
def test_annotate(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
@ -152,7 +152,7 @@ class TestK8sClient(test_base.TestCase):
cert=(None, None), verify=False)
@mock.patch('itertools.count')
@mock.patch('requests.patch')
@mock.patch('requests.sessions.Session.patch')
def test_annotate_exception(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
@ -165,7 +165,7 @@ class TestK8sClient(test_base.TestCase):
path, {})
@mock.patch('itertools.count')
@mock.patch('requests.patch')
@mock.patch('requests.sessions.Session.patch')
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
@ -200,7 +200,7 @@ class TestK8sClient(test_base.TestCase):
cert=(None, None), verify=False)])
@mock.patch('itertools.count')
@mock.patch('requests.patch')
@mock.patch('requests.sessions.Session.patch')
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
@ -244,7 +244,7 @@ class TestK8sClient(test_base.TestCase):
cert=(None, None), verify=False)])
@mock.patch('itertools.count')
@mock.patch('requests.patch')
@mock.patch('requests.sessions.Session.patch')
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
@ -288,7 +288,7 @@ class TestK8sClient(test_base.TestCase):
cert=(None, None), verify=False)])
@mock.patch('itertools.count')
@mock.patch('requests.patch')
@mock.patch('requests.sessions.Session.patch')
def test_annotate_resource_not_found(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
@ -314,7 +314,7 @@ class TestK8sClient(test_base.TestCase):
headers=mock.ANY,
cert=(None, None), verify=False)
@mock.patch('requests.get')
@mock.patch('requests.sessions.Session.get')
def test_watch(self, m_get):
path = '/test'
data = [{'obj': 'obj%s' % i} for i in range(3)]
@ -337,7 +337,7 @@ class TestK8sClient(test_base.TestCase):
params={'watch': 'true'}, cert=(None, None),
verify=False, timeout=(30, 60))
@mock.patch('requests.get')
@mock.patch('requests.sessions.Session.get')
def test_watch_restart(self, m_get):
path = '/test'
data = [{'object': {'metadata': {'name': 'obj%s' % i,
@ -364,7 +364,7 @@ class TestK8sClient(test_base.TestCase):
params={"watch": "true", "resourceVersion": 2}, cert=(None, None),
verify=False, timeout=(30, 60))
@mock.patch('requests.get')
@mock.patch('requests.sessions.Session.get')
def test_watch_exception(self, m_get):
path = '/test'
@ -375,7 +375,7 @@ class TestK8sClient(test_base.TestCase):
self.assertRaises(exc.K8sClientException, next,
self.client.watch(path))
@mock.patch('requests.post')
@mock.patch('requests.sessions.Session.post')
def test_post(self, m_post):
path = '/test'
body = {'test': 'body'}
@ -391,7 +391,7 @@ class TestK8sClient(test_base.TestCase):
headers=mock.ANY, cert=(None, None),
verify=False)
@mock.patch('requests.post')
@mock.patch('requests.sessions.Session.post')
def test_post_exception(self, m_post):
path = '/test'
body = {'test': 'body'}
@ -403,7 +403,7 @@ class TestK8sClient(test_base.TestCase):
self.assertRaises(exc.K8sClientException,
self.client.post, path, body)
@mock.patch('requests.delete')
@mock.patch('requests.sessions.Session.delete')
def test_delete(self, m_delete):
path = '/test'
ret = {'test': 'value'}
@ -418,7 +418,7 @@ class TestK8sClient(test_base.TestCase):
headers=mock.ANY, cert=(None, None),
verify=False)
@mock.patch('requests.delete')
@mock.patch('requests.sessions.Session.delete')
def test_delete_exception(self, m_delete):
path = '/test'