Browse Source

Merge "Timeout connections when watching K8s API"

tags/1.1.0
Zuul 1 week ago
parent
commit
039f2aaa3d

+ 12
- 0
kuryr_kubernetes/config.py View File

@@ -155,6 +155,18 @@ k8s_opts = [
155 155
     cfg.IntOpt('watch_retry_timeout',
156 156
                help=_('Time (in seconds) the watcher retries watching for.'),
157 157
                default=60),
158
+    cfg.IntOpt('watch_connection_timeout',
159
+               help=_('TCP connection timeout (in seconds) for the watcher '
160
+                      'connections to K8s API.'),
161
+               default=30),
162
+    cfg.IntOpt('watch_read_timeout',
163
+               help=_('TCP read timeout (in seconds) for the watcher '
164
+                      'connections to K8s API. This affects reaction to time '
165
+                      'when there are no events being streamed from K8s API. '
166
+                      'When too low, Kuryr will reconnect more often. When '
167
+                      'too high, Kuryr will take longer to reconnect when K8s '
168
+                      'API stream was being silently broken.'),
169
+               default=60),
158 170
     cfg.ListOpt('enabled_handlers',
159 171
                 help=_("The comma-separated handlers that should be "
160 172
                        "registered for watching in the pipeline."),

+ 32
- 12
kuryr_kubernetes/k8s_client.py View File

@@ -15,6 +15,7 @@
15 15
 import contextlib
16 16
 import itertools
17 17
 import os
18
+import ssl
18 19
 
19 20
 from oslo_log import log as logging
20 21
 from oslo_serialization import jsonutils
@@ -25,6 +26,7 @@ from kuryr_kubernetes import config
25 26
 from kuryr_kubernetes import constants
26 27
 from kuryr_kubernetes import exceptions as exc
27 28
 
29
+CONF = config.CONF
28 30
 LOG = logging.getLogger(__name__)
29 31
 
30 32
 
@@ -240,21 +242,39 @@ class K8sClient(object):
240 242
                 raise exc.K8sClientException(response.text)
241 243
 
242 244
     def watch(self, path):
243
-        params = {'watch': 'true'}
244 245
         url = self._base_url + path
246
+        resource_version = None
245 247
         header = {}
248
+        timeouts = (CONF.kubernetes.watch_connection_timeout,
249
+                    CONF.kubernetes.watch_read_timeout)
246 250
         if self.token:
247 251
             header.update({'Authorization': 'Bearer %s' % self.token})
248 252
 
249
-        # TODO(ivc): handle connection errors and retry on failure
250 253
         while True:
251
-            with contextlib.closing(
252
-                    requests.get(url, params=params, stream=True,
253
-                                 cert=self.cert, verify=self.verify_server,
254
-                                 headers=header)) as response:
255
-                if not response.ok:
256
-                    raise exc.K8sClientException(response.text)
257
-                for line in response.iter_lines():
258
-                    line = line.decode('utf-8').strip()
259
-                    if line:
260
-                        yield jsonutils.loads(line)
254
+            try:
255
+                params = {'watch': 'true'}
256
+                if resource_version:
257
+                    params['resourceVersion'] = resource_version
258
+                with contextlib.closing(
259
+                        requests.get(
260
+                            url, params=params, stream=True, cert=self.cert,
261
+                            verify=self.verify_server, headers=header,
262
+                            timeout=timeouts)) as response:
263
+                    if not response.ok:
264
+                        raise exc.K8sClientException(response.text)
265
+                    for line in response.iter_lines():
266
+                        line = line.decode('utf-8').strip()
267
+                        if line:
268
+                            line_dict = jsonutils.loads(line)
269
+                            yield line_dict
270
+                            # Saving the resourceVersion in case of a restart.
271
+                            # At this point it's safely passed to handler.
272
+                            m = line_dict.get('object', {}).get('metadata', {})
273
+                            resource_version = m.get('resourceVersion', None)
274
+            except (requests.ReadTimeout, ssl.SSLError) as e:
275
+                if isinstance(e, ssl.SSLError) and e.args != ('timed out',):
276
+                    raise
277
+
278
+                LOG.warning('%ds without data received from watching %s. '
279
+                            'Retrying the connection with resourceVersion=%s.',
280
+                            timeouts[1], path, params.get('resourceVersion'))

+ 28
- 1
kuryr_kubernetes/tests/unit/test_k8s_client.py View File

@@ -335,7 +335,34 @@ class TestK8sClient(test_base.TestCase):
335 335
         self.assertEqual(cycles, m_resp.close.call_count)
336 336
         m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
337 337
                                  params={'watch': 'true'}, cert=(None, None),
338
-                                 verify=False)
338
+                                 verify=False, timeout=(30, 60))
339
+
340
+    @mock.patch('requests.get')
341
+    def test_watch_restart(self, m_get):
342
+        path = '/test'
343
+        data = [{'object': {'metadata': {'name': 'obj%s' % i,
344
+                                         'resourceVersion': i}}}
345
+                for i in range(3)]
346
+        lines = [jsonutils.dump_as_bytes(i) for i in data]
347
+
348
+        m_resp = mock.MagicMock()
349
+        m_resp.ok = True
350
+        m_resp.iter_lines.side_effect = [lines, requests.ReadTimeout, lines]
351
+        m_get.return_value = m_resp
352
+
353
+        self.assertEqual(data * 2,
354
+                         list(itertools.islice(self.client.watch(path),
355
+                                               len(data) * 2)))
356
+        self.assertEqual(3, m_get.call_count)
357
+        self.assertEqual(3, m_resp.close.call_count)
358
+        m_get.assert_any_call(
359
+            self.base_url + path, headers={}, stream=True,
360
+            params={"watch": "true"}, cert=(None, None), verify=False,
361
+            timeout=(30, 60))
362
+        m_get.assert_any_call(
363
+            self.base_url + path, headers={}, stream=True,
364
+            params={"watch": "true", "resourceVersion": 2}, cert=(None, None),
365
+            verify=False, timeout=(30, 60))
339 366
 
340 367
     @mock.patch('requests.get')
341 368
     def test_watch_exception(self, m_get):

Loading…
Cancel
Save