Merge "Results and Requests"

This commit is contained in:
Zuul 2019-05-03 15:31:23 +00:00 committed by Gerrit Code Review
commit f7f86f52ab
11 changed files with 703 additions and 16 deletions

View File

@ -41,6 +41,27 @@ serializer = objects_base.IotronicObjectSerializer()
Port = list()
# Method to compare two versions.
# Return 1 if v2 is smaller,
# -1 if v1 is smaller,
# 0 if equal
def versionCompare(v1, v2):
v1_list = v1.split(".")[:3]
v2_list = v2.split(".")[:3]
i = 0
while (i < len(v1_list)):
# v2 > v1
if int(v2_list[i]) > int(v1_list[i]):
return -1
# v1 > v1
if int(v1_list[i]) > int(v2_list[i]):
return 1
i += 1
# v2 == v1
return 0
def get_best_agent(ctx):
agents = objects.WampAgent.list(ctx, filters={'online': True})
LOG.debug('found %d Agent(s).', len(agents))
@ -233,13 +254,50 @@ class ConductorEndpoint(object):
if not board.is_online():
raise exception.BoardNotConnected(board=board.uuid)
cctx = self.wamp_agent_client.prepare(server=board.agent)
res = cctx.call(ctx, 's4t_invoke_wamp',
wamp_rpc_call=full_wamp_call,
data=wamp_rpc_args)
res = wm.deserialize(res)
req_data = {
'destination_uuid': board_uuid,
'type': objects.request.BOARD,
'status': objects.request.PENDING,
'action': wamp_rpc_call
}
req = objects.Request(ctx, **req_data)
req.create()
return res
res_data = {
'board_uuid': board_uuid,
'request_uuid': req.uuid,
'result': objects.result.RUNNING,
'message': ""
}
res = objects.Result(ctx, **res_data)
res.create()
cctx = self.wamp_agent_client.prepare(server=board.agent)
# for previous LR version (to be removed asap)
if (versionCompare(board.lr_version, "0.4.9") == -1):
response = cctx.call(ctx, 's4t_invoke_wamp',
wamp_rpc_call=full_wamp_call,
data=wamp_rpc_args)
else:
response = cctx.call(ctx, 's4t_invoke_wamp',
wamp_rpc_call=full_wamp_call,
req_uuid=req.uuid,
data=wamp_rpc_args)
response = wm.deserialize(response)
if (response.result != wm.RUNNING):
res.result = response.result
res.message = response.message
res.save()
req.status = objects.request.COMPLETED
req.save()
return response
def action_board(self, ctx, board_uuid, action, params):

View File

