Support to get workers by admin user

Admin user can get current workers related to a function.

Change-Id: Ia17e52ad1f7541e63b9e815b47751ad94d33e743
This commit is contained in:
Lingxian Kong 2017-12-08 13:51:40 +13:00
parent ddaa5fc471
commit 939ba95cc4
10 changed files with 151 additions and 7 deletions

View File

@ -9,4 +9,6 @@
"runtime:delete": "rule:context_is_admin",
"function:get_all:all_projects": "rule:context_is_admin",
"function_worker:get_all": "rule:context_is_admin",
}

View File

@ -45,7 +45,24 @@ CODE_SOURCE = set(['package', 'swift', 'image'])
UPDATE_ALLOWED = set(['name', 'description', 'code', 'package', 'entry'])
class FunctionWorkerController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.FunctionWorkers, types.uuid)
def get_all(self, function_id):
acl.enforce('function_worker:get_all', context.get_ctx())
LOG.info("Get workers for function %s.", function_id)
db_workers = db_api.get_function_workers(function_id)
workers = [resources.FunctionWorker.from_dict(db_model.to_dict())
for db_model in db_workers]
return resources.FunctionWorkers(workers=workers)
class FunctionsController(rest.RestController):
workers = FunctionWorkerController()
_custom_actions = {
'scale_up': ['POST'],
'scale_down': ['POST'],

View File

@ -213,6 +213,20 @@ class Functions(ResourceList):
return sample
class FunctionWorker(Resource):
id = wtypes.text
function_id = wsme.wsattr(types.uuid, readonly=True)
worker_name = wtypes.text
class FunctionWorkers(ResourceList):
workers = [FunctionWorker]
def __init__(self, **kwargs):
self._type = 'workers'
super(FunctionWorkers, self).__init__(**kwargs)
class Runtime(Resource):
id = wtypes.text
name = wtypes.text

View File

@ -418,15 +418,17 @@ def delete_function_service_mapping(id, session=None):
@db_base.session_aware()
def create_function_worker(values, session=None):
mapping = models.FunctionWorkers()
mapping.update(values.copy())
worker = models.FunctionWorkers()
worker.update(values.copy())
# Ignore duplicate error for FunctionWorkers
try:
mapping.save(session=session)
worker.save(session=session)
except oslo_db_exc.DBDuplicateEntry:
session.close()
return worker
@db_base.session_aware()
def get_function_workers(function_id, session=None):

View File

@ -230,6 +230,9 @@ class KubernetesManager(base.OrchestratorBase):
label_selector='function_id=%s' % function_id
)
if len(ret.items) > 0:
LOG.debug(
"Function %s already associates to a pod.", function_id
)
return ret.items[:count]
ret = self.v1.list_namespaced_pod(

View File

@ -0,0 +1,47 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from qinling.db import api as db_api
from qinling.tests.unit.api import base
TEST_CASE_NAME = 'TestFunctionWorkerController'
class TestFunctionWorkerController(base.APITest):
def setUp(self):
super(TestFunctionWorkerController, self).setUp()
db_func = self.create_function(prefix=TEST_CASE_NAME)
self.function_id = db_func.id
def test_get_all_workers(self):
db_worker = db_api.create_function_worker(
{
'function_id': self.function_id,
'worker_name': 'worker_1',
}
)
expected = {
"id": db_worker.id,
"function_id": self.function_id,
"worker_name": "worker_1",
}
resp = self.app.get('/v1/functions/%s/workers' % self.function_id)
self.assertEqual(200, resp.status_int)
actual = self._assert_single_item(
resp.json['workers'], id=db_worker.id
)
self._assertDictContainsSubset(actual, expected)

View File

@ -0,0 +1,18 @@
# Copyright 2017 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
def main(seconds=5, **kwargs):
time.sleep(seconds)

View File

@ -94,3 +94,6 @@ class QinlingClient(client_base.QinlingClientBase):
def get_execution_log(self, execution_id):
return self.get('/v1/executions/%s/log' % execution_id,
headers={'Accept': 'text/plain'})
def get_function_workers(self, function_id):
return self.get_resources('functions/%s/workers' % function_id)

View File

@ -16,6 +16,7 @@ import pkg_resources
import tempfile
import zipfile
import futurist
from oslo_serialization import jsonutils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
@ -100,12 +101,13 @@ class ExecutionsTest(base.BaseQinlingTest):
input={'name': 'Qinling'})
self.assertEqual(201, resp.status)
self.assertEqual('success', body['status'])
execution_id = body['id']
self.addCleanup(self.client.delete_resource, 'executions',
execution_id, ignore_notfound=True)
self.assertEqual('success', body['status'])
# Get executions
resp, body = self.client.get_resources('executions')
@ -128,12 +130,13 @@ class ExecutionsTest(base.BaseQinlingTest):
resp, body = self.client.create_execution(self.function_id, sync=False)
self.assertEqual(201, resp.status)
self.assertEqual('running', body['status'])
execution_id = body['id']
self.addCleanup(self.client.delete_resource, 'executions',
execution_id, ignore_notfound=True)
self.assertEqual('running', body['status'])
self.await_execution_success(execution_id)
@decorators.idempotent_id('6cb47b1d-a8c6-48f2-a92f-c4f613c33d1c')
@ -145,11 +148,11 @@ class ExecutionsTest(base.BaseQinlingTest):
input={'name': 'OpenStack'})
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('success', body['status'])
execution_id = body['id']
self.addCleanup(self.client.delete_resource, 'executions',
execution_id, ignore_notfound=True)
# Get execution log
resp, body = self.client.get_execution_log(execution_id)
@ -157,6 +160,35 @@ class ExecutionsTest(base.BaseQinlingTest):
self.assertEqual(200, resp.status)
self.assertIn('Hello, OpenStack', body)
@decorators.idempotent_id('f22097dc-37db-484d-83d3-3a97e72ec576')
def test_execution_concurrency(self):
self.await_runtime_available(self.runtime_id)
self._create_function(name='test_python_sleep.py')
def _create_execution():
resp, body = self.client.create_execution(self.function_id)
return resp, body
futs = []
with futurist.GreenThreadPoolExecutor(max_workers=4) as executor:
for _ in range(3):
fut = executor.submit(_create_execution)
futs.append(fut)
for f in futs:
# Wait until we get the response
resp, body = f.result()
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('success', body['status'])
resp, body = self.admin_client.get_function_workers(self.function_id)
self.assertEqual(200, resp.status)
self.assertEqual(1, len(body['workers']))
@decorators.idempotent_id('a948382a-84af-4f0e-ad08-4297345e302c')
def test_python_execution_file_limit(self):
self.await_runtime_available(self.runtime_id)
self._create_function(name='test_python_file_limit.py')
@ -164,6 +196,8 @@ class ExecutionsTest(base.BaseQinlingTest):
resp, body = self.client.create_execution(self.function_id)
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('failed', body['status'])
output = jsonutils.loads(body['output'])
@ -171,6 +205,7 @@ class ExecutionsTest(base.BaseQinlingTest):
'Too many open files', output['output']
)
@decorators.idempotent_id('bf6f8f35-fa88-469b-8878-7aa85a8ce5ab')
def test_python_execution_process_number(self):
self.await_runtime_available(self.runtime_id)
self._create_function(name='test_python_process_limit.py')
@ -178,6 +213,8 @@ class ExecutionsTest(base.BaseQinlingTest):
resp, body = self.client.create_execution(self.function_id)
self.assertEqual(201, resp.status)
self.addCleanup(self.client.delete_resource, 'executions',
body['id'], ignore_notfound=True)
self.assertEqual('failed', body['status'])
output = jsonutils.loads(body['output'])

View File

@ -10,6 +10,7 @@ testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=1.4.0 # MIT
tempest>=16.1.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0
openstackdocstheme>=1.16.0 # Apache-2.0