NSXv: use contexts correctly while using threads

Contexts are not thread safe and therefore, methods which are called
as thread entry point should create their own context, and pass to
any called methods which are using contexts.

Change-Id: Ia8629c211807972d228358893a7b787c55b5be7f
This commit is contained in:
Kobi Samoray 2016-09-06 18:03:22 +03:00
parent c48a13a77d
commit fe72c1bd0c
6 changed files with 74 additions and 63 deletions

View File

@ -42,7 +42,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
availability_zone=availability_zone) availability_zone=availability_zone)
if allow_metadata: if allow_metadata:
self.plugin.metadata_proxy_handler.configure_router_edge( self.plugin.metadata_proxy_handler.configure_router_edge(
lrouter['id']) lrouter['id'], context)
def update_router(self, context, router_id, router): def update_router(self, context, router_id, router):
r = router['router'] r = router['router']
@ -84,7 +84,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
self.edge_manager.unbind_router_on_edge(context, router_id) self.edge_manager.unbind_router_on_edge(context, router_id)
metadata_proxy_handler = self.plugin.metadata_proxy_handler metadata_proxy_handler = self.plugin.metadata_proxy_handler
if metadata_proxy_handler: if metadata_proxy_handler:
metadata_proxy_handler.cleanup_router_edge(router_id) metadata_proxy_handler.cleanup_router_edge(context, router_id)
def _build_router_data_from_db(self, router_db, router): def _build_router_data_from_db(self, router_db, router):
"""Return a new dictionary with all DB & requested router attributes """Return a new dictionary with all DB & requested router attributes
@ -135,7 +135,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
def delete_router(self, context, router_id): def delete_router(self, context, router_id):
if self.plugin.metadata_proxy_handler: if self.plugin.metadata_proxy_handler:
self.plugin.metadata_proxy_handler.cleanup_router_edge( self.plugin.metadata_proxy_handler.cleanup_router_edge(
router_id) context, router_id)
self.edge_manager.delete_lrouter(context, router_id, dist=False) self.edge_manager.delete_lrouter(context, router_id, dist=False)
def update_routes(self, context, router_id, nexthop): def update_routes(self, context, router_id, nexthop):

View File

