Merge "Event filtering for non-admin users"

This commit is contained in:
Jenkins 2015-09-02 04:41:16 +00:00 committed by Gerrit Code Review
commit 40be122a8b
2 changed files with 100 additions and 2 deletions

View File

@ -157,6 +157,18 @@ class Event(base.Base):
)
def _add_user_proj_filter():
traits_filter = []
# Returns user_id, proj_id for non-admins
user_id, proj_id = rbac.get_limited_to(pecan.request.headers)
# If non-admin, filter events by user and project
if (user_id and proj_id):
traits_filter.append({"key": "project_id", "string": proj_id,
"op": "eq"})
traits_filter.append({"key": "user_id", "string": user_id, "op": "eq"})
return traits_filter
def _event_query_to_event_filter(q):
evt_model_filter = {
'event_type': None,
@ -164,7 +176,7 @@ def _event_query_to_event_filter(q):
'start_timestamp': None,
'end_timestamp': None
}
traits_filter = []
traits_filter = _add_user_proj_filter()
for i in q:
if not i.op:
@ -267,7 +279,9 @@ class EventsController(rest.RestController):
:param message_id: Message ID of the Event to be returned
"""
rbac.enforce("events:show", pecan.request)
event_filter = storage.EventFilter(message_id=message_id)
t_filter = _add_user_proj_filter()
event_filter = storage.EventFilter(traits_filter=t_filter,
message_id=message_id)
events = [event for event
in pecan.request.event_storage_conn.get_events(event_filter)]
if not events:

View File

@ -457,6 +457,90 @@ class TestEventAPI(EventTestBase):
'op': 'el'}])
class AclRestrictedEventTestBase(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def setUp(self):
super(AclRestrictedEventTestBase, self).setUp()
self.admin_user_id = uuid.uuid4().hex
self.admin_proj_id = uuid.uuid4().hex
self.user_id = uuid.uuid4().hex
self.proj_id = uuid.uuid4().hex
self._generate_models()
def _generate_models(self):
event_models = []
self.s_time = datetime.datetime(2013, 12, 31, 5, 0)
event_models.append(
models.Event(message_id='1',
event_type='empty_ev',
generated=self.s_time,
traits=[models.Trait('random',
models.Trait.TEXT_TYPE,
'blah')],
raw={}))
event_models.append(
models.Event(message_id='2',
event_type='admin_ev',
generated=self.s_time,
traits=[models.Trait('project_id',
models.Trait.TEXT_TYPE,
self.admin_proj_id),
models.Trait('user_id',
models.Trait.TEXT_TYPE,
self.admin_user_id)],
raw={}))
event_models.append(
models.Event(message_id='3',
event_type='user_ev',
generated=self.s_time,
traits=[models.Trait('project_id',
models.Trait.TEXT_TYPE,
self.proj_id),
models.Trait('user_id',
models.Trait.TEXT_TYPE,
self.user_id)],
raw={}))
self.event_conn.record_events(event_models)
def test_non_admin_access(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": self.proj_id}
data = self.get_json('/events', headers=a_headers)
self.assertEqual(1, len(data))
self.assertEqual('user_ev', data[0]['event_type'])
def test_non_admin_access_single(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": self.proj_id}
data = self.get_json('/events/3', headers=a_headers)
self.assertEqual('user_ev', data['event_type'])
def test_non_admin_access_incorrect_user(self):
a_headers = {"X-Roles": "member",
"X-User-Id": 'blah',
"X-Project-Id": self.proj_id}
data = self.get_json('/events', headers=a_headers)
self.assertEqual(0, len(data))
def test_non_admin_access_incorrect_proj(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": 'blah'}
data = self.get_json('/events', headers=a_headers)
self.assertEqual(0, len(data))
def test_non_admin_access_single_invalid(self):
a_headers = {"X-Roles": "member",
"X-User-Id": self.user_id,
"X-Project-Id": self.proj_id}
data = self.get_json('/events/1', headers=a_headers,
expect_errors=True)
self.assertEqual(404, data.status_int)
class EventRestrictionTestBase(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):