Ignore or replace already deployed entities

If --force flag is provided, ZooKeeper is cleaned and all
apps and jobs are erased and then runned again.

Without any flags, already running apps and jobs will be
just ignored.

Change-Id: I05bd55662618bb791a63523f7a1252551e65e69d
Closes-Bug: #1523997
This commit is contained in:
Michal Rostecki 2015-12-08 17:52:36 +01:00
parent c05b25f457
commit c76ac4abcd
15 changed files with 427 additions and 34 deletions

View File

@ -14,12 +14,23 @@
# remove this module when possible.
import json
import logging
import operator
from oslo_config import cfg
import requests
import six
from six.moves.urllib import parse
from kolla_mesos.common import retry_utils
from kolla_mesos import exception
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
CONF = cfg.CONF
CONF.import_group('chronos', 'kolla_mesos.config.chronos')
class Client(object):
@ -39,18 +50,45 @@ class Client(object):
"""
return parse.urljoin(CONF.chronos.host, path)
@retry_utils.retry_if_not_rollback(stop_max_attempt_number=5,
wait_fixed=1000)
def add_job(self, job_resource):
"""Add job to Chronos.
:param job_resource: data about job to run on Chronos
:type job_resource: dict
"""
url = self._create_url('scheduler/iso8601')
response = requests.post(url, data=json.dumps(job_resource),
timeout=CONF.chronos.timeout,
headers={'Content-Type': 'application/json'})
job_name = job_resource['name']
assert response.status_code in [200, 204]
old_job = self.get_job(job_name)
if old_job is None:
url = self._create_url('scheduler/iso8601')
response = requests.post(url, data=json.dumps(job_resource),
timeout=CONF.chronos.timeout,
headers={'Content-Type':
'application/json'})
if response.status_code not in [200, 204]:
raise exception.ChronosException('Failed to add job')
else:
if CONF.force:
LOG.info('Deployment found and --force flag is used. '
'Destroying previous deployment and re-creating it.')
raise exception.ChronosRollback()
else:
LOG.info('Job %s is already added. If you want to replace it, '
'please use --force flag', job_name)
return old_job
def get_job(self, job_name):
"""Get job from Chronos by name.
:param job_name: id of job to get
:type job_name: str
"""
jobs = self.get_jobs()
return next((job for job in jobs if job['name'] == job_name), None)
def get_jobs(self):
"""Get list of running jobs in Chronos"""
@ -58,3 +96,35 @@ class Client(object):
response = requests.get(url, timeout=CONF.chronos.timeout)
return response.json()
def remove_job(self, job_name):
"""Remove job from Chronos.
:param job_name: name of job to delete
:type job_name: str
"""
url = self._create_url('scheduler/job/{}'.format(job_name))
response = requests.delete(url, timeout=CONF.chronos.timeout)
if response.status_code not in [200, 204]:
raise exception.ChronosException('Failed to remove job')
def remove_job_tasks(self, job_name):
"""Remove all tasks for a job.
:param job_name: name of job to delete tasks from
:type job_name: str
"""
url = self._create_url('scheduler/task/kill/{}'.format(job_name))
response = requests.delete(url, timeout=CONF.chronos.timeout)
if response.status_code not in [200, 204]:
raise exception.ChronosException('Failed to remove tasks from job')
def remove_all_jobs(self, with_tasks=True):
job_names = six.moves.map(operator.itemgetter('name'), self.get_jobs())
for job_name in job_names:
if with_tasks:
self.remove_job_tasks(job_name)
self.remove_job(job_name)

View File

@ -32,7 +32,7 @@ CONF.import_opt('show', 'kolla_mesos.config.config_cli')
def main():
CONF(sys.argv[1:], project='kolla-mesos')
with zk_utils.connection(CONF.zookeeper.host) as zk:
with zk_utils.connection() as zk:
if CONF.show:
zk_utils.cat(zk, CONF.path)
else:

View File

@ -22,6 +22,7 @@ import tempfile
import time
from oslo_config import cfg
import retrying
import shutil
from six.moves import configparser
from six.moves import cStringIO
@ -31,6 +32,7 @@ from kolla_mesos import chronos
from kolla_mesos.common import file_utils
from kolla_mesos.common import jinja_utils
from kolla_mesos.common import zk_utils
from kolla_mesos import exception
from kolla_mesos import marathon
@ -46,6 +48,8 @@ CONF.import_group('profiles', 'kolla_mesos.config.profiles')
CONF.import_group('zookeeper', 'kolla_mesos.config.zookeeper')
CONF.import_group('marathon', 'kolla_mesos.config.marathon')
CONF.import_group('chronos', 'kolla_mesos.config.chronos')
CONF.import_opt('update', 'kolla_mesos.config.deploy_cli')
CONF.import_opt('force', 'kolla_mesos.config.deploy_cli')
class KollaDirNotFoundException(Exception):
@ -59,7 +63,7 @@ class KollaWorker(object):
self.config_dir = os.path.join(self.base_dir, 'config')
LOG.debug("Kolla-Mesos base directory: " + self.base_dir)
self.required_vars = {}
self.marathon_client = marathon.create_client()
self.marathon_client = marathon.Client()
self.chronos_client = chronos.Client()
def setup_working_dir(self):
@ -208,14 +212,19 @@ class KollaWorker(object):
f.write(content)
LOG.info('Written OpenStack env to "openrc"')
def cleanup(self):
def cleanup_temp_files(self):
"""Remove temp files"""
shutil.rmtree(self.temp_dir)
def cleanup(self):
with zk_utils.connection() as zk:
zk_utils.clean(zk)
self.marathon_client.remove_all_apps()
self.chronos_client.remove_all_jobs()
def write_to_zookeeper(self):
with zk_utils.connection(CONF.zookeeper.host) as zk:
# to clean these up, uncomment
zk.delete('/kolla', recursive=True)
with zk_utils.connection() as zk:
zk_utils.clean(zk)
self.write_config_to_zookeeper(zk)
@ -235,29 +244,42 @@ class KollaWorker(object):
except Exception as te:
LOG.error('%s=%s -> %s' % (var_path, var_value, te))
def _start_marathon_app(self, app_resource):
if CONF.update:
LOG.info('Applications upgrade is not implemented '
'yet!')
else:
try:
return self.marathon_client.add_app(app_resource)
except exception.MarathonRollback as e:
self.cleanup()
self.write_to_zookeeper()
raise e
def _start_chronos_job(self, job_resource):
if CONF.update:
LOG.info('Bootstrap tasks for upgrade are not implemented yet!')
else:
try:
return self.chronos_client.add_job(job_resource)
except exception.ChronosRollback as e:
self.cleanup()
self.write_to_zookeeper()
raise e
@retrying.retry(stop_max_attempt_number=5)
def start(self):
# find all marathon files and run.
# find all cronos files and run.
marathon_api = CONF.marathon.host
chronos_api = CONF.chronos.host
content_type = '-L -H "Content-type: application/json"'
for root, dirs, names in os.walk(self.temp_dir):
for name in names:
app_path = os.path.join(root, name)
# this is lazy, I could use requests or the native client.
with open(app_path, 'r') as app_file:
app_resource = json.load(app_file)
if 'marathon' in name:
cmd = 'curl -X POST "%s/v2/apps" -d @"%s" %s' % (
marathon_api, app_path, content_type)
with open(app_path, 'r') as app_file:
app_resource = json.load(app_file)
self.marathon_client.add_app(app_resource)
self._start_marathon_app(app_resource)
else:
cmd = 'curl -X POST "%s/scheduler/iso8601" -d @"%s" %s' % (
chronos_api, app_path, content_type)
with open(app_path, 'r') as app_file:
job_resource = json.load(app_file)
self.chronos_client.add_job(job_resource)
LOG.info(cmd)
self._start_chronos_job(app_resource)
def main():
@ -268,7 +290,7 @@ def main():
kolla.write_openrc()
kolla.start()
# kolla.cleanup()
# kolla.cleanup_temp_files()
if __name__ == '__main__':

View File

@ -0,0 +1,35 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import retrying
from kolla_mesos import exception as kolla_mesos_exc
def check_if_not_rollback(exception):
rollback_exceptions = [kolla_mesos_exc.MarathonRollback,
kolla_mesos_exc.ChronosRollback]
return all(not isinstance(exception, exception_cls)
for exception_cls in rollback_exceptions)
def retry_if_not_rollback(*retry_args, **retry_kwargs):
def wrapper(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
retry_kwargs['retry_on_exception'] = check_if_not_rollback
decorated_function = retrying.retry(*retry_args, **retry_kwargs)(f)
return decorated_function(*args, **kwargs)
return wrapped
return wrapper

View File

@ -17,9 +17,13 @@ import os.path
from kazoo import client
from kazoo import exceptions
from oslo_config import cfg
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_group('zookeeper', 'kolla_mesos.config.zookeeper')
def tree(zk, path=None, level=0, pretty=None):
if path is None:
@ -68,9 +72,13 @@ def copy_tree(zk, source_path, dest_path):
zk.set(dest_node, src_fp.read())
def clean(zk, path='/kolla'):
zk.delete(path, recursive=True)
@contextlib.contextmanager
def connection(zk_hosts):
zk = client.KazooClient(hosts=zk_hosts)
def connection():
zk = client.KazooClient(hosts=CONF.zookeeper.host)
try:
zk.start()
yield zk

View File

@ -23,7 +23,7 @@ chronos_opts = [
default=CHRONOS_URL,
help='Chronos connection URL (http://host:port)'),
cfg.IntOpt('timeout',
default=5,
default=30,
help='Timeout for the request to the Chronos API')
]
chronos_opt_group = cfg.OptGroup(name='chronos',

View File

@ -0,0 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
CONF = cfg.CONF
deploy_cli_opts = [
cfg.BoolOpt('update',
default=False),
cfg.BoolOpt('force',
default=False)
]
CONF.register_cli_opts(deploy_cli_opts)

23
kolla_mesos/exception.py Normal file
View File

@ -0,0 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class MarathonRollback(Exception):
pass
class ChronosException(Exception):
pass
class ChronosRollback(Exception):
pass

View File

@ -10,13 +10,53 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import operator
from dcos import errors as dcos_exc
from dcos import marathon
from oslo_config import cfg
import six
from kolla_mesos.common import retry_utils
from kolla_mesos import exception as kolla_mesos_exc
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
CONF = cfg.CONF
CONF.import_group('marathon', 'kolla_mesos.config.marathon')
def create_client():
"""Create Marathon client object with parameters from configuration"""
return marathon.Client(CONF.marathon.host, timeout=CONF.marathon.timeout)
class Client(marathon.Client):
"""Marathon client with parameters from configuration"""
def __init__(self, *args, **kwargs):
kwargs['timeout'] = CONF.marathon.timeout
super(Client, self).__init__(CONF.marathon.host, *args, **kwargs)
@retry_utils.retry_if_not_rollback(stop_max_attempt_number=5,
wait_fixed=1000)
def add_app(self, app_resource):
app_id = app_resource['id']
# Check if the app already exists
try:
old_app = self.get_app(app_id)
except dcos_exc.DCOSException:
return super(Client, self).add_app(app_resource)
else:
if CONF.force:
LOG.info('Deployment found and --force flag is used. '
'Destroying previous deployment and re-creating it.')
raise kolla_mesos_exc.MarathonRollback()
else:
LOG.info('App %s is already deployed. If you want to '
'replace it, please use --force flag.', app_id)
return old_app
def remove_all_apps(self):
apps_ids = six.moves.map(operator.itemgetter('id'), self.get_apps())
for app_id in apps_ids:
self.remove_app(app_id, force=True)

View File

@ -10,9 +10,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
import contextlib
from oslotest import base
import six
import testscenarios
# Python 3, thank you for dropping contextlib.nested
if six.PY3:
@contextlib.contextmanager
def nested(*contexts):
with contextlib.ExitStack() as stack:
yield [stack.enter_context(c) for c in contexts]
else:
nested = contextlib.nested
class BaseTestCase(testscenarios.WithScenarios,

View File

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kolla_mesos.common import retry_utils
from kolla_mesos import exception
from kolla_mesos.tests import base
class TestRetryUtils(base.BaseTestCase):
def test_check_if_not_rollback(self):
self.assertTrue(retry_utils.check_if_not_rollback(Exception()))
self.assertTrue(retry_utils.check_if_not_rollback(OSError()))
self.assertFalse(retry_utils.check_if_not_rollback(
exception.MarathonRollback()))

View File

@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from kolla_mesos.tests import base
CONF = cfg.CONF
CONF.import_opt('update', 'kolla_mesos.config.deploy_cli')
CONF.import_opt('force', 'kolla_mesos.config.deploy_cli')
class TestConfigCliConfig(base.BaseTestCase):
def test_config_cli(self):
argv = ['--update', '--force']
CONF(argv, project='kolla-mesos')
self.assertTrue(CONF.update)
self.assertTrue(CONF.force)

View File

@ -10,10 +10,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
import requests_mock
from kolla_mesos import chronos
from kolla_mesos import exception
from kolla_mesos.tests import base
@ -64,9 +66,33 @@ class TestClient(base.BaseTestCase):
@requests_mock.mock()
def test_add_job(self, req_mock):
req_mock.get('http://localhost:4400/scheduler/jobs', json=[])
req_mock.post('http://localhost:4400/scheduler/iso8601')
self.client.add_job(EXAMPLE_CHRONOS_JOB)
@mock.patch.object(chronos, 'LOG')
@requests_mock.mock()
def test_add_job_already_existing(self, log_mock, req_mock):
req_mock.get('http://localhost:4400/scheduler/jobs', json=[{
'name': '/keystone-bootstrap'
}])
req_mock.post('http://localhost:4400/scheduler/iso8601')
self.client.add_job(EXAMPLE_CHRONOS_JOB)
log_mock.info.assert_called_with('Job %s is already added. If you '
'want to replace it, please use '
'--force flag',
'/keystone-bootstrap')
@requests_mock.mock()
def test_add_job_already_existing_force(self, req_mock):
CONF.set_override('force', True)
req_mock.get('http://localhost:4400/scheduler/jobs', json=[{
'name': '/keystone-bootstrap'
}])
req_mock.post('http://localhost:4400/scheduler/iso8601')
self.assertRaises(exception.ChronosRollback, self.client.add_job,
EXAMPLE_CHRONOS_JOB)
@requests_mock.mock()
def test_get_jobs(self, req_mock):
req_mock.get('http://localhost:4400/scheduler/jobs',

View File

@ -0,0 +1,80 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dcos import errors
import mock
from oslo_config import cfg
import requests_mock
from kolla_mesos import exception
from kolla_mesos import marathon
from kolla_mesos.tests import base
CONF = cfg.CONF
def add_app(app):
return app
class TestMarathonClient(base.BaseTestCase):
@requests_mock.mock()
def setUp(self, req_mock):
super(TestMarathonClient, self).setUp()
CONF.set_override('host', 'http://127.0.0.1:8080', group='marathon')
req_mock.get('http://127.0.0.1:8080/v2/info', json={
'version': '0.11.0'
})
self.client = marathon.Client()
@mock.patch('dcos.marathon.Client.add_app')
def test_add_app(self, dcos_add_app_mock):
dcos_add_app_mock.side_effect = add_app
with mock.patch.object(
self.client, 'get_app', side_effect=errors.DCOSException()
):
app = self.client.add_app({'id': 'my-new-app'})
self.assertDictEqual(app, {'id': 'my-new-app'})
@mock.patch.object(marathon, 'LOG')
def test_add_app_already_existing(self, log_mock):
with mock.patch.object(
self.client, 'get_app', return_value={'id': 'my-app',
'other_param': 'the-old-one'}
):
app = self.client.add_app({'id': 'my-app',
'other_param': 'the-new-one'})
self.assertDictEqual(app, {'id': 'my-app',
'other_param': 'the-old-one'})
log_mock.info.assert_called_with('App %s is already deployed. '
'If you want to replace it, please '
'use --force flag.', 'my-app')
@mock.patch('dcos.marathon.Client.add_app')
def test_add_app_already_existing_force(self, dcos_add_app_mock):
CONF.set_override('force', True)
dcos_add_app_mock.side_effect = add_app
with base.nested(mock.patch.object(
self.client, 'get_app', return_value={'id': 'my-app',
'other_param': 'the-old-one'}
), mock.patch.object(self.client, 'remove_app')):
self.assertRaises(exception.MarathonRollback,
self.client.add_app,
{'id': 'my-app', 'other_param': 'the-new-one'})

View File

@ -11,4 +11,5 @@ netifaces>=0.10.4
oslo.config>=2.7.0 # Apache-2.0
oslo.utils>=2.8.0 # Apache-2.0
PyYAML>=3.1.0
retrying>=1.2.3,!=1.3.0 # Apache-2.0
six>=1.9.0