@ -584,7 +584,8 @@ class RouterSharedDriver(router_driver.RouterBaseDriver):
# configure metadata service on the router. # configure metadata service on the router.
metadata_proxy_handler = self.plugin.metadata_proxy_handler metadata_proxy_handler = self.plugin.metadata_proxy_handler
if metadata_proxy_handler and new: if metadata_proxy_handler and new:
metadata_proxy_handler.configure_router_edge(router_id) metadata_proxy_handler.configure_router_edge(router_id,
context)
edge_id = edge_utils.get_router_edge_id(context, router_id) edge_id = edge_utils.get_router_edge_id(context, router_id)
with locking.LockManager.get_lock(str(edge_id)): with locking.LockManager.get_lock(str(edge_id)):
# add all internal interfaces of the router on edge # add all internal interfaces of the router on edge
@ -602,7 +603,7 @@ class RouterSharedDriver(router_driver.RouterBaseDriver):
self.edge_manager.unbind_router_on_edge(context, router_id) self.edge_manager.unbind_router_on_edge(context, router_id)
metadata_proxy_handler = self.plugin.metadata_proxy_handler metadata_proxy_handler = self.plugin.metadata_proxy_handler
if metadata_proxy_handler: if metadata_proxy_handler:
metadata_proxy_handler.cleanup_router_edge(router_id) metadata_proxy_handler.cleanup_router_edge(context, router_id)
def _add_router_services_on_available_edge(self, context, router_id): def _add_router_services_on_available_edge(self, context, router_id):
router_ids = self.edge_manager.get_routers_on_same_edge( router_ids = self.edge_manager.get_routers_on_same_edge(

View File

@ -107,16 +107,16 @@ class NsxVMetadataProxyHandler(object):
def __init__(self, nsxv_plugin): def __init__(self, nsxv_plugin):
self.nsxv_plugin = nsxv_plugin self.nsxv_plugin = nsxv_plugin
self.context = neutron_context.get_admin_context() context = neutron_context.get_admin_context()
# Init cannot run concurrently on multiple nodes # Init cannot run concurrently on multiple nodes
with locking.LockManager.get_lock('nsx-metadata-init'): with locking.LockManager.get_lock('nsx-metadata-init'):
self.internal_net, self.internal_subnet = ( self.internal_net, self.internal_subnet = (
self._get_internal_network_and_subnet()) self._get_internal_network_and_subnet(context))
self.proxy_edge_ips = self._get_proxy_edges() self.proxy_edge_ips = self._get_proxy_edges(context)
def _create_metadata_internal_network(self, cidr): def _create_metadata_internal_network(self, context, cidr):
# Neutron requires a network to have some tenant_id # Neutron requires a network to have some tenant_id
tenant_id = nsxv_constants.INTERNAL_TENANT_ID tenant_id = nsxv_constants.INTERNAL_TENANT_ID
@ -125,7 +125,7 @@ class NsxVMetadataProxyHandler(object):
'port_security_enabled': False, 'port_security_enabled': False,
'shared': False, 'shared': False,
'tenant_id': tenant_id}} 'tenant_id': tenant_id}}
net = self.nsxv_plugin.create_network(self.context, net_data) net = self.nsxv_plugin.create_network(context, net_data)
subnet_data = {'subnet': subnet_data = {'subnet':
{'cidr': cidr, {'cidr': cidr,
@ -140,18 +140,18 @@ class NsxVMetadataProxyHandler(object):
'tenant_id': tenant_id}} 'tenant_id': tenant_id}}
subnet = self.nsxv_plugin.create_subnet( subnet = self.nsxv_plugin.create_subnet(
self.context, context,
subnet_data) subnet_data)
return net['id'], subnet['id'] return net['id'], subnet['id']
def _get_internal_network_and_subnet(self): def _get_internal_network_and_subnet(self, context):
internal_net = None internal_net = None
internal_subnet = None internal_subnet = None
# Try to find internal net, internal subnet. If not found, create new # Try to find internal net, internal subnet. If not found, create new
net_list = nsxv_db.get_nsxv_internal_network( net_list = nsxv_db.get_nsxv_internal_network(
self.context.session, context.session,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE) vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE)
if net_list: if net_list:
@ -159,7 +159,7 @@ class NsxVMetadataProxyHandler(object):
if internal_net: if internal_net:
internal_subnet = self.nsxv_plugin.get_subnets( internal_subnet = self.nsxv_plugin.get_subnets(
self.context, context,
fields=['id'], fields=['id'],
filters={'network_id': [internal_net]})[0]['id'] filters={'network_id': [internal_net]})[0]['id']
@ -169,15 +169,15 @@ class NsxVMetadataProxyHandler(object):
try: try:
internal_net, internal_subnet = ( internal_net, internal_subnet = (
self._create_metadata_internal_network( self._create_metadata_internal_network(
INTERNAL_SUBNET)) context, INTERNAL_SUBNET))
except Exception as e: except Exception as e:
nsxv_db.delete_nsxv_internal_network( nsxv_db.delete_nsxv_internal_network(
self.context.session, context.session,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE) vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE)
# if network is created, clean up # if network is created, clean up
if internal_net: if internal_net:
self.nsxv_plugin.delete_network(self.context, self.nsxv_plugin.delete_network(context,
internal_net) internal_net)
LOG.exception(_LE("Exception %s while creating internal " LOG.exception(_LE("Exception %s while creating internal "
@ -186,7 +186,7 @@ class NsxVMetadataProxyHandler(object):
# Update the new network_id in DB # Update the new network_id in DB
nsxv_db.create_nsxv_internal_network( nsxv_db.create_nsxv_internal_network(
self.context.session, context.session,
nsxv_constants.INTER_EDGE_PURPOSE, nsxv_constants.INTER_EDGE_PURPOSE,
internal_net) internal_net)
else: else:
@ -196,25 +196,23 @@ class NsxVMetadataProxyHandler(object):
return internal_net, internal_subnet return internal_net, internal_subnet
def _get_edge_internal_ip(self, rtr_id): def _get_edge_internal_ip(self, context, rtr_id):
filters = { filters = {
'network_id': [self.internal_net], 'network_id': [self.internal_net],
'device_id': [rtr_id]} 'device_id': [rtr_id]}
ports = self.nsxv_plugin.get_ports(self.context, filters=filters) ports = self.nsxv_plugin.get_ports(context, filters=filters)
if ports: if ports:
return ports[0]['fixed_ips'][0]['ip_address'] return ports[0]['fixed_ips'][0]['ip_address']
else: else:
LOG.error(_LE("No port found for metadata for %s"), rtr_id) LOG.error(_LE("No port found for metadata for %s"), rtr_id)
def _get_edge_rtr_id_by_ext_ip(self, edge_ip): def _get_edge_rtr_id_by_ext_ip(self, context, edge_ip):
rtr_list = nsxv_db.get_nsxv_internal_edge( rtr_list = nsxv_db.get_nsxv_internal_edge(
self.context.session, edge_ip) context.session, edge_ip)
if rtr_list: if rtr_list:
return rtr_list[0]['router_id'] return rtr_list[0]['router_id']
def _get_edge_id_by_rtr_id(self, rtr_id, context=None): def _get_edge_id_by_rtr_id(self, context, rtr_id):
if not context:
context = self.context
binding = nsxv_db.get_nsxv_router_binding( binding = nsxv_db.get_nsxv_router_binding(
context.session, context.session,
rtr_id) rtr_id)
@ -222,10 +220,10 @@ class NsxVMetadataProxyHandler(object):
if binding: if binding:
return binding['edge_id'] return binding['edge_id']
def _get_proxy_edges(self): def _get_proxy_edges(self, context):
proxy_edge_ips = [] proxy_edge_ips = []
db_edge_ips = get_db_internal_edge_ips(self.context) db_edge_ips = get_db_internal_edge_ips(context)
if len(db_edge_ips) > len(cfg.CONF.nsxv.mgt_net_proxy_ips): if len(db_edge_ips) > len(cfg.CONF.nsxv.mgt_net_proxy_ips):
error = _('Number of configured metadata proxy IPs is smaller ' error = _('Number of configured metadata proxy IPs is smaller '
'than number of Edges which are already provisioned') 'than number of Edges which are already provisioned')
@ -279,10 +277,12 @@ class NsxVMetadataProxyHandler(object):
def _setup_proxy_edge_route_and_connectivity(self, rtr_ext_ip, def _setup_proxy_edge_route_and_connectivity(self, rtr_ext_ip,
rtr_id=None, edge_id=None): rtr_id=None, edge_id=None):
# Use separate context per each as we use this in tread context
context = neutron_context.get_admin_context()
if not rtr_id: if not rtr_id:
rtr_id = self._get_edge_rtr_id_by_ext_ip(rtr_ext_ip) rtr_id = self._get_edge_rtr_id_by_ext_ip(context, rtr_ext_ip)
if not edge_id: if not edge_id:
edge_id = self._get_edge_id_by_rtr_id(rtr_id) edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
# Read and validate DGW. If different, replace with new value # Read and validate DGW. If different, replace with new value
try: try:
@ -300,7 +300,7 @@ class NsxVMetadataProxyHandler(object):
if dgw != cfg.CONF.nsxv.mgt_net_default_gateway: if dgw != cfg.CONF.nsxv.mgt_net_default_gateway:
if cfg.CONF.nsxv.metadata_initializer: if cfg.CONF.nsxv.metadata_initializer:
self.nsxv_plugin._update_routes( self.nsxv_plugin._update_routes(
self.context, rtr_id, context, rtr_id,
cfg.CONF.nsxv.mgt_net_default_gateway) cfg.CONF.nsxv.mgt_net_default_gateway)
else: else:
error = _('Metadata initialization is incomplete on ' error = _('Metadata initialization is incomplete on '
@ -365,31 +365,37 @@ class NsxVMetadataProxyHandler(object):
"proxy edge %(edge)s: %(err)s"), "proxy edge %(edge)s: %(err)s"),
{'edge': edge_id, 'err': e}) {'edge': edge_id, 'err': e})
edge_ip = self._get_edge_internal_ip(rtr_id) edge_ip = self._get_edge_internal_ip(context, rtr_id)
if edge_ip: if edge_ip:
return edge_ip return edge_ip
def _setup_proxy_edge_external_interface_ip(self, rtr_ext_ips): def _setup_proxy_edge_external_interface_ip(self, rtr_ext_ips):
# Use separate context per each as we use this in tread context
context = neutron_context.get_admin_context()
rtr_old_ext_ip, rtr_new_ext_ip = rtr_ext_ips rtr_old_ext_ip, rtr_new_ext_ip = rtr_ext_ips
rtr_id = self._get_edge_rtr_id_by_ext_ip(rtr_old_ext_ip) rtr_id = self._get_edge_rtr_id_by_ext_ip(context, rtr_old_ext_ip)
edge_id = self._get_edge_id_by_rtr_id(rtr_id) edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
# Replace DB entry as we cannot update the table PK # Replace DB entry as we cannot update the table PK
nsxv_db.delete_nsxv_internal_edge(self.context.session, rtr_old_ext_ip) nsxv_db.delete_nsxv_internal_edge(context.session, rtr_old_ext_ip)
edge_ip = self._setup_proxy_edge_route_and_connectivity( edge_ip = self._setup_proxy_edge_route_and_connectivity(
rtr_new_ext_ip, rtr_id, edge_id) rtr_new_ext_ip, rtr_id, edge_id)
nsxv_db.create_nsxv_internal_edge( nsxv_db.create_nsxv_internal_edge(
self.context.session, rtr_new_ext_ip, context.session, rtr_new_ext_ip,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id) vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id)
if edge_ip: if edge_ip:
return edge_ip return edge_ip
def _setup_new_proxy_edge(self, rtr_ext_ip): def _setup_new_proxy_edge(self, rtr_ext_ip):
# Use separate context per each as we use this in tread context
context = neutron_context.get_admin_context()
rtr_id = None rtr_id = None
try: try:
router_data = { router_data = {
@ -400,12 +406,12 @@ class NsxVMetadataProxyHandler(object):
'tenant_id': None}} 'tenant_id': None}}
rtr = self.nsxv_plugin.create_router( rtr = self.nsxv_plugin.create_router(
self.context, context,
router_data, router_data,
allow_metadata=False) allow_metadata=False)
rtr_id = rtr['id'] rtr_id = rtr['id']
edge_id = self._get_edge_id_by_rtr_id(rtr_id) edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
self.nsxv_plugin.nsx_v.update_interface( self.nsxv_plugin.nsx_v.update_interface(
rtr['id'], rtr['id'],
@ -428,14 +434,14 @@ class NsxVMetadataProxyHandler(object):
'port_security_enabled': False, 'port_security_enabled': False,
'tenant_id': None}} 'tenant_id': None}}
port = self.nsxv_plugin.create_port(self.context, port_data) port = self.nsxv_plugin.create_port(context, port_data)
address_groups = self._get_address_groups( address_groups = self._get_address_groups(
self.context, self.internal_net, rtr_id, is_proxy=True) context, self.internal_net, rtr_id, is_proxy=True)
edge_ip = port['fixed_ips'][0]['ip_address'] edge_ip = port['fixed_ips'][0]['ip_address']
edge_utils.update_internal_interface( edge_utils.update_internal_interface(
self.nsxv_plugin.nsx_v, self.context, rtr_id, self.nsxv_plugin.nsx_v, context, rtr_id,
self.internal_net, address_groups) self.internal_net, address_groups)
self._setup_metadata_lb(rtr_id, self._setup_metadata_lb(rtr_id,
@ -454,18 +460,18 @@ class NsxVMetadataProxyHandler(object):
edge_utils.update_firewall( edge_utils.update_firewall(
self.nsxv_plugin.nsx_v, self.nsxv_plugin.nsx_v,
self.context, context,
rtr_id, rtr_id,
{'firewall_rule_list': firewall_rules}, {'firewall_rule_list': firewall_rules},
allow_external=False) allow_external=False)
if cfg.CONF.nsxv.mgt_net_default_gateway: if cfg.CONF.nsxv.mgt_net_default_gateway:
self.nsxv_plugin._update_routes( self.nsxv_plugin._update_routes(
self.context, rtr_id, context, rtr_id,
cfg.CONF.nsxv.mgt_net_default_gateway) cfg.CONF.nsxv.mgt_net_default_gateway)
nsxv_db.create_nsxv_internal_edge( nsxv_db.create_nsxv_internal_edge(
self.context.session, rtr_ext_ip, context.session, rtr_ext_ip,
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id) vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, rtr_id)
return edge_ip return edge_ip
@ -475,19 +481,19 @@ class NsxVMetadataProxyHandler(object):
"for metadata service"), e) "for metadata service"), e)
ports = self.nsxv_plugin.get_ports( ports = self.nsxv_plugin.get_ports(
self.context, filters={'device_id': [rtr_id]}) context, filters={'device_id': [rtr_id]})
for port in ports: for port in ports:
self.nsxv_plugin.delete_port(self.context, port['id'], self.nsxv_plugin.delete_port(context, port['id'],
l3_port_check=True, l3_port_check=True,
nw_gw_port_check=True) nw_gw_port_check=True)
nsxv_db.delete_nsxv_internal_edge( nsxv_db.delete_nsxv_internal_edge(
self.context.session, context.session,
rtr_ext_ip) rtr_ext_ip)
if rtr_id: if rtr_id:
self.nsxv_plugin.delete_router(self.context, rtr_id) self.nsxv_plugin.delete_router(context, rtr_id)
def _get_address_groups(self, context, network_id, device_id, is_proxy): def _get_address_groups(self, context, network_id, device_id, is_proxy):
@ -559,9 +565,9 @@ class NsxVMetadataProxyHandler(object):
proxy_lb=False, context=None): proxy_lb=False, context=None):
if context is None: if context is None:
context = self.context context = neutron_context.get_admin_context()
edge_id = self._get_edge_id_by_rtr_id(rtr_id, context=context) edge_id = self._get_edge_id_by_rtr_id(context, rtr_id)
LOG.debug('Setting up Edge device %s', edge_id) LOG.debug('Setting up Edge device %s', edge_id)
lb_obj = nsxv_lb.NsxvLoadbalancer() lb_obj = nsxv_lb.NsxvLoadbalancer()
@ -641,7 +647,8 @@ class NsxVMetadataProxyHandler(object):
lb_obj.submit_to_backend(self.nsxv_plugin.nsx_v.vcns, edge_id) lb_obj.submit_to_backend(self.nsxv_plugin.nsx_v.vcns, edge_id)
def configure_router_edge(self, rtr_id, context=None): def configure_router_edge(self, rtr_id, context):
ctx = context.elevated()
# Connect router interface to inter-edge network # Connect router interface to inter-edge network
port_data = { port_data = {
'port': { 'port': {
@ -655,10 +662,10 @@ class NsxVMetadataProxyHandler(object):
'port_security_enabled': False, 'port_security_enabled': False,
'tenant_id': None}} 'tenant_id': None}}
self.nsxv_plugin.create_port(self.context, port_data) self.nsxv_plugin.create_port(ctx, port_data)
address_groups = self._get_address_groups( address_groups = self._get_address_groups(
self.context, ctx,
self.internal_net, self.internal_net,
rtr_id, rtr_id,
is_proxy=False) is_proxy=False)
@ -681,11 +688,11 @@ class NsxVMetadataProxyHandler(object):
proxy_lb=False, proxy_lb=False,
context=context) context=context)
def cleanup_router_edge(self, rtr_id, warn=False): def cleanup_router_edge(self, context, rtr_id, warn=False):
filters = { filters = {
'network_id': [self.internal_net], 'network_id': [self.internal_net],
'device_id': [rtr_id]} 'device_id': [rtr_id]}
ports = self.nsxv_plugin.get_ports(self.context, filters=filters) ports = self.nsxv_plugin.get_ports(context, filters=filters)
if ports: if ports:
if warn: if warn:
@ -694,7 +701,7 @@ class NsxVMetadataProxyHandler(object):
{'port': ports[0]['id'], 'router': rtr_id}) {'port': ports[0]['id'], 'router': rtr_id})
try: try:
self.nsxv_plugin.delete_port( self.nsxv_plugin.delete_port(
self.context, ports[0]['id'], context, ports[0]['id'],
l3_port_check=False) l3_port_check=False)
except Exception as e: except Exception as e:
LOG.error(_LE("Failed to delete md_proxy port %(port)s: " LOG.error(_LE("Failed to delete md_proxy port %(port)s: "

View File

@ -1052,7 +1052,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context.session, dhcp_edge['edge_id']) context.session, dhcp_edge['edge_id'])
if rtr_binding: if rtr_binding:
rtr_id = rtr_binding['router_id'] rtr_id = rtr_binding['router_id']
self.metadata_proxy_handler.cleanup_router_edge(rtr_id) self.metadata_proxy_handler.cleanup_router_edge(
context, rtr_id)
def _update_dhcp_edge_service(self, context, network_id, address_groups): def _update_dhcp_edge_service(self, context, network_id, address_groups):
self.edge_manager.update_dhcp_edge_service( self.edge_manager.update_dhcp_edge_service(

View File

@ -158,6 +158,9 @@ class EdgeManager(object):
edge_type=nsxv_constants.SERVICE_EDGE, edge_type=nsxv_constants.SERVICE_EDGE,
availability_zone=None): availability_zone=None):
"""Create an edge for logical router support.""" """Create an edge for logical router support."""
if context is None:
context = q_context.get_admin_context()
# deploy edge # deploy edge
self.nsxv_manager.deploy_edge(context, lrouter['id'], self.nsxv_manager.deploy_edge(context, lrouter['id'],
lrouter['name'], internal_network=None, lrouter['name'], internal_network=None,
@ -186,16 +189,16 @@ class EdgeManager(object):
appliance_size=nsxv_constants.COMPACT, appliance_size=nsxv_constants.COMPACT,
edge_type=nsxv_constants.SERVICE_EDGE, edge_type=nsxv_constants.SERVICE_EDGE,
availability_zone=None): availability_zone=None):
eventlet.spawn_n(self._pool_creator, context, router_ids, eventlet.spawn_n(self._pool_creator, router_ids, appliance_size,
appliance_size, edge_type, availability_zone) edge_type, availability_zone)
def _pool_creator(self, context, router_ids, appliance_size, def _pool_creator(self, router_ids, appliance_size, edge_type,
edge_type, availability_zone): availability_zone):
for router_id in router_ids: for router_id in router_ids:
fake_router = { fake_router = {
'id': router_id, 'id': router_id,
'name': router_id} 'name': router_id}
self.worker_pool.spawn_n(self._deploy_edge, context, fake_router, self.worker_pool.spawn_n(self._deploy_edge, None, fake_router,
appliance_size=appliance_size, appliance_size=appliance_size,
edge_type=edge_type, edge_type=edge_type,
availability_zone=availability_zone) availability_zone=availability_zone)

View File

@ -588,8 +588,7 @@ class EdgeManagerTestCase(EdgeUtilsTestCaseMixin):
binding_ids = [bind.router_id for bind in router_bindings] binding_ids = [bind.router_id for bind in router_bindings]
self.assertEqual(2, len(router_bindings)) self.assertEqual(2, len(router_bindings))
edge_utils.eventlet.spawn_n.assert_called_with( edge_utils.eventlet.spawn_n.assert_called_with(
mock.ANY, mock.ANY, binding_ids, appliance_size, mock.ANY, binding_ids, appliance_size, edge_type, self.az)
edge_type, self.az)
def test_check_backup_edge_pools_with_empty_conf(self): def test_check_backup_edge_pools_with_empty_conf(self):
pool_edges = (self._create_edge_pools(1, 2, 3, 4, 5) + pool_edges = (self._create_edge_pools(1, 2, 3, 4, 5) +