@ -709,3 +709,56 @@ class Connection(object):
:param enabled_webservice_id: The id or uuid of a enabled_webservice.
"""
@abc.abstractmethod
def create_request(self, values):
"""Create a new webservice.
:param values: A dict containing several items used to identify
and track the request
:returns: A request.
"""
@abc.abstractmethod
def create_result(self, values):
"""Create a new webservice.
:param values: A dict containing several items used to identify
and track the result
:returns: A result.
"""
@abc.abstractmethod
def get_result(self, board_uuid, request_uuid):
"""get a result.
:param board_uuid: the board uuid result.
:param request_uuid: the request_uuid.
:returns: A result.
"""
@abc.abstractmethod
def update_result(self, result_id, values):
"""Update properties of a result.
:param result_id: The id or uuid of a fleet.
:param values: Dict of values to update.
:returns: A result.
"""
@abc.abstractmethod
def update_request(self, request_id, values):
"""Update properties of a result.
:param request_id: The id or uuid of a fleet.
:param values: Dict of values to update.
:returns: A request.
"""
@abc.abstractmethod
def get_results(self, request_uuid):
"""get results of a request.
:param request_uuid: the request_uuid.
:returns: a list of results
"""

View File

@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# revision identifiers, used by Alembic.
revision = '76c628d60004'
down_revision = 'b98819997377'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('requests',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('destination_uuid', sa.String(length=36),
nullable=False),
sa.Column('status', sa.String(length=10), nullable=False),
sa.Column('type', sa.Integer(), nullable=False),
sa.Column('action', sa.String(length=15), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_requests0uuid')
)
op.create_table('results',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('result', sa.String(length=10), nullable=False),
sa.Column('message', sa.TEXT(), nullable=True),
sa.Column('board_uuid', sa.String(length=36),
nullable=False),
sa.Column('request_uuid', sa.String(length=36),
nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('board_uuid', 'request_uuid',
name='uniq_request_on_board'),
sa.ForeignKeyConstraint(['board_uuid'],
['boards.uuid'], ),
sa.ForeignKeyConstraint(['request_uuid'],
['requests.uuid'], )
)

View File

@ -226,6 +226,15 @@ class Connection(api.Connection):
query = query. \
filter(models.Port.board_uuid == filters['board_uuid'])
def _add_result_filters(self, query, filters):
if filters is None:
filters = []
if 'result' in filters:
query = query.filter(models.Result.result == filters['result'])
return query
def _do_update_board(self, board_id, values):
session = get_session()
with session.begin():
@ -1196,3 +1205,87 @@ class Connection(api.Connection):
query = self._add_enabled_webservices_filters(query, filters)
return _paginate_query(models.EnabledWebservice, limit, marker,
sort_key, sort_dir, query)
# REQUEST
def get_request_by_id(self, request_id):
query = model_query(models.Request).filter_by(id=request_id)
try:
return query.one()
except NoResultFound:
raise exception.RequestNotFound(request=request_id)
def get_request_by_uuid(self, request_uuid):
query = model_query(models.Request).filter_by(uuid=request_uuid)
try:
return query.one()
except NoResultFound:
raise exception.RequestNotFound(request=request_uuid)
def _do_update_request(self, update_id, values):
session = get_session()
with session.begin():
query = model_query(models.Request, session=session)
query = add_identity_filter(query, update_id)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.RequestNotFound(result=update_id)
ref.update(values)
return ref
def create_request(self, values):
# ensure defaults are present for new requests
if 'uuid' not in values:
values['uuid'] = uuidutils.generate_uuid()
request = models.Request()
request.update(values)
request.save()
return request
def update_request(self, request_id, values):
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing Request.")
raise exception.InvalidParameterValue(err=msg)
return self._do_update_request(request_id, values)
# RESULT
def _do_update_result(self, update_id, values):
session = get_session()
with session.begin():
query = model_query(models.Result, session=session)
query = add_identity_filter(query, update_id)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.ResultNotFound(result=update_id)
ref.update(values)
return ref
def create_result(self, values):
# ensure defaults are present for new results
result = models.Result()
result.update(values)
result.save()
return result
def get_result(self, board_uuid, request_uuid):
query = model_query(models.Result).filter_by(
board_uuid=board_uuid).filter_by(request_uuid=request_uuid)
try:
return query.one()
except NoResultFound:
raise exception.ResultNotFound()
def update_result(self, result_id, values):
return self._do_update_result(result_id, values)
def get_results(self, request_uuid, filters=None):
query = model_query(models.Result).filter_by(
request_uuid=request_uuid)
query = self._add_result_filters(query, filters)
try:
return query.all()
except NoResultFound:
raise exception.ResultNotFound()

View File

@ -314,3 +314,35 @@ class EnabledWebservice(Base):
dns = Column(String(100))
zone = Column(String(100))
extra = Column(JSONEncodedDict)
class Request(Base):
"""Represents a request."""
__tablename__ = 'requests'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_requests0uuid'),
table_args())
id = Column(Integer, primary_key=True)
uuid = Column(String(36))
destination_uuid = Column(String(36))
status = Column(String(10))
type = Column(Integer)
action = Column(String(15))
class Result(Base):
"""Represents a result."""
__tablename__ = 'results'
__table_args__ = (
schema.UniqueConstraint('board_uuid', 'request_uuid',
name='uniq_request_on_board'),
table_args())
id = Column(Integer, primary_key=True)
board_uuid = Column(String(36))
request_uuid = Column(String(36))
result = Column(String(10))
message = Column(TEXT)

View File

@ -21,6 +21,8 @@ from iotronic.objects import injectionplugin
from iotronic.objects import location
from iotronic.objects import plugin
from iotronic.objects import port
from iotronic.objects import request
from iotronic.objects import result
from iotronic.objects import service
from iotronic.objects import sessionwp
from iotronic.objects import wampagent
@ -36,6 +38,8 @@ SessionWP = sessionwp.SessionWP
WampAgent = wampagent.WampAgent
Service = service.Service
Webservice = webservice.Webservice
Request = request.Request
Result = result.Result
Port = port.Port
Fleet = fleet.Fleet
EnabledWebservice = enabledwebservice.EnabledWebservice
@ -53,5 +57,7 @@ __all__ = (
Port,
Fleet,
Webservice,
EnabledWebservice
EnabledWebservice,
Request,
Result,
)

195
iotronic/objects/request.py Normal file
View File

@ -0,0 +1,195 @@
# coding=utf-8
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import strutils
from oslo_utils import uuidutils
from iotronic.common import exception
from iotronic.db import api as db_api
from iotronic.objects import base
from iotronic.objects.result import Result
from iotronic.objects import utils as obj_utils
BOARD = 0
FLOAT = 1
COMPLETED = "COMPLETED"
PENDING = "PENDING"
class Request(base.IotronicObject):
# Version 1.0: Initial version
VERSION = '1.0'
dbapi = db_api.get_instance()
fields = {
'id': int,
'uuid': obj_utils.str_or_none,
'destination_uuid': obj_utils.str_or_none,
'status': obj_utils.str_or_none,
'type': int,
'action': obj_utils.str_or_none,
}
@staticmethod
def _from_db_object(request, db_request):
"""Converts a database entity to a formal object."""
for field in request.fields:
request[field] = db_request[field]
request.obj_reset_changes()
return request
@base.remotable_classmethod
def get(cls, context, request_id):
"""Find a request based on its id or uuid and return a Board object.
:param request_id: the id *or* uuid of a request.
:returns: a :class:`Board` object.
"""
if strutils.is_int_like(request_id):
return cls.get_by_id(context, request_id)
elif uuidutils.is_uuid_like(request_id):
return cls.get_by_uuid(context, request_id)
else:
raise exception.InvalidIdentity(identity=request_id)
@base.remotable_classmethod
def get_by_id(cls, context, request_id):
"""Find a request based on its integer id and return a Board object.
:param request_id: the id of a request.
:returns: a :class:`Board` object.
"""
db_request = cls.dbapi.get_request_by_id(request_id)
request = Request._from_db_object(cls(context), db_request)
return request
@base.remotable_classmethod
def get_by_uuid(cls, context, uuid):
"""Find a request based on uuid and return a Board object.
:param uuid: the uuid of a request.
:returns: a :class:`Board` object.
"""
db_request = cls.dbapi.get_request_by_uuid(uuid)
request = Request._from_db_object(cls(context), db_request)
return request
@base.remotable_classmethod
def get_results(cls, context, request_uuid, filters=None):
"""Find a request based on uuid and return a Board object.
:param uuid: the uuid of a request.
:returns: a :class:`Board` object.
"""
return Result.get_results_list(context,
request_uuid, filters)
# @base.remotable_classmethod
# def get_results_request(cls,context,request_uuid):
# db_requests = cls.dbapi.get_results(request_uuid)
# return [Result._from_db_object(cls(context), obj)
# for obj in db_requests]
# @base.remotable_classmethod
# def get_by_name(cls, context, name):
# """Find a request based on name and return a Board object.
#
# :param name: the logical name of a request.
# :returns: a :class:`Board` object.
# """
# db_request = cls.dbapi.get_request_by_name(name)
# request = Request._from_db_object(cls(context), db_request)
# return request
# @base.remotable_classmethod
# def list(cls, context, limit=None, marker=None, sort_key=None,
# sort_dir=None, filters=None):
# """Return a list of Request objects.
#
# :param context: Security context.
# :param limit: maximum number of resources to return in a
# single result.
# :param marker: pagination marker for large data sets.
# :param sort_key: column to sort results by.
# :param sort_dir: direction to sort. "asc" or "desc".
# :param filters: Filters to apply.
# :returns: a list of :class:`Request` object.
#
# """
# db_requests = cls.dbapi.get_request_list(filters=filters,
# limit=limit,
# marker=marker,
# sort_key=sort_key,
# sort_dir=sort_dir)
# return [Request._from_db_object(cls(context), obj)
# for obj in db_requests]
@base.remotable
def create(self, context=None):
"""Create a Request record in the DB.
Column-wise updates will be made based on the result of
self.what_changed(). If target_power_state is provided,
it will be checked against the in-database copy of the
request before updates are made.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Request(context)
"""
values = self.obj_get_changes()
db_request = self.dbapi.create_request(values)
self._from_db_object(self, db_request)
# @base.remotable
# def destroy(self, context=None):
# """Delete the Request from the DB.
#
# :param context: Security context. NOTE: This should only
# be used internally by the indirection_api.
# Unfortunately, RPC requires context as the first
# argument, even though we don't use it.
# A context should be set when instantiating the
# object, e.g.: Request(context)
# """
# self.dbapi.destroy_request(self.uuid)
# self.obj_reset_changes()
@base.remotable
def save(self, context=None):
"""Save updates to this Request.
Column-wise updates will be made based on the result of
self.what_changed(). If target_power_state is provided,
it will be checked against the in-database copy of the
request before updates are made.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Request(context)
"""
updates = self.obj_get_changes()
self.dbapi.update_request(self.uuid, updates)
self.obj_reset_changes()

