Merge "Revival of Cassandra repository"

This commit is contained in:
Jenkins 2016-08-04 13:17:44 +00:00 committed by Gerrit Code Review
commit a3acd792da
7 changed files with 255 additions and 1 deletions

View File

@ -10,9 +10,11 @@ verbose = true
[repositories] [repositories]
# The driver to use for the metrics repository # The driver to use for the metrics repository
metrics_driver = monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository metrics_driver = monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository
#metrics_driver = monasca_persister.repositories.cassandra.metrics_repository:MetricCassandraRepository
# The driver to use for the alarm state history repository # The driver to use for the alarm state history repository
alarm_state_history_driver = monasca_persister.repositories.influxdb.alarm_state_history_repository:AlarmStateHistInfluxdbRepository alarm_state_history_driver = monasca_persister.repositories.influxdb.alarm_state_history_repository:AlarmStateHistInfluxdbRepository
#alarm_state_history_driver = monasca_persister.repositories.cassandra.alarm_state_history_repository:AlarmStateHistCassandraRepository
[zookeeper] [zookeeper]
# Comma separated list of host:port # Comma separated list of host:port
@ -61,3 +63,9 @@ ip_address = 192.168.10.4
port = 8086 port = 8086
user = mon_persister user = mon_persister
password = password password = password
# Uncomment, set cluster_ip_addresses, and change the repositories to point to the cassandra classes
#[cassandra]
# Comma separated list of Cassandra node IP addresses. No spaces.
#cluster_ip_addresses: 192.168.10.6
#keyspace: monasca

View File

@ -16,7 +16,7 @@
"""Persister Module """Persister Module
The Persister reads metrics and alarms from Kafka and then stores them The Persister reads metrics and alarms from Kafka and then stores them
in into Influxdb in into either Influxdb or Cassandra
Start the perister as stand-alone process by running 'persister.py Start the perister as stand-alone process by running 'persister.py
--config-file <config file>' --config-file <config file>'

View File

@ -0,0 +1,22 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'),
cfg.StrOpt('keyspace')]
cassandra_group = cfg.OptGroup(name='cassandra')
cfg.CONF.register_group(cassandra_group)
cfg.CONF.register_opts(cassandra_opts, cassandra_group)

View File

@ -0,0 +1,37 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from oslo_config import cfg
import six
from repositories.abstract_repository import AbstractRepository
@six.add_metaclass(abc.ABCMeta)
class AbstractCassandraRepository(AbstractRepository):
def __init__(self):
super(AbstractCassandraRepository, self).__init__()
self.conf = cfg.CONF
self._cassandra_cluster = Cluster(
self.conf.cassandra.cluster_ip_addresses.split(','))
self.cassandra_session = self._cassandra_cluster.connect(
self.conf.cassandra.keyspace)
self._batch_stmt = BatchStatement()

View File

@ -0,0 +1,71 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from cassandra.query import BatchStatement
from oslo_log import log
from repositories.cassandra.abstract_repository import AbstractCassandraRepository
from repositories.utils import parse_alarm_state_hist_message
LOG = log.getLogger(__name__)
class AlarmStateHistCassandraRepository(AbstractCassandraRepository):
def __init__(self):
super(AlarmStateHistCassandraRepository, self).__init__()
self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare(
'insert into alarm_state_history (tenant_id, alarm_id, '
'metrics, new_state, '
'old_state, reason, reason_data, '
'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')
def process_message(self, message):
(alarm_id, metrics, new_state, old_state, link,
lifecycle_state, state_change_reason,
sub_alarms_json_snake_case, tenant_id,
time_stamp) = parse_alarm_state_hist_message(
message)
alarm_state_hist = (
tenant_id.encode('utf8'),
alarm_id.encode('utf8'),
json.dumps(metrics, ensure_ascii=False).encode(
'utf8'),
new_state.encode('utf8'),
old_state.encode('utf8'),
state_change_reason.encode('utf8'),
"{}".encode('utf8'),
sub_alarms_json_snake_case.encode('utf8'),
time_stamp
)
LOG.debug(alarm_state_hist)
return alarm_state_hist
def write_batch(self, alarm_state_hists):
for alarm_state_hist in alarm_state_hists:
self._batch_stmt.add(self._insert_alarm_state_hist_stmt,
alarm_state_hist)
self.cassandra_session.execute(self._batch_stmt)
self._batch_stmt = BatchStatement()

View File

@ -0,0 +1,115 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
from cassandra.query import BatchStatement
from oslo_log import log
import urllib
from repositories.cassandra.abstract_repository import AbstractCassandraRepository
from repositories.utils import parse_measurement_message
LOG = log.getLogger(__name__)
class MetricCassandraRepository(AbstractCassandraRepository):
def __init__(self):
super(MetricCassandraRepository, self).__init__()
self._insert_measurement_stmt = self.cassandra_session.prepare(
'insert into measurements (tenant_id,'
'region, metric_hash, time_stamp, value,'
'value_meta) values (?, ?, ?, ?, ?, ?)')
self._insert_metric_map_stmt = self.cassandra_session.prepare(
'insert into metric_map (tenant_id,'
'region, metric_hash, '
'metric_map) values'
'(?,?,?,?)')
def process_message(self, message):
(dimensions, metric_name, region, tenant_id, time_stamp, value,
value_meta) = parse_measurement_message(message)
metric_hash, metric_map = create_metric_hash(metric_name,
dimensions)
measurement = (tenant_id.encode('utf8'),
region.encode('utf8'),
metric_hash,
time_stamp,
value,
json.dumps(value_meta, ensure_ascii=False).encode(
'utf8'))
LOG.debug(measurement)
return MetricMeasurementInfo(
tenant_id.encode('utf8'),
region.encode('utf8'),
metric_hash,
metric_map,
measurement)
def write_batch(self, metric_measurement_infos):
for metric_measurement_info in metric_measurement_infos:
self._batch_stmt.add(self._insert_measurement_stmt,
metric_measurement_info.measurement)
metric_map = (metric_measurement_info.tenant_id,
metric_measurement_info.region,
metric_measurement_info.metric_hash,
metric_measurement_info.metric_map)
self._batch_stmt.add(self._insert_metric_map_stmt,
metric_map)
self.cassandra_session.execute(self._batch_stmt)
self._batch_stmt = BatchStatement()
class MetricMeasurementInfo(object):
def __init__(self, tenant_id, region, metric_hash, metric_map,
measurement):
self.tenant_id = tenant_id
self.region = region
self.metric_hash = metric_hash
self.metric_map = metric_map
self.measurement = measurement
def create_metric_hash(metric_name, dimensions):
dimensions['__name__'] = urllib.quote_plus(metric_name)
hash_string = ''
for dim_name in sorted(dimensions.iterkeys()):
dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
dimensions[dim_name]))
hash_string += dimension
sha1_hash = hashlib.sha1(hash_string).hexdigest()
return bytearray.fromhex(sha1_hash), dimensions

View File

@ -3,4 +3,5 @@ oslo.log
six>=1.9.0 six>=1.9.0
#influxdb==2.8.0 #influxdb==2.8.0
#cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0
monasca-common monasca-common