From 4732fd516d8cc3c6401705078bf9dd5b5f39b560 Mon Sep 17 00:00:00 2001 From: Divya Date: Thu, 27 Aug 2015 20:41:10 +0200 Subject: [PATCH] Event filtering for non-admin users Non-admin users should be restricted to see only those events that were generated by them and not the events associated with other users.The current behavior doesn't have any restrictions on the events that a non-admin user can view.As part of this patch, a non-admin user will see only the events that are associated with the user_id and project_id that the user requesting events belongs to. Events stored in the ceilometer database has information from all other services and in a cloud environment, there must be restrictions on the data a non-admin user can access. Change-Id: I2ed7425c14a70d1db4621a277ba5f4f575a15eba blueprint: events-rbac --- ceilometer/api/controllers/v2/events.py | 18 +++- .../functional/api/v2/test_event_scenarios.py | 84 +++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/ceilometer/api/controllers/v2/events.py b/ceilometer/api/controllers/v2/events.py index 6fd92a8046..b314dde0e4 100644 --- a/ceilometer/api/controllers/v2/events.py +++ b/ceilometer/api/controllers/v2/events.py @@ -157,6 +157,18 @@ class Event(base.Base): ) +def _add_user_proj_filter(): + traits_filter = [] + # Returns user_id, proj_id for non-admins + user_id, proj_id = rbac.get_limited_to(pecan.request.headers) + # If non-admin, filter events by user and project + if (user_id and proj_id): + traits_filter.append({"key": "project_id", "string": proj_id, + "op": "eq"}) + traits_filter.append({"key": "user_id", "string": user_id, "op": "eq"}) + return traits_filter + + def _event_query_to_event_filter(q): evt_model_filter = { 'event_type': None, @@ -164,7 +176,7 @@ def _event_query_to_event_filter(q): 'start_timestamp': None, 'end_timestamp': None } - traits_filter = [] + traits_filter = _add_user_proj_filter() for i in q: if not i.op: @@ -267,7 +279,9 @@ class EventsController(rest.RestController): :param message_id: Message ID of the Event to be returned """ rbac.enforce("events:show", pecan.request) - event_filter = storage.EventFilter(message_id=message_id) + t_filter = _add_user_proj_filter() + event_filter = storage.EventFilter(traits_filter=t_filter, + message_id=message_id) events = [event for event in pecan.request.event_storage_conn.get_events(event_filter)] if not events: diff --git a/ceilometer/tests/functional/api/v2/test_event_scenarios.py b/ceilometer/tests/functional/api/v2/test_event_scenarios.py index ab92aef687..aba01f0d7e 100644 --- a/ceilometer/tests/functional/api/v2/test_event_scenarios.py +++ b/ceilometer/tests/functional/api/v2/test_event_scenarios.py @@ -457,6 +457,90 @@ class TestEventAPI(EventTestBase): 'op': 'el'}]) +class AclRestrictedEventTestBase(v2.FunctionalTest, + tests_db.MixinTestsWithBackendScenarios): + + def setUp(self): + super(AclRestrictedEventTestBase, self).setUp() + self.admin_user_id = uuid.uuid4().hex + self.admin_proj_id = uuid.uuid4().hex + self.user_id = uuid.uuid4().hex + self.proj_id = uuid.uuid4().hex + self._generate_models() + + def _generate_models(self): + event_models = [] + self.s_time = datetime.datetime(2013, 12, 31, 5, 0) + event_models.append( + models.Event(message_id='1', + event_type='empty_ev', + generated=self.s_time, + traits=[models.Trait('random', + models.Trait.TEXT_TYPE, + 'blah')], + raw={})) + event_models.append( + models.Event(message_id='2', + event_type='admin_ev', + generated=self.s_time, + traits=[models.Trait('project_id', + models.Trait.TEXT_TYPE, + self.admin_proj_id), + models.Trait('user_id', + models.Trait.TEXT_TYPE, + self.admin_user_id)], + raw={})) + event_models.append( + models.Event(message_id='3', + event_type='user_ev', + generated=self.s_time, + traits=[models.Trait('project_id', + models.Trait.TEXT_TYPE, + self.proj_id), + models.Trait('user_id', + models.Trait.TEXT_TYPE, + self.user_id)], + raw={})) + self.event_conn.record_events(event_models) + + def test_non_admin_access(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": self.proj_id} + data = self.get_json('/events', headers=a_headers) + self.assertEqual(1, len(data)) + self.assertEqual('user_ev', data[0]['event_type']) + + def test_non_admin_access_single(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": self.proj_id} + data = self.get_json('/events/3', headers=a_headers) + self.assertEqual('user_ev', data['event_type']) + + def test_non_admin_access_incorrect_user(self): + a_headers = {"X-Roles": "member", + "X-User-Id": 'blah', + "X-Project-Id": self.proj_id} + data = self.get_json('/events', headers=a_headers) + self.assertEqual(0, len(data)) + + def test_non_admin_access_incorrect_proj(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": 'blah'} + data = self.get_json('/events', headers=a_headers) + self.assertEqual(0, len(data)) + + def test_non_admin_access_single_invalid(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": self.proj_id} + data = self.get_json('/events/1', headers=a_headers, + expect_errors=True) + self.assertEqual(404, data.status_int) + + class EventRestrictionTestBase(v2.FunctionalTest, tests_db.MixinTestsWithBackendScenarios):