Added a subscription_events table & subscription hander

This table will be populated with the subscriber id, the type of event
that occurred and the event-info in the resources he subscribed to.

A person subscribed to a project-group will be notified of
1. Addition/ removal of projects to that project group.
2. Changes to stories, tasks of any of the projects under the
project-group.

A person subscribed to a project will be notified of
1. Changes to stories, tasks associated with that project.
2. When the project is added or removed from a project-group.

A person subscribed to a story will be notified of
1. Any changes to the tasks associated with that story

A person subscribed to a task will be notified of
1. Any changes to the task.

Added authentication checks for current user in v1/subscription_events.py

Added handle_deletions method in subscriptions_handler.py
This method is to remove any active subscriptions(tasks,project_groups)
on the resources that have been just deleted.

Changed the db migration revision number for subscription_events
as it conflicted with fulltext indexes

Added a method to resolve comment content.

Edit: Removed foreign key constraint on subscriber_id in
subscription_events.py from migrations.

Change-Id: Ie226f652241d22d5cee96876370b5797d5bfa54d
This commit is contained in:
Aishwarya Thangappa
2014-08-06 12:57:59 -07:00
parent c8cbc9720d
commit 178b0d36f7
10 changed files with 514 additions and 58 deletions

View File

@@ -28,4 +28,4 @@ class ResourceHook(hooks.PecanHook):
if state.request.method == 'GET':
return
publisher.publish(state)
publisher.process(state)

View File

@@ -0,0 +1,152 @@
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from pecan import abort
from pecan import request
from pecan import response
from pecan import rest
from pecan.secure import secure
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from storyboard.api.auth import authorization_checks as checks
from storyboard.api.v1 import base
from storyboard.db.api import subscription_events as subscription_events_api
from storyboard.db.api import users as user_api
CONF = cfg.CONF
class SubscriptionEvent(base.APIBase):
"""A model that describes a resource subscription.
"""
subscriber_id = int
"""The owner of this subscription.
"""
event_type = wtypes.text
"""This type should serve as a hint for the web-client when rendering
a comment."""
event_info = wtypes.text
"""A JSON encoded field with details about the event."""
@classmethod
def sample(cls):
return cls(
subscriber_id=1,
event_type="project",
event_info={"task_title": "story1", "old_status": "todo",
"task_id": 38, "new_status": "inprogress"}
)
class SubscriptionEventsController(rest.RestController):
"""REST controller for Subscriptions.
Provides Create, Delete, and search methods for resource
subscriptionEvents.
"""
@secure(checks.authenticated)
@wsme_pecan.wsexpose(SubscriptionEvent, int)
def get_one(self, subscription_event_id):
"""Retrieve a specific subscription record.
:param subscription_event_id: The unique id of this subscription.
"""
subscription_event = subscription_events_api \
.subscription_events_get(subscription_event_id)
current_user = user_api.user_get(request.current_user_id)
if current_user.id != subscription_event.subscriber_id and \
not current_user.is_superuser:
abort(403, "Permission Denied")
return SubscriptionEvent.from_db_model(subscription_event)
@secure(checks.authenticated)
@wsme_pecan.wsexpose([SubscriptionEvent], int, int, unicode,
int, unicode, unicode)
def get(self, marker=None, limit=None, event_type=None,
subscriber_id=None, sort_field='id', sort_dir='asc'):
"""Retrieve a list of subscriptions.
:param marker: The resource id where the page should begin.
:param limit: The number of subscriptions to retrieve.
:param event_type: The type of resource to search by.
:param subscriber_id: The unique ID of the subscriber to search by.
:param sort_field: The name of the field to sort on.
:param sort_dir: sort direction for results (asc, desc).
"""
# Boundary check on limit.
if limit is None:
limit = CONF.page_size_default
limit = min(CONF.page_size_maximum, max(1, limit))
# Resolve the marker record.
marker_sub = subscription_events_api.subscription_events_get(marker)
current_user = user_api.user_get(request.current_user_id)
if current_user.id != subscriber_id and \
not current_user.is_superuser:
abort(403, "Permission Denied")
if marker_sub and marker_sub.user_id != subscriber_id:
marker_sub = None
subscriptions = subscription_events_api.subscription_events_get_all(
marker=marker_sub,
limit=limit,
subscriber_id=subscriber_id,
event_type=event_type,
sort_field=sort_field,
sort_dir=sort_dir)
subscription_count = \
subscription_events_api.subscription_events_get_count(
subscriber_id=subscriber_id,
event_type=event_type)
# Apply the query response headers.
response.headers['X-Limit'] = str(limit)
response.headers['X-Total'] = str(subscription_count)
if marker_sub:
response.headers['X-Marker'] = str(marker_sub.id)
return [SubscriptionEvent.from_db_model(s) for s in subscriptions]
@secure(checks.authenticated)
@wsme_pecan.wsexpose(None, int)
def delete(self, subscription_event_id):
"""Delete a specific subscription.
:param subscription_event_id: The unique id of the
subscription_event to delete.
"""
subscription_event = subscription_events_api \
.subscription_events_get(subscription_event_id)
current_user = user_api.user_get(request.current_user_id)
if current_user.id != subscription_event.subscriber_id and \
not current_user.is_superuser:
abort(403, "Permission Denied")
subscription_events_api.subscription_events_delete(
subscription_event_id)
response.status_code = 204