154
iotronic/objects/result.py Normal file
View File

@ -0,0 +1,154 @@
# coding=utf-8
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# from oslo_utils import strutils
# from oslo_utils import uuidutils
# from iotronic.common import exception
from iotronic.db import api as db_api
from iotronic.objects import base
from iotronic.objects import utils as obj_utils
SUCCESS = "SUCCESS"
ERROR = "ERROR"
WARNING = "WARNING"
RUNNING = "RUNNING"
class Result(base.IotronicObject):
# Version 1.0: Initial version
VERSION = '1.0'
dbapi = db_api.get_instance()
fields = {
'id': int,
'board_uuid': obj_utils.str_or_none,
'request_uuid': obj_utils.str_or_none,
'result': obj_utils.str_or_none,
'message': obj_utils.str_or_none,
}
@staticmethod
def _from_db_object(result, db_result):
"""Converts a database entity to a formal object."""
for field in result.fields:
result[field] = db_result[field]
result.obj_reset_changes()
return result
@base.remotable_classmethod
def get(cls, context, board_uuid, request_uuid):
"""Find a result based on name and return a Board object.
:param board_uuid: the board uuid result.
:param request_uuid: the request_uuid.
:returns: a :class:`result` object.
"""
db_result = cls.dbapi.get_result(board_uuid, request_uuid)
result = Result._from_db_object(cls(context), db_result)
return result
@base.remotable_classmethod
def get_results_list(cls, context, request_uuid, filters=None):
"""Find a result based on name and return a Board object.
:param board_uuid: the board uuid result.
:param request_uuid: the request_uuid.
:returns: a :class:`result` object.
"""
db_requests = cls.dbapi.get_results(request_uuid,
filters=filters)
return [Result._from_db_object(cls(context), obj)
for obj in db_requests]
# @base.remotable_classmethod
# def list(cls, context, limit=None, marker=None, sort_key=None,
# sort_dir=None, filters=None):
# """Return a list of Result objects.
#
# :param context: Security context.
# :param limit: maximum number of resources to return in a
# single result.
# :param marker: pagination marker for large data sets.
# :param sort_key: column to sort results by.
# :param sort_dir: direction to sort. "asc" or "desc".
# :param filters: Filters to apply.
# :returns: a list of :class:`Result` object.
#
# """
# db_results = cls.dbapi.get_result_list(filters=filters,
# limit=limit,
# marker=marker,
# sort_key=sort_key,
# sort_dir=sort_dir)
# return [Result._from_db_object(cls(context), obj)
# for obj in db_results]
@base.remotable
def create(self, context=None):
"""Create a Result record in the DB.
Column-wise updates will be made based on the result of
self.what_changed(). If target_power_state is provided,
it will be checked against the in-database copy of the
result before updates are made.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Result(context)
"""
values = self.obj_get_changes()
db_result = self.dbapi.create_result(values)
self._from_db_object(self, db_result)
# @base.remotable
# def destroy(self, context=None):
# """Delete the Result from the DB.
#
# :param context: Security context. NOTE: This should only
# be used internally by the indirection_api.
# Unfortunately, RPC requires context as the first
# argument, even though we don't use it.
# A context should be set when instantiating the
# object, e.g.: Result(context)
# """
# self.dbapi.destroy_result(self.uuid)
# self.obj_reset_changes()
@base.remotable
def save(self, context=None):
"""Save updates to this Result.
Column-wise updates will be made based on the result of
self.what_changed(). If target_power_state is provided,
it will be checked against the in-database copy of the
result before updates are made.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Result(context)
"""
updates = self.obj_get_changes()
self.dbapi.update_result(self.id, updates)
self.obj_reset_changes()

