Event filtering for non-admin users

Non-admin users should be restricted to see only those
events that were generated by them and not the events
associated with other users.The current behavior doesn't
have any restrictions on the events that a non-admin user
can view.As part of this patch, a non-admin user will see
only the events that are associated with the user_id and
project_id that the user requesting events belongs to.
Events stored in the ceilometer database has information
from all other services and in a cloud environment, there
must be restrictions on the data a non-admin user can access.

Change-Id: I2ed7425c14a70d1db4621a277ba5f4f575a15eba
blueprint: events-rbac
This commit is contained in:
Divya 2015-08-27 20:41:10 +02:00 committed by gordon chung
parent 91ce9e1f56
commit 4732fd516d
2 changed files with 100 additions and 2 deletions

View File

@ -157,6 +157,18 @@ class Event(base.Base):
)
def _add_user_proj_filter():
traits_filter = []
# Returns user_id, proj_id for non-admins
user_id, proj_id = rbac.get_limited_to(pecan.request.headers)
# If non-admin, filter events by user and project
if (user_id and proj_id):
traits_filter.append({"key": "project_id", "string": proj_id,
"op": "eq"})
traits_filter.append({"key": "user_id", "string": user_id, "op": "eq"})
return traits_filter
def _event_query_to_event_filter(q):
evt_model_filter = {
'event_type': None,
@ -164,7 +176,7 @@ def _event_query_to_event_filter(q):
'start_timestamp': None,
'end_timestamp': None
}
traits_filter = []
traits_filter = _add_user_proj_filter()
for i in q:
if not i.op:
@ -267,7 +279,9 @@ class EventsController(rest.RestController):
:param message_id: Message ID of the Event to be returned
"""
rbac.enforce("events:show", pecan.request)
event_filter = storage.EventFilter(message_id=message_id)
t_filter = _add_user_proj_filter()
event_filter = storage.EventFilter(traits_filter=t_filter,
message_id=message_id)
events = [event for event
in pecan.request.event_storage_conn.get_events(event_filter)]
if not events:

View File

@ -457,6 +457,90 @@ class TestEventAPI(EventTestBase):
'op': 'el'}])
class AclRestrictedEventTestBase(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def setUp(self):
super(AclRestrictedEventTestBase, self).setUp()
self.admin_user_id = uuid.uuid4().hex
self.admin_proj_id = uuid.uuid4().hex
self.user_id = uuid.uuid4().hex
self.proj_id = uuid.uuid4().hex
self._generate_models()
def _generate_models(self):
event_models = []
self.s_time = datetime.datetime(2013, 12, 31, 5, 0)
event_models.append(
models.Event(message_id='1',
event_type='empty_ev',
generated=self.s_time,
traits=[models.Trait('random',
models.Trait.TEXT_TYPE,
'blah')],
raw={}))
event_models.append(
models.Event(message_id='2',
event_type='admin_ev',
generated=self.s_time,
traits=[models.Trait('project_id',
models.Trait.TEXT_TYPE,
self.admin_proj_id),
models.Trait('user_id',
models.Trait.TEXT_TYPE,
self.admin_user_id)],
raw={}))
event_models.append(
models.Event(message_id='3',
event_type='user_ev',
generated=self.s_time,
traits=[models.Trait('project_id',
models.Trait.TEXT_TYPE,
self.proj_id),
models.Trait('user_id',
models.Trait.TEXT_TYPE,
self.user_id)],
raw={}))
self.event_conn.record_events(event_models)
def test_non_admin_access(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": self.proj_id}
data = self.get_json('/events', headers=a_headers)
self.assertEqual(1, len(data))
self.assertEqual('user_ev', data[0]['event_type'])
def test_non_admin_access_single(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": self.proj_id}
data = self.get_json('/events/3', headers=a_headers)
self.assertEqual('user_ev', data['event_type'])
def test_non_admin_access_incorrect_user(self):
a_headers = {"X-Roles": "member",
"X-User-Id": 'blah',
"X-Project-Id": self.proj_id}
data = self.get_json('/events', headers=a_headers)
self.assertEqual(0, len(data))
def test_non_admin_access_incorrect_proj(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": 'blah'}
data = self.get_json('/events', headers=a_headers)
self.assertEqual(0, len(data))
def test_non_admin_access_single_invalid(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": self.proj_id}
data = self.get_json('/events/1', headers=a_headers,
expect_errors=True)
self.assertEqual(404, data.status_int)
class EventRestrictionTestBase(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):