Merge "Support all_tenants search_opts for neutron"
This commit is contained in:
commit
feb01439cf
@ -152,15 +152,28 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase):
|
||||
search_opts=None):
|
||||
"""Returns list of security group rules owned by tenant."""
|
||||
neutron = neutronapi.get_client(context)
|
||||
search_opts = {}
|
||||
params = {}
|
||||
search_opts = search_opts if search_opts else {}
|
||||
if names:
|
||||
search_opts['name'] = names
|
||||
params['name'] = names
|
||||
if ids:
|
||||
search_opts['id'] = ids
|
||||
if project:
|
||||
search_opts['tenant_id'] = project
|
||||
params['id'] = ids
|
||||
|
||||
# NOTE(jeffrey4l): list all the security groups when following
|
||||
# conditions are met
|
||||
# * names and ids don't exist.
|
||||
# * it is admin context and all_tenants exist in search_opts.
|
||||
# * project is not specified.
|
||||
list_all_tenants = (context.is_admin
|
||||
and 'all_tenants' in search_opts
|
||||
and not any([names, ids]))
|
||||
# NOTE(jeffrey4l): The neutron doesn't have `all-tenants` concept.
|
||||
# All the security group will be returned if the project/tenant
|
||||
# id is not passed.
|
||||
if project and not list_all_tenants:
|
||||
params['tenant_id'] = project
|
||||
try:
|
||||
security_groups = neutron.list_security_groups(**search_opts).get(
|
||||
security_groups = neutron.list_security_groups(**params).get(
|
||||
'security_groups')
|
||||
except n_exc.NeutronClientException:
|
||||
with excutils.save_and_reraise_exception():
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import mock
|
||||
from mox3 import mox
|
||||
from neutronclient.common import exceptions as n_exc
|
||||
from neutronclient.v2_0 import client
|
||||
@ -48,6 +49,100 @@ class TestNeutronDriver(test.NoDBTestCase):
|
||||
sg_api = neutron_driver.SecurityGroupAPI()
|
||||
sg_api.list(self.context, project=project_id)
|
||||
|
||||
def test_list_with_all_tenants_and_admin_context(self):
|
||||
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
|
||||
search_opts = {'all_tenants': 1}
|
||||
security_groups_list = {'security_groups': []}
|
||||
admin_context = context.RequestContext('user1', project_id, True)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
with mock.patch.object(
|
||||
self.moxed_client,
|
||||
'list_security_groups',
|
||||
return_value=security_groups_list) as mock_list_secgroup:
|
||||
sg_api = neutron_driver.SecurityGroupAPI()
|
||||
sg_api.list(admin_context,
|
||||
project=project_id,
|
||||
search_opts=search_opts)
|
||||
|
||||
mock_list_secgroup.assert_called_once_with()
|
||||
|
||||
def test_list_without_all_tenants_and_admin_context(self):
|
||||
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
|
||||
security_groups_list = {'security_groups': []}
|
||||
admin_context = context.RequestContext('user1', project_id, True)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
with mock.patch.object(
|
||||
self.moxed_client,
|
||||
'list_security_groups',
|
||||
return_value=security_groups_list) as mock_list_secgroup:
|
||||
sg_api = neutron_driver.SecurityGroupAPI()
|
||||
sg_api.list(admin_context, project=project_id)
|
||||
|
||||
mock_list_secgroup.assert_called_once_with(tenant_id=project_id)
|
||||
|
||||
def test_list_with_all_tenants_sec_name_and_admin_context(self):
|
||||
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
|
||||
search_opts = {'all_tenants': 1}
|
||||
security_group_names = ['secgroup_ssh']
|
||||
security_groups_list = {'security_groups': []}
|
||||
admin_context = context.RequestContext('user1', project_id, True)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
with mock.patch.object(
|
||||
self.moxed_client,
|
||||
'list_security_groups',
|
||||
return_value=security_groups_list) as mock_list_secgroup:
|
||||
sg_api = neutron_driver.SecurityGroupAPI()
|
||||
sg_api.list(admin_context, project=project_id,
|
||||
names=security_group_names,
|
||||
search_opts=search_opts)
|
||||
|
||||
mock_list_secgroup.assert_called_once_with(
|
||||
name=security_group_names,
|
||||
tenant_id=project_id)
|
||||
|
||||
def test_list_with_all_tenants_sec_name_ids_and_admin_context(self):
|
||||
project_id = '0af70a4d22cf4652824ddc1f2435dd85'
|
||||
search_opts = {'all_tenants': 1}
|
||||
security_group_names = ['secgroup_ssh']
|
||||
security_group_ids = ['id1']
|
||||
security_groups_list = {'security_groups': []}
|
||||
admin_context = context.RequestContext('user1', project_id, True)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
with mock.patch.object(
|
||||
self.moxed_client,
|
||||
'list_security_groups',
|
||||
return_value=security_groups_list) as mock_list_secgroup:
|
||||
sg_api = neutron_driver.SecurityGroupAPI()
|
||||
sg_api.list(admin_context, project=project_id,
|
||||
names=security_group_names,
|
||||
ids=security_group_ids,
|
||||
search_opts=search_opts)
|
||||
|
||||
mock_list_secgroup.assert_called_once_with(
|
||||
name=security_group_names,
|
||||
id=security_group_ids,
|
||||
tenant_id=project_id)
|
||||
|
||||
def test_list_with_all_tenants_not_admin(self):
|
||||
search_opts = {'all_tenants': 1}
|
||||
security_groups_list = {'security_groups': []}
|
||||
self.mox.ReplayAll()
|
||||
|
||||
with mock.patch.object(
|
||||
self.moxed_client,
|
||||
'list_security_groups',
|
||||
return_value=security_groups_list) as mock_list_secgroup:
|
||||
sg_api = neutron_driver.SecurityGroupAPI()
|
||||
sg_api.list(self.context, project=self.context.tenant,
|
||||
search_opts=search_opts)
|
||||
|
||||
mock_list_secgroup.assert_called_once_with(
|
||||
tenant_id=self.context.tenant)
|
||||
|
||||
def test_get_with_name_duplicated(self):
|
||||
sg_name = 'web_server'
|
||||
expected_sg_id = '85cc3048-abc3-43cc-89b3-377341426ac5'
|
||||
|
Loading…
x
Reference in New Issue
Block a user