View File

@ -95,8 +95,19 @@ connected = False
async def wamp_request(kwarg):
LOG.debug("calling: " + kwarg['wamp_rpc_call'])
d = await wamp_session_caller.call(kwarg['wamp_rpc_call'], *kwarg['data'])
# for previous LR version (to be removed asap)
if 'req_uuid' in kwarg:
LOG.debug("calling: " + kwarg['wamp_rpc_call'] +
" with request id: " + kwarg['req_uuid'])
d = await wamp_session_caller.call(kwarg['wamp_rpc_call'],
kwarg['req_uuid'],
*kwarg['data'])
else:
LOG.debug("calling: " + kwarg['wamp_rpc_call'])
d = await wamp_session_caller.call(kwarg['wamp_rpc_call'],
*kwarg['data'])
return d
@ -234,6 +245,8 @@ class WampManager(object):
AGENT_HOST + u'.stack4things.alive')
session.register(fun.wamp_alive,
AGENT_HOST + u'.stack4things.wamp_alive')
session.register(fun.notify_result,
AGENT_HOST + u'.stack4things.notify_result')
LOG.debug("procedure registered")
except Exception as e:

View File

@ -175,3 +175,26 @@ def registration(code, session):
def board_on_join(session_id):
LOG.debug('A board with %s joined', session_id['session'])
def notify_result(board_uuid, wampmessage):
wmsg = wm.deserialize(wampmessage)
LOG.info('Board %s completed the its request %s with result: %s',
board_uuid, wmsg.req_id, wmsg.result)
res = objects.Result.get(ctxt, board_uuid, wmsg.req_id)
res.result = wmsg.result
res.message = wmsg.message
res.save()
filter = {"result": objects.result.RUNNING}
list_result = objects.Request.get_results(ctxt,
wmsg.req_id,
filter)
if len(list_result) == 0:
req = objects.Request.get_by_uuid(ctxt, wmsg.req_id)
req.status = objects.request.COMPLETED
req.save()
return wm.WampSuccess('notification_received').serialize()

