Merge "Collector rpc datasource works at 200k entities."

This commit is contained in:
Zuul 2018-08-07 06:37:51 +00:00 committed by Gerrit Code Review
commit 4b7dac42a6
11 changed files with 115 additions and 28 deletions

View File

@ -8,6 +8,7 @@ General
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
high-scale
resource-state-config resource-state-config
alarm-severity-config alarm-severity-config
profiler-config profiler-config

View File

@ -0,0 +1,57 @@
================================
Configure Vitrage for high-scale
================================
In a production environment with > 50,000 entities, the following configuration changes are suggested:
Tune RPC
--------
Vitrage-graph uses RPC to request data from vitrage-collector. At high scale these requests take longer, so the timeout needs to be increased.
The following should be set in ``/etc/vitrage/vitrage.conf``, under ``[DEFAULT]`` section:
+----------------------+---------------------------------------------------------+-----------------+-----------------+
| Name | Description | Default Value | Suggested Value |
+======================+=========================================================+=================+=================+
| rpc_response_timeout | Seconds to wait for a response from a call | 60 | 300 |
+----------------------+---------------------------------------------------------+-----------------+-----------------+
To apply, restart the following services:
``sudo service vitrage-graph restart``
``sudo service vitrage-collector restart``
Restart the Vitrage API (either vitrage-api or apache).
Tune Memory
-----------
Most of the data is held in memory. To conserve memory, the number of evaluator workers should be decreased.
If many Vitrage templates are used, the number of evaluator workers can be increased, but it should be kept to the minimum needed.
The following should be set in ``/etc/vitrage/vitrage.conf``, under ``[evaluator]`` section:
+----------------------+---------------------------------------------------------+-----------------+-----------------+
| Name | Description | Default Value | Suggested Value |
+======================+=========================================================+=================+=================+
| workers | Number of workers for template evaluator | number of cores | 1 |
+----------------------+---------------------------------------------------------+-----------------+-----------------+
To apply, run ``sudo service vitrage-graph restart``
Tune MySQL
----------
Vitrage periodically persists the graph to MySQL as a blob. As the graph size increases, it is recommended to increase the MySQL ``max_allowed_packet`` setting.
The following should be set in ``/etc/mysql/my.cnf``, under ``[mysqld]`` section:
+----------------------+---------------------------------------------------------+-----------------+-----------------+
| Name | Description | Default Value | Suggested Value |
+======================+=========================================================+=================+=================+
| max_allowed_packet | The maximum size of one packet or any string | 4M-64M | 100M |
+----------------------+---------------------------------------------------------+-----------------+-----------------+
To apply, run ``sudo service mysql restart``

View File

@ -0,0 +1,3 @@
features:
- Support for graphs with more than 100,000 vertices has been added and
  tested. See the high-scale configuration document for details.

View File

@ -52,8 +52,8 @@ class DriverBase(object):
@classmethod @classmethod
def make_pickleable(cls, entities, entity_type, datasource_action, *args): def make_pickleable(cls, entities, entity_type, datasource_action, *args):
pickleable_entities = cls.prepare_entities(entities, entity_type, pickleable_entities = cls.make_pickleable_without_end_msg(
datasource_action, args) entities, entity_type, datasource_action, *args)
if datasource_action == DatasourceAction.INIT_SNAPSHOT: if datasource_action == DatasourceAction.INIT_SNAPSHOT:
pickleable_entities.append(cls._get_end_message(entity_type)) pickleable_entities.append(cls._get_end_message(entity_type))
@ -63,12 +63,6 @@ class DriverBase(object):
@classmethod @classmethod
def make_pickleable_without_end_msg(cls, entities, entity_type, def make_pickleable_without_end_msg(cls, entities, entity_type,
datasource_action, *args): datasource_action, *args):
pickleable_entities = cls.prepare_entities(entities, entity_type,
datasource_action, args)
return pickleable_entities
@classmethod
def prepare_entities(cls, entities, entity_type, datasource_action, args):
pickleable_entities = [] pickleable_entities = []
for entity in entities: for entity in entities:
for field in args: for field in args:
@ -80,6 +74,18 @@ class DriverBase(object):
pickleable_entities.append(entity) pickleable_entities.append(entity)
return pickleable_entities return pickleable_entities
@classmethod
def make_pickleable_iter(cls, entities, entity_type,
datasource_action, *args):
for entity in entities:
for field in args:
entity.pop(field, None)
cls._add_entity_type(entity, entity_type)
cls._add_datasource_action(entity, datasource_action)
cls._add_sampling_time(entity)
yield entity
@staticmethod @staticmethod
def _add_entity_type(entity, entity_type): def _add_entity_type(entity, entity_type):
if DSProps.ENTITY_TYPE not in entity: if DSProps.ENTITY_TYPE not in entity:

