From d1b30e4be0f0431a93f538fa84fce9fbe0cdfead Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 11 Mar 2016 13:45:36 +0200 Subject: [PATCH] Rework staging procedure to support both implicit and explicit stages Current change addresses several problems - - It was impossible to re-stage already commited resources. For example re-run openstack actions without doing any artificial updates in solar inputs. - Also there was no way to execute actions that are not related to state in solar (run/update/remove). An example would be - restart of services. - And following changes addresses isolation problem in staging procedure. By design solar is isolated using tags semantics, but previous implemention of *process* was building a graph unconditionally for all staged resources. It was reworked and now we can support partial processing of resources, based on tags. Implicit staging will be done when resource is update/created/removed. Additionally actions can be staged using solar ch stage command, to support this additional flags were added: --action, -a - action that should be staged --tag, -t - tags to select group of resources --name, -n - resource that will be staged Only one from name or tag will be used, if user will provide both - name will be of higher priority Reverts and discard are working as previously for creation/update/removal of resources, but Exception will be raised if revert will be attempted for custom action, such as *restart*. Processing staged items can be achieved with - solar ch process -t tag1 -t tag2 History and staged log items will be stored in different buckets. Custom siblings resolved for LogItem will ensure that there is only one resource action is staged. implements blueprint refactor-process-of-staging-changes Change-Id: I9e634803a38d80213b87518cd2c8fdc022237aa0 --- solar/cli/system_log.py | 15 +- solar/core/resource/__init__.py | 8 +- solar/core/resource/composer.py | 4 +- solar/core/resource/resource.py | 64 +++- solar/dblayer/solar_models.py | 105 +++++-- solar/dblayer/test/test_log.py | 87 +----- solar/events/controls.py | 5 +- solar/system_log/change.py | 156 ++++++---- solar/system_log/data.py | 14 +- solar/system_log/operations.py | 48 ++- .../test_complete_solar_workflow.py | 6 +- solar/test/test_system_log_api.py | 292 +++++++----------- 12 files changed, 407 insertions(+), 397 deletions(-) diff --git a/solar/cli/system_log.py b/solar/cli/system_log.py index a48e3fbb..dc059656 100644 --- a/solar/cli/system_log.py +++ b/solar/cli/system_log.py @@ -39,10 +39,14 @@ def validate(): @changes.command() +@click.option('--action', '-a', default=None, help='resource action') +@click.option('--name', '-n', default=None, help='resource name') +@click.option('--tag', '-t', multiple=True, help='resource tags') @click.option('-d', default=False, is_flag=True, help='detailed view') -def stage(d): - log = change.stage_changes() - log.reverse() +def stage(action, name, tag, d): + if action and (name or tag): + resource.stage_resources(name or tag, action) + log = change.staged_log(populate_with_changes=True) for item in log: click.echo(data.compact(item)) if d: @@ -65,8 +69,9 @@ def staged_item(uid): @changes.command() -def process(): - uid = change.send_to_orchestration().graph['uid'] +@click.option('--tag', '-t', multiple=True, help='resource tags') +def process(tag): + uid = change.send_to_orchestration(tag).graph['uid'] remember_uid(uid) click.echo(uid) diff --git a/solar/core/resource/__init__.py b/solar/core/resource/__init__.py index cbf9c53e..34474a52 100644 --- a/solar/core/resource/__init__.py +++ b/solar/core/resource/__init__.py @@ -15,9 +15,10 @@ from solar.core.resource.resource import load from solar.core.resource.resource import load_all from solar.core.resource.resource import load_by_tags -from solar.core.resource.resource import load_updated +from solar.core.resource.resource import load_childs from solar.core.resource.resource import Resource from solar.core.resource.resource import RESOURCE_STATE +from solar.core.resource.resource import stage_resources from solar.core.resource.resource import validate_resources __all__ = [ @@ -26,6 +27,7 @@ __all__ = [ 'load', 'load_all', 'load_by_tags', - 'load_updated', - 'validate_resources' + 'load_childs', + 'validate_resources', + 'stage_resources', ] diff --git a/solar/core/resource/composer.py b/solar/core/resource/composer.py index bb52baf6..5b39fe9a 100644 --- a/solar/core/resource/composer.py +++ b/solar/core/resource/composer.py @@ -182,7 +182,7 @@ def _get_template(name, content, kwargs, inputs): def create_resources(base_path, resources, tags=None): - + add_tags = tags created_resources = [] for r in resources: resource_name = r['id'] @@ -191,6 +191,8 @@ def create_resources(base_path, resources, tags=None): values_from = r.get('values_from') spec = r.get('from', None) tags = r.get('tags', []) + if add_tags: + tags.extend(add_tags) is_composer_file = False if spec.startswith('./') or spec.endswith('.yaml'): spec = os.path.join(base_path, '..', spec) diff --git a/solar/core/resource/resource.py b/solar/core/resource/resource.py index de0b4eef..ea5f777c 100644 --- a/solar/core/resource/resource.py +++ b/solar/core/resource/resource.py @@ -33,6 +33,7 @@ from solar.core import validation from solar.dblayer.model import NONE from solar.dblayer.model import StrInt from solar.dblayer.solar_models import CommitedResource +from solar.dblayer.solar_models import LogItem from solar.dblayer.solar_models import Resource as DBResource from solar.events import api from solar import utils @@ -90,6 +91,11 @@ class Resource(object): self.create_inputs(args) self.db_obj.save() + LogItem.new({ + 'resource': self.name, + 'action': 'run', + 'log': 'staged', + 'tags': self.tags}).save_lazy() # Load def create_from_db(self, resource_db): @@ -209,6 +215,12 @@ class Resource(object): for k, v in args.items(): self.db_obj.inputs[k] = v self.db_obj.save_lazy() + # run and update are same things from solar pov + # so lets remove this redundancy + LogItem.new( + {'resource': self.name, + 'action': 'run', + 'tags': self.tags}).save_lazy() def delete(self): return self.db_obj.delete() @@ -219,6 +231,11 @@ class Resource(object): else: self.db_obj.state = RESOURCE_STATE.removed.name self.db_obj.save_lazy() + LogItem.new( + {'resource': self.name, + 'action': 'remove', + 'log': 'staged', + 'tags': self.tags}).save_lazy() def set_operational(self): self.db_obj.state = RESOURCE_STATE.operational.name @@ -312,6 +329,9 @@ class Resource(object): use_defaults=False): mapping = get_mapping(self, receiver, mapping) self._connect_inputs(receiver, mapping) + LogItem.new({'resource': receiver.name, + 'action': 'run', + 'tags': receiver.tags}).save_lazy() # signals.connect(self, receiver, mapping=mapping) # TODO: implement events if use_defaults: @@ -350,17 +370,9 @@ def load(name): return Resource(r) -def load_updated(since=None, with_childs=True): - if since is None: - startkey = StrInt.p_min() - else: - startkey = since - candids = DBResource.updated.filter(startkey, StrInt.p_max()) - if with_childs: - candids = DBResource.childs(candids) - return [Resource(r) for r in DBResource.multi_get(candids)] - -# TODO +def load_childs(parents): + return [Resource(r) for r in + DBResource.multi_get(DBResource.childs(parents))] def load_all(startswith=None): @@ -380,11 +392,33 @@ def load_by_tags(query): parsed_tags = get_string_tokens(query) r_with_tags = [DBResource.tags.filter(tag) for tag in parsed_tags] r_with_tags = set(itertools.chain(*r_with_tags)) - candids = [Resource(r) for r in DBResource.multi_get(r_with_tags)] + resources = [Resource(r) for r in DBResource.multi_get(r_with_tags)] - nodes = filter( - lambda n: Expression(query, n.tags).evaluate(), candids) - return nodes + return filter(lambda n: Expression(query, n.tags).evaluate(), resources) + + +def stage_resources(resources_query, action): + """Create log items for resources selected by query + :param resources_query: iterable with tags or basestring + :param action: basestring + """ + if isinstance(resources_query, basestring): + resources = [load(resources_query)] + else: + resources = load_by_tags(resources_query) + created = [] + for resource in resources: + # save - cache doesnt cover all query in the same sesssion + # and this query will be triggered right after staging resources + + log_item = LogItem.new( + {'resource': resource.name, + 'action': action, + 'log': 'staged', + 'tags': resource.tags}) + log_item.save() + created.append(log_item) + return created def load_by_names(names): diff --git a/solar/dblayer/solar_models.py b/solar/dblayer/solar_models.py index cd445cb4..a11454e4 100644 --- a/solar/dblayer/solar_models.py +++ b/solar/dblayer/solar_models.py @@ -25,6 +25,9 @@ from enum import Enum from solar.computable_inputs import ComputablePassedTypes from solar.computable_inputs.processor import get_processor from solar.config import C +from solar.core.tags_set_parser import Expression +from solar.core.tags_set_parser import get_string_tokens +from solar.dblayer.conflict_resolution import naive_resolver from solar.dblayer.model import check_state_for from solar.dblayer.model import CompositeIndexField from solar.dblayer.model import DBLayerException @@ -1129,44 +1132,96 @@ class LogItem(Model): action = Field(basestring) diff = Field(list) connections_diff = Field(list) - state = Field(basestring) base_path = Field(basestring) # remove me - updated = Field(StrInt) - history = IndexedField(StrInt) - log = Field(basestring) # staged/history - - composite = CompositeIndexField(fields=('log', 'resource', 'action')) + state = Field(basestring) + # based on tags we will filter staged log items during process part + # of staging changes procedure, it will allow to isolate graphs for + # different parts of infrastructure managed by solar (e.g. cluster) + tags = TagsField(default=list) @property def log_action(self): return '.'.join((self.resource, self.action)) - @classmethod - def history_last(cls): - items = cls.history.filter(StrInt.n_max(), - StrInt.n_min(), - max_results=1) - if not items: - return None - return cls.get(items[0]) - - def save(self): - if any(f in self._modified_fields for f in LogItem.composite.fields): - self.composite.reset() - - if 'log' in self._modified_fields and self.log == 'history': - self.history = StrInt(next(NegativeCounter.get_or_create( - 'history'))) - return super(LogItem, self).save() - @classmethod def new(cls, data): vals = {} if 'uid' not in vals: vals['uid'] = cls.uid.default vals.update(data) - return LogItem.from_dict(vals['uid'], vals) + return LogItem.from_dict( + '{}.{}'.format(vals['resource'], vals['action']), vals) + + @classmethod + def from_dict(cls, key, *args, **kwargs): + if key in cls._c.obj_cache: + return cls._c.obj_cache[key] + return super(LogItem, cls).from_dict(key, *args, **kwargs) + + @classmethod + def get(cls, key): + try: + return super(LogItem, cls).get(key) + except DBLayerException: + return None + + def to_history(self): + return HistoryItem.new( + self.uid, + {'uid': self.uid, + 'resource': self.resource, + 'action': self.action, + 'base_path': self.base_path, + 'diff': self.diff, + 'connections_diff': self.connections_diff}) + + @classmethod + def log_items_by_tags(cls, tags): + query = '|'.join(tags) + parsed_tags = get_string_tokens(query) + log_items = set(map( + cls.get, + chain.from_iterable( + [cls.tags.filter(tag) for tag in parsed_tags]))) + return filter(lambda li: Expression(query, li.tags).evaluate(), + log_items) + + @staticmethod + def conflict_resolver(riak_object): + #: it is safe to pick any log item with data, because the key + # if particular log_action + for sibling in riak_object.siblings: + if sibling.encoded_data: + riak_object.siblings = [sibling] + return + naive_resolver(riak_object) + + +class HistoryItem(Model): + + uid = IndexedField(basestring) + resource = Field(basestring) + action = Field(basestring) + diff = Field(list) + connections_diff = Field(list) + base_path = Field(basestring) # remove me + history = IndexedField(StrInt) + + composite = CompositeIndexField(fields=('resource', 'action')) + + @property + def log_action(self): + return '.'.join((self.resource, self.action)) + + def save(self): + if any(f in self._modified_fields for + f in HistoryItem.composite.fields): + self.composite.reset() + + self.history = StrInt(next(NegativeCounter.get_or_create( + 'history'))) + return super(HistoryItem, self).save() class Lock(Model): diff --git a/solar/dblayer/test/test_log.py b/solar/dblayer/test/test_log.py index 17d65db2..7538b865 100644 --- a/solar/dblayer/test/test_log.py +++ b/solar/dblayer/test/test_log.py @@ -13,54 +13,21 @@ # under the License. from solar.dblayer.model import StrInt -from solar.dblayer.solar_models import LogItem +from solar.dblayer.solar_models import HistoryItem from solar.dblayer.solar_models import NegativeCounter -def test_separate_logs(): +def test_composite_filter(): - history = 'history' - staged = 'staged' - history_uids = set() - staged_uids = set() - for i in range(2): - l = LogItem.new({'log': history}) - l.save() - history_uids.add(l.key) - for i in range(3): - l = LogItem.new({'log': staged}) - l.save() - staged_uids.add(l.key) - - assert set(LogItem.composite.filter({'log': history})) == history_uids - assert set(LogItem.composite.filter({'log': staged})) == staged_uids - - -def test_multiple_filter(): - - l1 = LogItem.new({'log': 'history', 'resource': 'a'}) - l2 = LogItem.new({'log': 'history', 'resource': 'b'}) + l1 = HistoryItem.new('a', {'log': 'history', 'resource': 'a'}) + l2 = HistoryItem.new('b', {'log': 'history', 'resource': 'b'}) l1.save() l2.save() - assert LogItem.composite.filter({'log': 'history', - 'resource': 'a'}) == [l1.key] - assert LogItem.composite.filter({'log': 'history', - 'resource': 'b'}) == [l2.key] - - -def test_changed_index(): - - l = LogItem.new({'log': 'staged', 'resource': 'a', 'action': 'run'}) - l.save() - - assert LogItem.composite.filter({'log': 'staged'}) == [l.key] - - l.log = 'history' - l.save() - - assert LogItem.composite.filter({'log': 'staged'}) == [] - assert LogItem.composite.filter({'log': 'history'}) == [l.key] + assert HistoryItem.composite.filter({'log': 'history', + 'resource': 'a'}) == [l1.key] + assert HistoryItem.composite.filter({'log': 'history', + 'resource': 'b'}) == [l2.key] def test_negative_counter(): @@ -71,40 +38,10 @@ def test_negative_counter(): def test_reversed_order_is_preserved(): added = [] for i in range(4): - li = LogItem.new({'log': 'history'}) + li = HistoryItem.new(str(i), {}) li.save() added.append(li.key) added.reverse() - assert list(LogItem.history.filter(StrInt.n_max(), - StrInt.n_min(), - max_results=2)) == added[:2] - - -def test_staged_not_indexed(): - added = [] - for i in range(3): - li = LogItem.new({'log': 'staged'}) - li.save() - added.append(li) - - for li in added[:2]: - li.log = 'history' - li.save() - - assert set(LogItem.history.filter(StrInt.n_max(), StrInt.n_min())) == { - li.key - for li in added[:2] - } - - -def test_history_last_filter(): - for i in range(4): - li = LogItem.new({'log': 'history'}) - li.save() - last = li - - assert LogItem.history_last() == last - - -def test_history_last_returns_none(): - assert LogItem.history_last() is None + assert list(HistoryItem.history.filter(StrInt.n_max(), + StrInt.n_min(), + max_results=2)) == added[:2] diff --git a/solar/events/controls.py b/solar/events/controls.py index ebe1e14f..6cba405a 100644 --- a/solar/events/controls.py +++ b/solar/events/controls.py @@ -33,6 +33,7 @@ if no changes noticed on dependent resource. """ from solar.dblayer.model import DBLayerNotFound +from solar.dblayer.solar_models import DBLayerSolarException from solar.dblayer.solar_models import Resource @@ -104,7 +105,7 @@ class React(Event): try: location_id = Resource.get(self.child).inputs[ 'location_id'] - except DBLayerNotFound: + except (DBLayerNotFound, DBLayerSolarException): location_id = None changes_graph.add_node( self.child_node, status='PENDING', @@ -126,7 +127,7 @@ class StateChange(Event): changed_resources.append(self.parent_node) try: location_id = Resource.get(self.parent).inputs['location_id'] - except DBLayerNotFound: + except (DBLayerNotFound, DBLayerSolarException): location_id = None changes_graph.add_node( self.parent_node, status='PENDING', diff --git a/solar/system_log/change.py b/solar/system_log/change.py index 8c252d12..db56d6e6 100644 --- a/solar/system_log/change.py +++ b/solar/system_log/change.py @@ -20,8 +20,8 @@ from solar.core import resource from solar.core.resource.resource import RESOURCE_STATE from solar.core import signals from solar.dblayer.solar_models import CommitedResource +from solar.dblayer.solar_models import HistoryItem from solar.dblayer.solar_models import LogItem -from solar.dblayer.solar_models import StrInt from solar.events import api as evapi from solar.events.controls import StateChange from solar.orchestration import graph @@ -53,27 +53,10 @@ def create_diff(staged, commited): return listify(res) -def create_logitem(resource, action, diffed, connections_diffed, - base_path=''): - return LogItem.new( - {'resource': resource, - 'action': action, - 'diff': diffed, - 'connections_diff': connections_diffed, - 'base_path': base_path, - 'log': 'staged'}) - - -def create_sorted_diff(staged, commited): - staged.sort() - commited.sort() - return create_diff(staged, commited) - - -def make_single_stage_item(resource_obj): +def populate_log_item(log_item): + resource_obj = resource.load(log_item.resource) commited = resource_obj.load_commited() - base_path = resource_obj.base_path - + log_item.base_path = resource_obj.base_path if resource_obj.to_be_removed(): resource_args = {} resource_connections = [] @@ -88,42 +71,87 @@ def make_single_stage_item(resource_obj): commited_args = commited.inputs commited_connections = commited.connections - inputs_diff = create_diff(resource_args, commited_args) - connections_diff = create_sorted_diff( + log_item.diff = create_diff(resource_args, commited_args) + log_item.connections_diff = create_sorted_diff( resource_connections, commited_connections) - - # if new connection created it will be reflected in inputs - # but using inputs to reverse connections is not possible - if inputs_diff: - li = create_logitem( - resource_obj.name, - guess_action(commited_args, resource_args), - inputs_diff, - connections_diff, - base_path=base_path) - li.save() - return li - return None + return log_item -def stage_changes(): - for li in data.SL(): - li.delete() - - last = LogItem.history_last() - since = StrInt.greater(last.updated) if last else None - staged_log = utils.solar_map(make_single_stage_item, - resource.load_updated(since), concurrency=10) - staged_log = filter(None, staged_log) - return staged_log +def create_logitem(resource, action, populate=True): + """Create log item in staged log + :param resource: basestring + :param action: basestring + """ + log_item = LogItem.new( + {'resource': resource, + 'action': action, + 'log': 'staged'}) + if populate: + populate_log_item(log_item) + return log_item -def send_to_orchestration(): +def create_run(resource): + return create_logitem(resource, 'run') + + +def create_remove(resource): + return create_logitem(resource, 'remove') + + +def create_sorted_diff(staged, commited): + staged.sort() + commited.sort() + return create_diff(staged, commited) + + +def staged_log(populate_with_changes=True): + """Staging procedure takes manually created log items, populate them + with diff and connections diff + + Current implementation prevents from several things to occur: + - same log_action (resource.action pair) cannot not be staged multiple + times + - child will be staged only if diff or connections_diff is changed, + and we can execute *run* action to apply that diff - in all other cases + child should be staged explicitly + """ + log_actions = set() + resources_names = set() + staged_log = data.SL() + without_duplicates = [] + for log_item in staged_log: + if log_item.log_action in log_actions: + log_item.delete() + continue + resources_names.add(log_item.resource) + log_actions.add(log_item.log_action) + without_duplicates.append(log_item) + + utils.solar_map(lambda li: populate_log_item(li), + without_duplicates, concurrency=10) + # this is backward compatible change, there might better way + # to "guess" child actions + childs = filter(lambda child: child.name not in resources_names, + resource.load_childs(list(resources_names))) + child_log_items = filter( + lambda li: li.diff or li.connections_diff, + utils.solar_map(create_run, [c.name for c in childs], concurrency=10)) + for log_item in child_log_items + without_duplicates: + log_item.save_lazy() + return without_duplicates + child_log_items + + +def send_to_orchestration(tags=None): dg = nx.MultiDiGraph() events = {} changed_nodes = [] - for logitem in data.SL(): + if tags: + staged_log = LogItem.log_items_by_tags(tags) + else: + staged_log = data.SL() + for logitem in staged_log: events[logitem.resource] = evapi.all_events(logitem.resource) changed_nodes.append(logitem.resource) @@ -155,20 +183,27 @@ def _get_args_to_update(args, connections): } +def is_create(logitem): + return all((item[0] == 'add' for item in logitem.diff)) + + +def is_update(logitem): + return any((item[0] == 'change' for item in logitem.diff)) + + def revert_uids(uids): """Reverts uids :param uids: iterable not generator """ - items = LogItem.multi_get(uids) + items = HistoryItem.multi_get(uids) for item in items: - - if item.action == CHANGES.update.name: + if is_update(item): _revert_update(item) elif item.action == CHANGES.remove.name: _revert_remove(item) - elif item.action == CHANGES.run.name: + elif is_create(item): _revert_run(item) else: log.debug('Action %s for resource %s is a side' @@ -219,8 +254,8 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections): # that some values can not be updated # even if connection was removed receiver_obj.db_obj.save() - - res_obj.update(args) + if args: + res_obj.update(args) def _revert_update(logitem): @@ -256,10 +291,9 @@ def _discard_update(item): old_connections = resource_obj.connections new_connections = dictdiffer.revert( item.connections_diff, sorted(old_connections)) - args = dictdiffer.revert(item.diff, resource_obj.args) - + inputs = dictdiffer.revert(item.diff, resource_obj.args) _update_inputs_connections( - resource_obj, _get_args_to_update(args, new_connections), + resource_obj, _get_args_to_update(inputs, old_connections), old_connections, new_connections) @@ -268,13 +302,13 @@ def _discard_run(item): def discard_uids(uids): - items = LogItem.multi_get(uids) + items = filter(bool, LogItem.multi_get(uids)) for item in items: - if item.action == CHANGES.update.name: + if is_update(item): _discard_update(item) elif item.action == CHANGES.remove.name: _discard_remove(item) - elif item.action == CHANGES.run.name: + elif is_create(item): _discard_run(item) else: log.debug('Action %s for resource %s is a side' @@ -288,7 +322,7 @@ def discard_uid(uid): def discard_all(): staged_log = data.SL() - return discard_uids([l.uid for l in staged_log]) + return discard_uids([l.key for l in staged_log]) def commit_all(): diff --git a/solar/system_log/data.py b/solar/system_log/data.py index db0cddcf..8e7aa67d 100644 --- a/solar/system_log/data.py +++ b/solar/system_log/data.py @@ -12,18 +12,22 @@ # License for the specific language governing permissions and limitations # under the License. - +from solar.dblayer.solar_models import HistoryItem from solar.dblayer.solar_models import LogItem def SL(): - rst = LogItem.composite.filter({'log': 'staged'}) - return LogItem.multi_get(rst) + rst = LogItem.bucket.get_index('$bucket', + startkey='_', + max_results=100000).results + return filter(bool, LogItem.multi_get(rst)) def CL(): - rst = LogItem.composite.filter({'log': 'history'}) - return LogItem.multi_get(rst) + rst = HistoryItem.bucket.get_index('$bucket', + startkey='_', + max_results=100000).results + return HistoryItem.multi_get(rst) def compact(logitem): diff --git a/solar/system_log/operations.py b/solar/system_log/operations.py index 759886c4..3890b6f0 100644 --- a/solar/system_log/operations.py +++ b/solar/system_log/operations.py @@ -28,34 +28,32 @@ def set_error(log_action, *args, **kwargs): resource_obj = resource.load(item.resource) resource_obj.set_error() item.state = 'error' - item.save() + item.delete() + + +def commit_log_item(item): + resource_obj = resource.load(item.resource) + commited = CommitedResource.get_or_create(item.resource) + if item.action == CHANGES.remove.name: + resource_obj.delete() + commited.state = resource.RESOURCE_STATE.removed.name + else: + resource_obj.set_operational() + commited.state = resource.RESOURCE_STATE.operational.name + commited.base_path = item.base_path + resource_obj.db_obj.save_lazy() + commited.inputs = patch(item.diff, commited.inputs) + # TODO fix TagsWrp to return list + # commited.tags = resource_obj.tags + sorted_connections = sorted(commited.connections) + commited.connections = patch(item.connections_diff, sorted_connections) + commited.save_lazy() + item.to_history().save_lazy() + item.delete() def move_to_commited(log_action, *args, **kwargs): sl = data.SL() item = next((i for i in sl if i.log_action == log_action), None) if item: - resource_obj = resource.load(item.resource) - commited = CommitedResource.get_or_create(item.resource) - updated = resource_obj.db_obj.updated - if item.action == CHANGES.remove.name: - - resource_obj.delete() - commited.state = resource.RESOURCE_STATE.removed.name - else: - resource_obj.set_operational() - commited.state = resource.RESOURCE_STATE.operational.name - commited.base_path = item.base_path - updated = resource_obj.db_obj.updated - # required to update `updated` field - resource_obj.db_obj.save() - commited.inputs = patch(item.diff, commited.inputs) - # TODO fix TagsWrp to return list - # commited.tags = resource_obj.tags - sorted_connections = sorted(commited.connections) - commited.connections = patch(item.connections_diff, sorted_connections) - commited.save() - item.log = 'history' - item.state = 'success' - item.updated = updated - item.save() + commit_log_item(item) diff --git a/solar/test/functional/test_complete_solar_workflow.py b/solar/test/functional/test_complete_solar_workflow.py index a39d4c5b..e514ee3f 100644 --- a/solar/test/functional/test_complete_solar_workflow.py +++ b/solar/test/functional/test_complete_solar_workflow.py @@ -20,6 +20,7 @@ import pytest from solar.config import C # NOQA from solar.core.resource import composer from solar.dblayer.model import clear_cache +from solar.dblayer.model import ModelMeta from solar.errors import ExecutionTimeout from solar import orchestration from solar.orchestration.graph import wait_finish @@ -57,7 +58,8 @@ def test_concurrent_sequences_with_no_handler(scale, clients): timeout = scale * 2 scheduler_client = clients['scheduler'] - assert len(change.stage_changes()) == total_resources + assert len(change.staged_log()) == total_resources + ModelMeta.session_end() plan = change.send_to_orchestration() scheduler_client.next({}, plan.graph['uid']) @@ -75,4 +77,4 @@ def test_concurrent_sequences_with_no_handler(scale, clients): assert res[states.SUCCESS.name] == total_resources assert len(data.CL()) == total_resources clear_cache() - assert len(change.stage_changes()) == 0 + assert len(change.staged_log()) == 0 diff --git a/solar/test/test_system_log_api.py b/solar/test/test_system_log_api.py index fdf928c0..bd966f74 100644 --- a/solar/test/test_system_log_api.py +++ b/solar/test/test_system_log_api.py @@ -13,78 +13,63 @@ # under the License. import mock -from pytest import mark from solar.core.resource import repository from solar.core.resource import resource -from solar.core.resource import RESOURCE_STATE -from solar.core import signals -from solar.dblayer.model import clear_cache +from solar.core.resource import stage_resources from solar.dblayer.model import ModelMeta from solar.dblayer.solar_models import CommitedResource from solar.dblayer.solar_models import Resource as DBResource from solar.system_log import change -from solar.system_log import data from solar.system_log import operations +def create_resource(name, tags=None): + resource = DBResource.from_dict( + name, + {'name': name, + 'base_path': 'x', + 'state': '', + 'tags': tags or [], + 'meta_inputs': {'a': {'value': None, + 'schema': 'str'}}}) + resource.save_lazy() + return resource + + def test_revert_update(): - commit = {'a': '10'} - previous = {'a': '9'} - res = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + prev = {'a': '9'} + new = {'a': '10'} + res = create_resource('test1') res.save() - action = 'update' - res.inputs['a'] = '9' + action = 'run' resource_obj = resource.load(res.name) - assert resource_obj.args == previous - - log = data.SL() - logitem = change.create_logitem(res.name, - action, - change.create_diff(commit, previous), - [], - base_path=res.base_path) - log.append(logitem) - resource_obj.update(commit) - operations.move_to_commited(logitem.log_action) + resource_obj.update(prev) + logitem = change.create_logitem(res.name, action) + operations.commit_log_item(logitem) + resource_obj.update(new) + logitem = change.create_logitem(res.name, action) + uid = logitem.uid assert logitem.diff == [['change', 'a', ['9', '10']]] - assert resource_obj.args == commit + operations.commit_log_item(logitem) + assert resource_obj.args == new - change.revert(logitem.uid) - assert resource_obj.args == previous + change.revert(uid) + assert resource_obj.args == {'a': '9'} def test_revert_update_connected(): - res1 = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res1 = create_resource('test1') res1.inputs['a'] = '9' res1.save_lazy() - res2 = DBResource.from_dict('test2', - {'name': 'test2', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res2 = create_resource('test2') res2.inputs['a'] = '' res2.save_lazy() - res3 = DBResource.from_dict('test3', - {'name': 'test3', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res3 = create_resource('test3') res3.inputs['a'] = '' res3.save_lazy() @@ -95,41 +80,36 @@ def test_revert_update_connected(): res2.connect(res3) ModelMeta.save_all_lazy() - staged_log = change.stage_changes() + staged_log = map(lambda res: change.create_run(res.name), + (res1, res2, res3)) assert len(staged_log) == 3 for item in staged_log: assert item.action == 'run' - operations.move_to_commited(item.log_action) - assert len(change.stage_changes()) == 0 + operations.commit_log_item(item) res1.disconnect(res2) - staged_log = change.stage_changes() - assert len(staged_log) == 2 + staged_log = map(lambda res: change.create_run(res.name), + (res2, res3)) to_revert = [] for item in staged_log: - assert item.action == 'update' - operations.move_to_commited(item.log_action) + assert item.action == 'run' to_revert.append(item.uid) + operations.commit_log_item(item) change.revert_uids(sorted(to_revert, reverse=True)) ModelMeta.save_all_lazy() - staged_log = change.stage_changes() + staged_log = map(lambda res: change.create_run(res.name), + (res2, res3)) - assert len(staged_log) == 2 for item in staged_log: assert item.diff == [['change', 'a', ['', '9']]] def test_revert_removal(): - res = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res = create_resource('test1') res.inputs['a'] = '9' res.save_lazy() @@ -141,14 +121,13 @@ def test_revert_removal(): resource_obj.remove() ModelMeta.save_all_lazy() - changes = change.stage_changes() - assert len(changes) == 1 - assert changes[0].diff == [['remove', '', [['a', '9']]]] - operations.move_to_commited(changes[0].log_action) + log_item = change.create_remove(resource_obj.name) + log_item.save() + uid = log_item.uid + assert log_item.diff == [['remove', '', [['a', '9']]]] + operations.commit_log_item(log_item) - clear_cache() - assert DBResource._c.obj_cache == {} - # assert DBResource.bucket.get('test1').siblings == [] + ModelMeta.save_all_lazy() with mock.patch.object(repository.Repository, 'read_meta') as mread: mread.return_value = { @@ -157,10 +136,9 @@ def test_revert_removal(): } with mock.patch.object(repository.Repository, 'get_path') as mpath: mpath.return_value = 'x' + change.revert(uid) - change.revert(changes[0].uid) ModelMeta.save_all_lazy() - # assert len(DBResource.bucket.get('test1').siblings) == 1 resource_obj = resource.load('test1') assert resource_obj.args == { @@ -170,177 +148,135 @@ def test_revert_removal(): } -@mark.xfail( - reason="""With current approach child will - notice changes after parent is removed""" -) -def test_revert_removed_child(): - res1 = orm.DBResource(id='test1', name='test1', base_path='x') # NOQA - res1.save() - res1.add_input('a', 'str', '9') - - res2 = orm.DBResource(id='test2', name='test2', base_path='x') # NOQA - res2.save() - res2.add_input('a', 'str', 0) - - res1 = resource.load('test1') - res2 = resource.load('test2') - signals.connect(res1, res2) - - staged_log = change.stage_changes() - assert len(staged_log) == 2 - for item in staged_log: - operations.move_to_commited(item.log_action) - res2.remove() - - staged_log = change.stage_changes() - assert len(staged_log) == 1 - logitem = next(staged_log.collection()) - operations.move_to_commited(logitem.log_action) - - with mock.patch.object(repository, 'read_meta') as mread: - mread.return_value = {'input': {'a': {'schema': 'str!'}}} - change.revert(logitem.uid) - - res2 = resource.load('test2') - assert res2.args == {'a': '9'} - - def test_revert_create(): - res = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res = create_resource('test1') res.inputs['a'] = '9' res.save_lazy() - ModelMeta.save_all_lazy() - staged_log = change.stage_changes() - assert len(staged_log) == 1 - logitem = staged_log[0] - operations.move_to_commited(logitem.log_action) + logitem = change.create_run(res.name) assert logitem.diff == [['add', '', [['a', '9']]]] + uid = logitem.uid + operations.commit_log_item(logitem) commited = CommitedResource.get('test1') assert commited.inputs == {'a': '9'} - change.revert(logitem.uid) - - staged_log = change.stage_changes() + change.revert(uid) + ModelMeta.save_all_lazy() + staged_log = change.staged_log() assert len(staged_log) == 1 for item in staged_log: - operations.move_to_commited(item.log_action) + operations.commit_log_item(item) assert resource.load_all() == [] def test_discard_all_pending_changes_resources_created(): - res1 = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res1 = create_resource('test1') res1.inputs['a'] = '9' res1.save_lazy() - res2 = DBResource.from_dict('test2', - {'name': 'test2', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res2 = create_resource('test2') res2.inputs['a'] = '0' res2.save_lazy() - ModelMeta.save_all_lazy() - - staged_log = change.stage_changes() - assert len(staged_log) == 2 + staged_log = map(change.create_run, (res1.name, res2.name)) change.discard_all() - staged_log = change.stage_changes() + staged_log = change.staged_log() assert len(staged_log) == 0 assert resource.load_all() == [] def test_discard_connection(): - res1 = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res1 = create_resource('test1') res1.inputs['a'] = '9' res1.save_lazy() - res2 = DBResource.from_dict('test2', - {'name': 'test2', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res2 = create_resource('test2') res2.inputs['a'] = '0' res2.save_lazy() - ModelMeta.save_all_lazy() - staged_log = change.stage_changes() + staged_log = map(change.create_run, (res1.name, res2.name)) for item in staged_log: - operations.move_to_commited(item.log_action) + operations.commit_log_item(item) res1 = resource.load('test1') res2 = resource.load('test2') res1.connect(res2, {'a': 'a'}) - staged_log = change.stage_changes() + ModelMeta.save_all_lazy() + staged_log = change.staged_log() assert len(staged_log) == 1 assert res2.args == {'a': '9'} change.discard_all() assert res2.args == {'a': '0'} - assert len(change.stage_changes()) == 0 + assert len(change.staged_log()) == 0 def test_discard_removed(): - res1 = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res1 = create_resource('test1') res1.inputs['a'] = '9' res1.save_lazy() - ModelMeta.save_all_lazy() - staged_log = change.stage_changes() - for item in staged_log: - operations.move_to_commited(item.log_action) + res1 = resource.load('test1') res1.remove() - assert len(change.stage_changes()) == 1 + ModelMeta.save_all_lazy() + assert len(change.staged_log()) == 1 assert res1.to_be_removed() change.discard_all() - assert len(change.stage_changes()) == 0 + assert len(change.staged_log()) == 0 assert not resource.load('test1').to_be_removed() def test_discard_update(): - res1 = DBResource.from_dict('test1', - {'name': 'test1', - 'base_path': 'x', - 'state': RESOURCE_STATE.created.name, - 'meta_inputs': {'a': {'value': None, - 'schema': 'str'}}}) + res1 = create_resource('test1') res1.inputs['a'] = '9' res1.save_lazy() - ModelMeta.save_all_lazy() - staged_log = change.stage_changes() - for item in staged_log: - operations.move_to_commited(item.log_action) + operations.commit_log_item(change.create_run(res1.name)) res1 = resource.load('test1') res1.update({'a': '11'}) - assert len(change.stage_changes()) == 1 + ModelMeta.save_all_lazy() + assert len(change.staged_log()) == 1 assert res1.args == {'a': '11'} change.discard_all() assert res1.args == {'a': '9'} + + +def test_stage_and_process_partially(): + a = ['a'] + b = ['b'] + both = a + b + range_a = range(1, 4) + range_b = range(4, 6) + with_tag_a = [create_resource(str(n), tags=a) for n in range_a] + with_tag_b = [create_resource(str(n), tags=b) for n in range_b] + ModelMeta.save_all_lazy() + created_log_items_with_a = stage_resources(a, 'restart') + assert len(created_log_items_with_a) == len(with_tag_a) + created_log_items_with_b = stage_resources(b, 'restart') + assert len(created_log_items_with_b) == len(with_tag_b) + + a_graph = change.send_to_orchestration(a) + a_expected = set(['%s.restart' % n for n in range_a]) + assert set(a_graph.nodes()) == a_expected + b_graph = change.send_to_orchestration(b) + b_expected = set(['%s.restart' % n for n in range_b]) + assert set(b_graph.nodes()) == b_expected + both_graph = change.send_to_orchestration(both) + assert set(both_graph.nodes()) == a_expected | b_expected + + +def test_childs_added_on_stage(): + res_0, res_1 = [create_resource(str(n)) for n in range(2)] + res_0.connect(res_1, {'a': 'a'}) + ModelMeta.save_all_lazy() + created_log_items = stage_resources(res_0.name, 'run') + assert len(created_log_items) == 1 + assert created_log_items[0].resource == res_0.name + staged_log = change.staged_log() + assert len(staged_log) == 2 + child_log_item = next(li for li in staged_log + if li.resource == res_1.name) + assert child_log_item.action == 'run'