diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/common/constants.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/common/constants.py index aa970af2e47..45c791868a8 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/common/constants.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/common/constants.py @@ -19,3 +19,5 @@ LOCAL_VLAN_ID = -2 VXLAN_NONE = 'not_supported' VXLAN_MCAST = 'multicast_flooding' VXLAN_UCAST = 'unicast_flooding' + +EXTENSION_DRIVER_TYPE = 'linuxbridge' diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py index 242f16c4ff7..7cfae35cf36 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -19,6 +19,7 @@ # Based on the structure of the OpenVSwitch agent in the # Neutron OpenVSwitch Plugin. +import collections import sys import time @@ -35,6 +36,7 @@ from oslo_utils import excutils from six import moves from neutron._i18n import _LE, _LI, _LW +from neutron.agent.l2.extensions import manager as ext_manager from neutron.agent.linux import bridge_lib from neutron.agent.linux import ip_lib from neutron.agent.linux import utils @@ -670,7 +672,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # history # 1.1 Support Security Group RPC # 1.3 Added param devices_to_update to security_groups_provider_updated - target = oslo_messaging.Target(version='1.3') + # 1.4 Added support for network_update + target = oslo_messaging.Target(version='1.4') def __init__(self, context, agent, sg_agent): super(LinuxBridgeRpcCallbacks, self).__init__() @@ -708,6 +711,15 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.agent.updated_devices.add(tap_name) LOG.debug("port_update RPC received for port: %s", port_id) + def network_update(self, context, **kwargs): + network_id = kwargs['network']['id'] + LOG.debug("network_update message processed for network " + "%(network_id)s, with ports: %(ports)s", + {'network_id': network_id, + 'ports': self.agent.network_ports[network_id]}) + for port_data in self.agent.network_ports[network_id]: + self.agent.updated_devices.add(port_data['device']) + def fdb_add(self, context, fdb_entries): LOG.debug("fdb_add received") for network_id, values in fdb_entries.items(): @@ -810,8 +822,24 @@ class LinuxBridgeNeutronAgentRPC(service.Service): def start(self): self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings) - configurations = {'bridge_mappings': self.bridge_mappings, - 'interface_mappings': self.interface_mappings} + + # stores received port_updates and port_deletes for + # processing by the main loop + self.updated_devices = set() + + # stores all configured ports on agent + self.network_ports = collections.defaultdict(list) + # flag to do a sync after revival + self.fullsync = False + self.context = context.get_admin_context_without_session() + self.setup_rpc(self.interface_mappings.values()) + self.init_extension_manager(self.connection) + + configurations = { + 'bridge_mappings': self.bridge_mappings, + 'interface_mappings': self.interface_mappings, + 'extensions': self.ext_manager.names() + } if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE: configurations['tunneling_ip'] = self.br_mgr.local_ip configurations['tunnel_types'] = [p_const.TYPE_VXLAN] @@ -824,16 +852,11 @@ class LinuxBridgeNeutronAgentRPC(service.Service): 'agent_type': constants.AGENT_TYPE_LINUXBRIDGE, 'start_flag': True} - # stores received port_updates for processing by the main loop - self.updated_devices = set() - # flag to do a sync after revival - self.fullsync = False - self.context = context.get_admin_context_without_session() - self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) - self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) - self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context, - self.sg_plugin_rpc, defer_refresh_firewall=True) - self.setup_rpc(self.interface_mappings.values()) + report_interval = cfg.CONF.AGENT.report_interval + if report_interval: + heartbeat = loopingcall.FixedIntervalLoopingCall( + self._report_state) + heartbeat.start(interval=report_interval) self.daemon_loop() def stop(self, graceful=True): @@ -871,6 +894,12 @@ class LinuxBridgeNeutronAgentRPC(service.Service): LOG.error(_LE("Unable to obtain MAC address for unique ID. " "Agent terminated!")) exit(1) + + self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) + self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) + self.sg_agent = sg_rpc.SecurityGroupAgentRpc( + self.context, self.sg_plugin_rpc, defer_refresh_firewall=True) + self.agent_id = '%s%s' % ('lb', (mac.replace(":", ""))) LOG.info(_LI("RPC agent_id: %s"), self.agent_id) @@ -883,17 +912,21 @@ class LinuxBridgeNeutronAgentRPC(service.Service): # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], + [topics.NETWORK, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] + if cfg.CONF.VXLAN.l2_population: consumers.append([topics.L2POPULATION, topics.UPDATE]) self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) - report_interval = cfg.CONF.AGENT.report_interval - if report_interval: - heartbeat = loopingcall.FixedIntervalLoopingCall( - self._report_state) - heartbeat.start(interval=report_interval) + + def init_extension_manager(self, connection): + ext_manager.register_opts(cfg.CONF) + self.ext_manager = ( + ext_manager.AgentExtensionsManager(cfg.CONF)) + self.ext_manager.initialize( + connection, lconst.EXTENSION_DRIVER_TYPE) def setup_linux_bridge(self, bridge_mappings, interface_mappings): self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings) @@ -907,6 +940,22 @@ class LinuxBridgeNeutronAgentRPC(service.Service): else: ip_lib.IPDevice(tap_name).link.set_down() + def _clean_network_ports(self, device): + for netid, ports_list in self.network_ports.items(): + for port_data in ports_list: + if device == port_data['device']: + ports_list.remove(port_data) + if ports_list == []: + self.network_ports.pop(netid) + return port_data['port_id'] + + def _update_network_ports(self, network_id, port_id, device): + self._clean_network_ports(device) + self.network_ports[network_id].append({ + "port_id": port_id, + "device": device + }) + def process_network_devices(self, device_info): resync_a = False resync_b = False @@ -1006,6 +1055,10 @@ class LinuxBridgeNeutronAgentRPC(service.Service): device, self.agent_id, cfg.CONF.host) + self._update_network_ports(device_details['network_id'], + device_details['port_id'], + device_details['device']) + self.ext_manager.handle_port(self.context, device_details) else: LOG.info(_LI("Device %s not defined on plugin"), device) return False @@ -1029,6 +1082,10 @@ class LinuxBridgeNeutronAgentRPC(service.Service): LOG.info(_LI("Port %s updated."), device) else: LOG.debug("Device %s not defined on plugin", device) + port_id = self._clean_network_ports(device) + self.ext_manager.delete_port(self.context, + {'device': device, + 'port_id': port_id}) if self.prevent_arp_spoofing: arp_protect.delete_arp_spoofing_protection(devices) return resync diff --git a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py index db7e36267a7..84532f1a260 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import sys import mock @@ -30,12 +31,18 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent \ from neutron.tests import base LOCAL_IP = '192.168.0.33' +PORT_1 = 'abcdef01-12ddssdfds-fdsfsd' DEVICE_1 = 'tapabcdef01-12' +NETWORK_ID = '57653b20-ed5b-4ed0-a31d-06f84e3fd909' BRIDGE_MAPPING_VALUE = 'br-eth2' BRIDGE_MAPPINGS = {'physnet0': BRIDGE_MAPPING_VALUE} INTERFACE_MAPPINGS = {'physnet1': 'eth1'} FAKE_DEFAULT_DEV = mock.Mock() FAKE_DEFAULT_DEV.name = 'eth1' +PORT_DATA = { + "port_id": PORT_1, + "device": DEVICE_1 +} class FakeIpLinkCommand(object): @@ -128,10 +135,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase): agent = self.agent agent._ensure_port_admin_state = mock.Mock() devices = [DEVICE_1] + agent.network_ports[NETWORK_ID].append(PORT_DATA) with mock.patch.object(agent.plugin_rpc, "update_device_down") as fn_udd,\ mock.patch.object(agent.sg_agent, - "remove_devices_filter") as fn_rdf: + "remove_devices_filter") as fn_rdf,\ + mock.patch.object(agent.ext_manager, + "delete_port") as ext_mgr_delete_port: fn_udd.return_value = {'device': DEVICE_1, 'exists': True} with mock.patch.object(linuxbridge_neutron_agent.LOG, @@ -141,14 +151,21 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.assertFalse(resync) self.assertTrue(fn_udd.called) self.assertTrue(fn_rdf.called) + self.assertTrue(ext_mgr_delete_port.called) + self.assertTrue( + PORT_DATA not in agent.network_ports[NETWORK_ID] + ) def test_treat_devices_removed_with_not_existed_device(self): agent = self.agent devices = [DEVICE_1] + agent.network_ports[NETWORK_ID].append(PORT_DATA) with mock.patch.object(agent.plugin_rpc, "update_device_down") as fn_udd,\ mock.patch.object(agent.sg_agent, - "remove_devices_filter") as fn_rdf: + "remove_devices_filter") as fn_rdf,\ + mock.patch.object(agent.ext_manager, + "delete_port") as ext_mgr_delete_port: fn_udd.return_value = {'device': DEVICE_1, 'exists': False} with mock.patch.object(linuxbridge_neutron_agent.LOG, @@ -158,19 +175,30 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.assertFalse(resync) self.assertTrue(fn_udd.called) self.assertTrue(fn_rdf.called) + self.assertTrue(ext_mgr_delete_port.called) + self.assertTrue( + PORT_DATA not in agent.network_ports[NETWORK_ID] + ) def test_treat_devices_removed_failed(self): agent = self.agent devices = [DEVICE_1] + agent.network_ports[NETWORK_ID].append(PORT_DATA) with mock.patch.object(agent.plugin_rpc, "update_device_down") as fn_udd,\ mock.patch.object(agent.sg_agent, - "remove_devices_filter") as fn_rdf: + "remove_devices_filter") as fn_rdf,\ + mock.patch.object(agent.ext_manager, + "delete_port") as ext_mgr_delete_port: fn_udd.side_effect = Exception() resync = agent.treat_devices_removed(devices) self.assertTrue(resync) self.assertTrue(fn_udd.called) self.assertTrue(fn_rdf.called) + self.assertTrue(ext_mgr_delete_port.called) + self.assertTrue( + PORT_DATA not in agent.network_ports[NETWORK_ID] + ) def _test_scan_devices(self, previous, updated, fake_current, expected, sync): @@ -273,6 +301,27 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self._test_scan_devices(previous, updated, fake_current, expected, sync=False) + def test_scan_devices_updated_deleted_concurrently(self): + previous = { + 'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set() + } + # Device 2 disappeared. + fake_current = set([1]) + # Device 2 got an concurrent update via network_update + updated = set([2]) + expected = { + 'current': set([1]), + 'updated': set(), + 'added': set(), + 'removed': set([2]) + } + self._test_scan_devices( + previous, updated, fake_current, expected, sync=False + ) + def test_scan_devices_updated_on_sync(self): previous = {'current': set([1, 2]), 'updated': set([1]), @@ -318,6 +367,11 @@ class TestLinuxBridgeAgent(base.BaseTestCase): 'segmentation_id': 100, 'physical_network': 'physnet1', 'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX} + mock_port_data = { + 'port_id': mock_details['port_id'], + 'device': mock_details['device'] + } + agent.ext_manager = mock.Mock() agent.plugin_rpc = mock.Mock() agent.plugin_rpc.get_devices_details_list.return_value = [mock_details] agent.br_mgr = mock.Mock() @@ -331,6 +385,10 @@ class TestLinuxBridgeAgent(base.BaseTestCase): 100, 'port123', constants.DEVICE_OWNER_NETWORK_PREFIX) self.assertTrue(agent.plugin_rpc.update_device_up.called) + self.assertTrue(agent.ext_manager.handle_port.called) + self.assertTrue( + mock_port_data in agent.network_ports[mock_details['network_id']] + ) def test_set_rpc_timeout(self): self.agent.stop() @@ -370,6 +428,63 @@ class TestLinuxBridgeAgent(base.BaseTestCase): def test_ensure_port_admin_state_down(self): self._test_ensure_port_admin_state(False) + def test_update_network_ports(self): + port_1_data = PORT_DATA + NETWORK_2_ID = 'fake_second_network' + port_2_data = { + 'port_id': 'fake_port_2', + 'device': 'fake_port_2_device_name' + } + self.agent.network_ports[NETWORK_ID].append( + port_1_data + ) + self.agent.network_ports[NETWORK_ID].append( + port_2_data + ) + #check update port: + self.agent._update_network_ports( + NETWORK_2_ID, port_2_data['port_id'], port_2_data['device'] + ) + self.assertTrue( + port_2_data not in self.agent.network_ports[NETWORK_ID] + ) + self.assertTrue( + port_2_data in self.agent.network_ports[NETWORK_2_ID] + ) + + def test_clean_network_ports(self): + port_1_data = PORT_DATA + port_2_data = { + 'port_id': 'fake_port_2', + 'device': 'fake_port_2_device_name' + } + self.agent.network_ports[NETWORK_ID].append( + port_1_data + ) + self.agent.network_ports[NETWORK_ID].append( + port_2_data + ) + #check removing port from network when other ports are still there: + cleaned_port_id = self.agent._clean_network_ports(DEVICE_1) + self.assertTrue( + NETWORK_ID in self.agent.network_ports.keys() + ) + self.assertTrue( + port_1_data not in self.agent.network_ports[NETWORK_ID] + ) + self.assertTrue( + port_2_data in self.agent.network_ports[NETWORK_ID] + ) + self.assertEqual(cleaned_port_id, PORT_1) + #and now remove last port from network: + cleaned_port_id = self.agent._clean_network_ports( + port_2_data['device'] + ) + self.assertTrue( + NETWORK_ID not in self.agent.network_ports.keys() + ) + self.assertEqual(cleaned_port_id, port_2_data['port_id']) + class TestLinuxBridgeManager(base.BaseTestCase): def setUp(self): @@ -1048,6 +1163,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase): segment.network_type = 'vxlan' segment.segmentation_id = 1 self.br_mgr.network_map['net_id'] = segment + self.updated_devices = set() + self.network_ports = collections.defaultdict(list) self.lb_rpc = linuxbridge_neutron_agent.LinuxBridgeRpcCallbacks( object(), @@ -1059,17 +1176,30 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase): mock_net = mock.Mock() mock_net.physical_network = None - self.lb_rpc.agent.br_mgr.network_map = {'123': mock_net} + self.lb_rpc.agent.br_mgr.network_map = {NETWORK_ID: mock_net} with mock.patch.object(self.lb_rpc.agent.br_mgr, "get_bridge_name") as get_br_fn,\ mock.patch.object(self.lb_rpc.agent.br_mgr, "delete_bridge") as del_fn: get_br_fn.return_value = "br0" - self.lb_rpc.network_delete("anycontext", network_id="123") - get_br_fn.assert_called_with("123") + self.lb_rpc.network_delete("anycontext", network_id=NETWORK_ID) + get_br_fn.assert_called_with(NETWORK_ID) del_fn.assert_called_with("br0") + def test_port_update(self): + port = {'id': PORT_1} + self.lb_rpc.port_update(context=None, port=port) + self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices) + + def test_network_update(self): + updated_network = {'id': NETWORK_ID} + self.lb_rpc.agent.network_ports = { + NETWORK_ID: [PORT_DATA] + } + self.lb_rpc.network_update(context=None, network=updated_network) + self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices) + def test_network_delete_with_existed_brq(self): mock_net = mock.Mock() mock_net.physical_network = 'physnet0' diff --git a/releasenotes/notes/linuxbridge-agent-extensions-66bdf9feee25ef99.yaml b/releasenotes/notes/linuxbridge-agent-extensions-66bdf9feee25ef99.yaml new file mode 100644 index 00000000000..a839c91fb5b --- /dev/null +++ b/releasenotes/notes/linuxbridge-agent-extensions-66bdf9feee25ef99.yaml @@ -0,0 +1,8 @@ +--- +prelude: > + The Linuxbridge agent now supports l2 agent extensions. +features: + - The Linuxbridge agent can now be extended by 3rd parties using a pluggable + mechanism. +fixes: + - partially closes bug 1468803