View File

@@ -17,6 +17,7 @@ from storyboard.api.v1.auth import AuthController
from storyboard.api.v1.project_groups import ProjectGroupsController
from storyboard.api.v1.projects import ProjectsController
from storyboard.api.v1.stories import StoriesController
from storyboard.api.v1.subscription_events import SubscriptionEventsController
from storyboard.api.v1.subscriptions import SubscriptionsController
from storyboard.api.v1.tasks import TasksController
from storyboard.api.v1.teams import TeamsController
@@ -32,5 +33,6 @@ class V1Controller(object):
stories = StoriesController()
tasks = TasksController()
subscriptions = SubscriptionsController()
subscription_events = SubscriptionEventsController()
openid = AuthController()

View File

@@ -0,0 +1,43 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from storyboard.db.api import base as api_base
from storyboard.db import models
def subscription_events_get(subscription_event_id):
return api_base.entity_get(models.SubscriptionEvents,
subscription_event_id)
def subscription_events_get_all(**kwargs):
return api_base.entity_get_all(models.SubscriptionEvents,
**kwargs)
def subscription_events_get_count(**kwargs):
return api_base.entity_get_count(models.SubscriptionEvents, **kwargs)
def subscription_events_create(values):
return api_base.entity_create(models.SubscriptionEvents, values)
def subscription_events_delete(subscription_event_id):
subscription = subscription_events_get(subscription_event_id)
if subscription:
api_base.entity_hard_delete(models.SubscriptionEvents,
subscription_event_id)

View File

@@ -20,7 +20,7 @@ from pecan import request
from storyboard.common import event_types
from storyboard.db.api import base as api_base
from storyboard.db import models
from storyboard.notifications import connection_service
from storyboard.notifications.publisher import publish
CONF = cfg.CONF
@@ -48,25 +48,13 @@ def event_create(values):
new_event = api_base.entity_create(models.TimeLineEvent, values)
if CONF.enable_notifications:
payload_timeline_events = {
payload = {
"user_id": request.current_user_id,
"method": "POST",
"resource": "timeline_event",
"resource": "timeline_events",
"event_id": new_event.id
}
payload_timeline_events = json.dumps(payload_timeline_events)
routing_key = "timeline_events"
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload_timeline_events)
channel.close()
publish(payload, "timeline_events")
return new_event

View File