View File

@ -19,6 +19,7 @@ import json
SUCCESS = 'SUCCESS'
ERROR = 'ERROR'
WARNING = 'WARNING'
RUNNING = 'RUNNING'
def deserialize(received):
@ -27,24 +28,30 @@ def deserialize(received):
class WampMessage(object):
def __init__(self, message=None, result=None):
def __init__(self, message, result, req_id):
self.message = message
self.result = result
self.req_id = req_id
def serialize(self):
return json.dumps(self, default=lambda o: o.__dict__)
class WampSuccess(WampMessage):
def __init__(self, msg=None):
super(WampSuccess, self).__init__(msg, SUCCESS)
def __init__(self, msg=None, req_id=None):
super(WampSuccess, self).__init__(msg, SUCCESS, req_id)
class WampError(WampMessage):
def __init__(self, msg=None):
super(WampError, self).__init__(msg, ERROR)
def __init__(self, msg=None, req_id=None):
super(WampError, self).__init__(msg, ERROR, req_id)
class WampWarning(WampMessage):
def __init__(self, msg=None):
super(WampWarning, self).__init__(msg, WARNING)
def __init__(self, msg=None, req_id=None):
super(WampWarning, self).__init__(msg, WARNING, req_id)
class WampRunning(WampMessage):
def __init__(self, msg=None, req_id=None):
super(WampRunning, self).__init__(msg, RUNNING, req_id)