diff --git a/ceilometer/api/controllers/v2/events.py b/ceilometer/api/controllers/v2/events.py index 6fd92a8046..b314dde0e4 100644 --- a/ceilometer/api/controllers/v2/events.py +++ b/ceilometer/api/controllers/v2/events.py @@ -157,6 +157,18 @@ class Event(base.Base): ) +def _add_user_proj_filter(): + traits_filter = [] + # Returns user_id, proj_id for non-admins + user_id, proj_id = rbac.get_limited_to(pecan.request.headers) + # If non-admin, filter events by user and project + if (user_id and proj_id): + traits_filter.append({"key": "project_id", "string": proj_id, + "op": "eq"}) + traits_filter.append({"key": "user_id", "string": user_id, "op": "eq"}) + return traits_filter + + def _event_query_to_event_filter(q): evt_model_filter = { 'event_type': None, @@ -164,7 +176,7 @@ def _event_query_to_event_filter(q): 'start_timestamp': None, 'end_timestamp': None } - traits_filter = [] + traits_filter = _add_user_proj_filter() for i in q: if not i.op: @@ -267,7 +279,9 @@ class EventsController(rest.RestController): :param message_id: Message ID of the Event to be returned """ rbac.enforce("events:show", pecan.request) - event_filter = storage.EventFilter(message_id=message_id) + t_filter = _add_user_proj_filter() + event_filter = storage.EventFilter(traits_filter=t_filter, + message_id=message_id) events = [event for event in pecan.request.event_storage_conn.get_events(event_filter)] if not events: diff --git a/ceilometer/tests/functional/api/v2/test_event_scenarios.py b/ceilometer/tests/functional/api/v2/test_event_scenarios.py index ab92aef687..aba01f0d7e 100644 --- a/ceilometer/tests/functional/api/v2/test_event_scenarios.py +++ b/ceilometer/tests/functional/api/v2/test_event_scenarios.py @@ -457,6 +457,90 @@ class TestEventAPI(EventTestBase): 'op': 'el'}]) +class AclRestrictedEventTestBase(v2.FunctionalTest, + tests_db.MixinTestsWithBackendScenarios): + + def setUp(self): + super(AclRestrictedEventTestBase, self).setUp() + self.admin_user_id = uuid.uuid4().hex + self.admin_proj_id = uuid.uuid4().hex + self.user_id = uuid.uuid4().hex + self.proj_id = uuid.uuid4().hex + self._generate_models() + + def _generate_models(self): + event_models = [] + self.s_time = datetime.datetime(2013, 12, 31, 5, 0) + event_models.append( + models.Event(message_id='1', + event_type='empty_ev', + generated=self.s_time, + traits=[models.Trait('random', + models.Trait.TEXT_TYPE, + 'blah')], + raw={})) + event_models.append( + models.Event(message_id='2', + event_type='admin_ev', + generated=self.s_time, + traits=[models.Trait('project_id', + models.Trait.TEXT_TYPE, + self.admin_proj_id), + models.Trait('user_id', + models.Trait.TEXT_TYPE, + self.admin_user_id)], + raw={})) + event_models.append( + models.Event(message_id='3', + event_type='user_ev', + generated=self.s_time, + traits=[models.Trait('project_id', + models.Trait.TEXT_TYPE, + self.proj_id), + models.Trait('user_id', + models.Trait.TEXT_TYPE, + self.user_id)], + raw={})) + self.event_conn.record_events(event_models) + + def test_non_admin_access(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": self.proj_id} + data = self.get_json('/events', headers=a_headers) + self.assertEqual(1, len(data)) + self.assertEqual('user_ev', data[0]['event_type']) + + def test_non_admin_access_single(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": self.proj_id} + data = self.get_json('/events/3', headers=a_headers) + self.assertEqual('user_ev', data['event_type']) + + def test_non_admin_access_incorrect_user(self): + a_headers = {"X-Roles": "member", + "X-User-Id": 'blah', + "X-Project-Id": self.proj_id} + data = self.get_json('/events', headers=a_headers) + self.assertEqual(0, len(data)) + + def test_non_admin_access_incorrect_proj(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": 'blah'} + data = self.get_json('/events', headers=a_headers) + self.assertEqual(0, len(data)) + + def test_non_admin_access_single_invalid(self): + a_headers = {"X-Roles": "member", + "X-User-Id": self.user_id, + "X-Project-Id": self.proj_id} + data = self.get_json('/events/1', headers=a_headers, + expect_errors=True) + self.assertEqual(404, data.status_int) + + class EventRestrictionTestBase(v2.FunctionalTest, tests_db.MixinTestsWithBackendScenarios):