View File

@ -43,4 +43,5 @@ class NetworkDriver(NeutronBase):
return self.make_pickleable( return self.make_pickleable(
self.client.list_networks()['networks'], self.client.list_networks()['networks'],
NEUTRON_NETWORK_DATASOURCE, NEUTRON_NETWORK_DATASOURCE,
datasource_action) datasource_action,
*self.properties_to_filter_out())

View File

@ -44,4 +44,5 @@ class PortDriver(NeutronBase):
return self.make_pickleable( return self.make_pickleable(
ports, ports,
NEUTRON_PORT_DATASOURCE, NEUTRON_PORT_DATASOURCE,
datasource_action) datasource_action,
*self.properties_to_filter_out())

View File

@ -11,9 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import base64
from concurrent import futures from concurrent import futures
from six.moves import cPickle
import time import time
import zlib
from oslo_log import log from oslo_log import log
@ -44,6 +46,11 @@ class CollectorRpcHandlerService(object):
LOG.info("Collector Rpc Handler Service - Stopped!") LOG.info("Collector Rpc Handler Service - Stopped!")
def compress_events(events):
str_data = cPickle.dumps(events, cPickle.HIGHEST_PROTOCOL)
return base64.b64encode(zlib.compress(str_data))
class DriversEndpoint(object): class DriversEndpoint(object):
def __init__(self, conf): def __init__(self, conf):
@ -71,7 +78,8 @@ class DriversEndpoint(object):
time.sleep(fault_interval) time.sleep(fault_interval)
result.extend(list(self.pool.map(run_driver, failed_drivers))) result.extend(list(self.pool.map(run_driver, failed_drivers)))
events = [e for success, events in result if success for e in events] events = compress_events([e for success, events in result if success
for e in events])
LOG.debug("run drivers get_all done.") LOG.debug("run drivers get_all done.")
return events return events

View File