@@ -0,0 +1,54 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""This migration creates the subscription_events table.
Revision ID: 024
Revises: 023
Create Date: 2014-08-05 15:37:30.662966
"""
# revision identifiers, used by Alembic.
revision = '024'
down_revision = '023'
from alembic import op
import sqlalchemy as sa
MYSQL_ENGINE = 'InnoDB'
MYSQL_CHARSET = 'utf8'
def upgrade(active_plugins=None, options=None):
op.create_table(
'subscription_events',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('subscriber_id', sa.Integer(), nullable=False),
sa.Column('event_type', sa.Unicode(100), nullable=False),
sa.Column('event_info', sa.UnicodeText(), nullable=True),
sa.PrimaryKeyConstraint('id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET
)
def downgrade(active_plugins=None, options=None):
op.drop_table('subscription_events')

View File

@@ -320,3 +320,11 @@ class Subscription(Base):
# Cant use foreign key here as it depends on the type
target_id = Column(Integer)
class SubscriptionEvents(Base):
__tablename__ = 'subscription_events'
subscriber_id = Column(Integer, ForeignKey('users.id'))
event_type = Column(Unicode(100), nullable=False)
event_info = Column(UnicodeText(), nullable=True)

View File

@@ -26,44 +26,16 @@ CONF = cfg.CONF
LOG = log.getLogger(__name__)
def publish(state):
def parse(s):
url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?"
"\/?([a-z]+)?$", s)
if url_pattern and url_pattern.groups()[0] != "openid":
return url_pattern.groups()
else:
return
request = state.request
req_method = request.method
req_user_id = request.current_user_id
req_path = request.path
req_resource_grp = parse(req_path)
if req_resource_grp:
resource = req_resource_grp[0]
resource_id = req_resource_grp[1]
def parse(s):
url_pattern = re.match("^\/v1\/([a-z_]+)\/?([0-9]+)?"
"\/?([a-z]+)?\/?([0-9]+)?$", s)
if url_pattern and url_pattern.groups()[0] != "openid":
return url_pattern.groups()
else:
return
if not resource_id:
response_str = state.response.body
response = json.loads(response_str)
if response:
resource_id = response.get('id')
else:
resource_id = None
payload = {
"user_id": req_user_id,
"method": req_method,
"resource_name": resource,
"resource_id": resource_id,
}
def publish(payload, resource):
payload = json.dumps(payload)
routing_key = resource
conn = connection_service.get_connection()
@@ -74,4 +46,53 @@ def publish(state):
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload)
channel.close()
def process(state):
request = state.request
req_method = request.method
req_user_id = request.current_user_id
req_path = request.path
req_resource_grp = parse(req_path)
if req_resource_grp:
resource = req_resource_grp[0]
if req_resource_grp[1]:
resource_id = req_resource_grp[1]
# When a resource is created..
else:
response_str = state.response.body
response = json.loads(response_str)
if response:
resource_id = response.get('id')
else:
resource_id = None
# when adding/removing projects to project_groups..
if req_resource_grp[3]:
sub_resource_id = req_resource_grp[3]
payload = {
"user_id": req_user_id,
"method": req_method,
"resource": resource,
"resource_id": resource_id,
"sub_resource_id": sub_resource_id
}
else:
payload = {
"user_id": req_user_id,
"method": req_method,
"resource": resource,
"resource_id": resource_id
}
publish(payload, resource)
else:
return

View File

@@ -13,8 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
from oslo.config import cfg
from storyboard.db.api import timeline_events
from storyboard.notifications import connection_service
from storyboard.notifications.subscriptions_handler import handle_deletions
from storyboard.notifications.subscriptions_handler import handle_resources
from storyboard.notifications.subscriptions_handler import \
handle_timeline_events
from storyboard.openstack.common import log
CONF = cfg.CONF
@@ -22,24 +30,48 @@ LOG = log.getLogger(__name__)
def subscribe():
log.setup('storyboard')
CONF(project='storyboard')
connection_service.initialize()
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = ['projects', 'tasks', 'stories', 'timeline_events']
conn.create_exchange(channel, 'storyboard', 'topic')
result = channel.queue_declare(queue='subscription_queue', durable=True)
queue_name = result.method.queue
binding_keys = ['tasks', 'stories', 'projects', 'project_groups',
'timeline_events']
for binding_key in binding_keys:
channel.queue_bind(exchange='storyboard',
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
print(" [x] %r %r %r %r"
% (method.routing_key, body, ch, properties))
body_dict = ast.literal_eval(body)
if 'event_id' in body_dict:
event_id = body_dict['event_id']
event = timeline_events.event_get(event_id)
handle_timeline_events(event)
else:
if body_dict['resource'] == 'project_groups':
if 'sub_resource_id' in body_dict:
handle_resources(body_dict['method'],
body_dict['resource_id'],
body_dict['sub_resource_id'])
else:
handle_resources(body_dict['method'],
body_dict['resource_id'])
if body_dict['method'] == 'DELETE':
resource_name = body_dict['resource']
resource_id = body_dict['resource_id']
if 'sub_resource_id' not in body_dict:
handle_deletions(resource_name, resource_id)
channel.basic_consume(callback,
queue=queue_name,

View File

@@ -0,0 +1,156 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from storyboard.db.api import comments as comments_api
from storyboard.db.api import project_groups as project_groups_api
from storyboard.db.api import subscription_events as sub_events_api
from storyboard.db.api import subscriptions as subscriptions_api
from storyboard.db.api import tasks as tasks_api
def handle_timeline_events(event):
target_subs = []
user_ids = set()
story_id = event.story_id
if event.event_info:
event_info = json.loads(event.event_info)
task_id = event_info.get("task_id")
# Handling tasks targeted.
target_sub = subscriptions_api.subscription_get_all_by_target(
'task', task_id)
target_subs.extend(target_sub)
# Handling stories targeted.
target_sub = subscriptions_api.subscription_get_all_by_target(
'story', story_id)
target_subs.extend(target_sub)
# Handling projects, project groups targeted for stories without tasks.
tasks = tasks_api.task_get_all(story_id=story_id)
for task in tasks:
project_id = task.project_id
# Handling projects targeted.
target_sub = subscriptions_api.subscription_get_all_by_target(
'project', project_id)
target_subs.extend(target_sub)
# Handling project groups targeted.
pgs = project_groups_api.project_group_get_all(project_id=project_id)
for pg in pgs:
target_sub = subscriptions_api.subscription_get_all_by_target(
'project_group', pg.id)
target_subs.extend(target_sub)
for sub in target_subs:
user_ids.add(sub.user_id)
for user_id in user_ids:
if event.event_type == 'user_comment':
event_info = resolve_comments(event)
else:
event_info = event.event_info
sub_events_api.subscription_events_create({
"subscriber_id": user_id,
"event_type": event.event_type,
"event_info": event_info
})
def handle_resources(method, *argv):
target_subs = []
user_ids = set()
resource_id = argv[0]
if len(argv) > 1:
sub_resource_id = argv[1]
# Handling project addition/deletion to/from project_group.
target_sub = subscriptions_api.subscription_get_all_by_target(
'project', sub_resource_id)
target_subs.extend(target_sub)
for sub in target_subs:
user_ids.add(sub.user_id)
for user_id in user_ids:
if method == 'DELETE':
event_type = 'project removed from project_group'
event_info = json.dumps({'project_group_id': resource_id,
'project_id': sub_resource_id})
else:
event_type = 'project added to project_group'
event_info = json.dumps({'project_group_id': resource_id,
'project_id': sub_resource_id})
sub_events_api.subscription_events_create({
"subscriber_id": user_id,
"event_type": event_type,
"event_info": event_info
})
else:
if method == 'DELETE':
#Handling project_group targeted.
target_sub = subscriptions_api.subscription_get_all_by_target(
'project_group', resource_id)
target_subs.extend(target_sub)
for sub in target_subs:
user_ids.add(sub.user_id)
for user_id in user_ids:
sub_events_api.subscription_events_create({
"subscriber_id": user_id,
"event_type": 'project_group deleted',
"event_info": json.dumps({'project_group_id': resource_id})
})
def handle_deletions(resource_name, resource_id):
target_subs = []
sub_ids = set()
resource_name = resource_name[:-1]
target_sub = subscriptions_api.subscription_get_all_by_target(
resource_name, resource_id)
target_subs.extend(target_sub)
for sub in target_subs:
sub_ids.add(sub.id)
for sub_id in sub_ids:
subscriptions_api.subscription_delete(sub_id)
def resolve_comments(event):
comment_id = event.comment_id
comment = comments_api.comment_get(comment_id)
comment_content = comment.content
return json.dumps({'comment_id': comment_id, 'comment_content':
comment_content})