@ -11,7 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from base64 import standard_b64decode
from six.moves import cPickle
import time import time
import zlib
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging
@ -35,12 +38,15 @@ def get_all(rpc_client, events_coordination, driver_names, action,
t1 = time.time() t1 = time.time()
def _call(): def _call():
return rpc_client.call( result = rpc_client.call(
{}, {},
'driver_get_all', 'driver_get_all',
driver_names=driver_names, driver_names=driver_names,
action=action, action=action,
retry_on_fault=retry_on_fault) retry_on_fault=retry_on_fault)
events = cPickle.loads(zlib.decompress(standard_b64decode(result)))
for e in events:
yield e
try: try:
events = _call() events = _call()
@ -48,10 +54,10 @@ def get_all(rpc_client, events_coordination, driver_names, action,
LOG.exception('Got MessagingTimeout') LOG.exception('Got MessagingTimeout')
events = _call() if retry_on_fault else [] events = _call() if retry_on_fault else []
t2 = time.time() t2 = time.time()
events_coordination.handle_multiple_low_priority(events) count = events_coordination.handle_multiple_low_priority(events)
t3 = time.time() t3 = time.time()
LOG.info('get_all took %s, processing took %s for %s events', LOG.info('get_all took %s, processing took %s for %s events',
t2 - t1, t3 - t2, len(events)) t2 - t1, t3 - t2, count)
def get_changes(rpc_client, events_coordination, driver_name): def get_changes(rpc_client, events_coordination, driver_name):

View File

@ -173,8 +173,10 @@ class EventsCoordination(object):
self._lock.release() self._lock.release()
def handle_multiple_low_priority(self, events): def handle_multiple_low_priority(self, events):
for e in events: index = 0
for index, e in enumerate(events):
self._do_low_priority_work(e) self._do_low_priority_work(e)
return index
def _init_listener(self, topic, callback): def _init_listener(self, topic, callback):
if not topic: if not topic:

View File

@ -322,7 +322,10 @@ class NXGraph(Graph):
node_link_data['links'][i]['target'] = vitrage_id_to_index[ node_link_data['links'][i]['target'] = vitrage_id_to_index[
node_link_data['links'][i]['target']] node_link_data['links'][i]['target']]
return json.dumps(node_link_data) if kwargs.get('raw', False):
return node_link_data
else:
return json.dumps(node_link_data)
def write_gpickle(self): def write_gpickle(self):
return cPickle.dumps(self._g, cPickle.HIGHEST_PROTOCOL) return cPickle.dumps(self._g, cPickle.HIGHEST_PROTOCOL)

View File

@ -11,8 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import json
from oslo_log import log from oslo_log import log
@ -30,7 +28,7 @@ class MockDriver(StaticDriver):
def __init__(self, conf): def __init__(self, conf):
super(StaticDriver, self).__init__() super(StaticDriver, self).__init__()
mock_cfg = conf.mock_graph_datasource mock_cfg = conf.mock_graph_datasource
self.e_graph = GraphGenerator( e_graph = GraphGenerator(
networks=mock_cfg.networks, networks=mock_cfg.networks,
zones_per_cluster=mock_cfg.zones_per_cluster, zones_per_cluster=mock_cfg.zones_per_cluster,
hosts_per_zone=mock_cfg.hosts_per_zone, hosts_per_zone=mock_cfg.hosts_per_zone,
@ -42,19 +40,20 @@ class MockDriver(StaticDriver):
tripleo_controllers=mock_cfg.tripleo_controllers, tripleo_controllers=mock_cfg.tripleo_controllers,
zabbix_alarms_per_controller=mock_cfg.zabbix_alarms_per_controller zabbix_alarms_per_controller=mock_cfg.zabbix_alarms_per_controller
).create_graph() ).create_graph()
definitions = e_graph.json_output_graph(raw=True)
self.mock_entities = self._get_mock_entities(definitions)
def get_all(self, datasource_action): def get_all(self, datasource_action):
return self.make_pickleable(self._get_mock_entities(), return self.make_pickleable_iter(self.mock_entities,
MOCK_DATASOURCE, MOCK_DATASOURCE,
datasource_action) datasource_action)
def get_changes(self, datasource_action): def get_changes(self, datasource_action):
return self.make_pickleable([], return self.make_pickleable_iter([],
MOCK_DATASOURCE, MOCK_DATASOURCE,
datasource_action) datasource_action)
def _get_mock_entities(self): def _get_mock_entities(self, definitions):
definitions = json.loads(self.e_graph.json_output_graph())
for node in definitions['nodes']: for node in definitions['nodes']:
node[StaticFields.STATIC_ID] = str(node[VProps.GRAPH_INDEX]) node[StaticFields.STATIC_ID] = str(node[VProps.GRAPH_INDEX])
node[StaticFields.TYPE] = node[VProps.VITRAGE_TYPE] node[StaticFields.TYPE] = node[VProps.VITRAGE_TYPE]