Merge "[eventlet-deprecation] Implement a metadata proxy without eventlet"

This commit is contained in:
Zuul 2025-01-20 13:29:16 +00:00 committed by Gerrit Code Review
commit fff7e0b2f7
11 changed files with 290 additions and 13 deletions

View File

@ -26,3 +26,39 @@ Eventlet Deprecation Reference
This document contains the information related to the ``eventlet`` library
deprecation. Each section describes how each module has been migrated, the
caveats, the pending technical debt and the missing parts.
OVN Agent
---------
Launch process
~~~~~~~~~~~~~~
The execution of the OVN agent has been replaced. Instead of using
``oslo_services.launch``, that is still using eventlet, the agent creates
a ``threading.Event`` instance and holds the main thread execution by waiting
for this event.
.. note::
Once the ``oslo_services`` library removes the usage of
eventlet, the previous implementation will be restored. The
``oslo_services.service.ProcessLauncher`` service launcher implements a
signal handler.
Metadata proxy
~~~~~~~~~~~~~~
The ``UnixDomainWSGIServer`` class has been replaced with a new implementation.
This implementation does not rely on ``neutron.api.wsgi.Server`` nor
``eventlet.wsgi.server``. It inherits from the built-in library class
``socketserver.StreamRequestHandler``.
.. note::
This implementation doesn't use ``oslo_services`` to spawn the
processes or the local threads depending on the ``metadata_workers``
configuration variable. Right now only the embedded form (local thread)
is implemented (``metadata_workers=0``, the default value). Future
implementations will enable again this configuration variable.

View File

@ -17,7 +17,6 @@ import abc
import urllib
import netaddr
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import netutils
@ -43,9 +42,10 @@ class MetadataProxyHandlerBase(metaclass=abc.ABCMeta):
NETWORK_ID_HEADER: str
ROUTER_ID_HEADER: str
def __init__(self, conf, has_cache=False):
def __init__(self, conf, has_cache=False, **kwargs):
self.conf = conf
self._has_cache = has_cache
super().__init__(**kwargs)
@abc.abstractmethod
def get_port(self, remote_address, network_id=None, remote_mac=None,

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import uuid
from oslo_log import log as logging
@ -159,6 +160,13 @@ class OVNNeutronAgent(service.Service):
self.ext_manager_api.nb_idl = self._load_nb_idl()
self.ext_manager.start()
LOG.info('OVN Neutron Agent started')
self.wait()
def wait(self):
# TODO(ralonsoh): remove this forced wait when the oslo_service.service
# implementation is restored in the OVN agent main method.
event = threading.Event()
event.wait()
def stop(self, graceful=True):
LOG.info('Stopping OVN Neutron Agent')

View File

@ -24,7 +24,7 @@ from ovsdbapp.backend.ovs_idl import vlog
from neutron.agent.linux import external_process
from neutron.agent.ovn.extensions import extension_manager
from neutron.agent.ovn.metadata import agent as metadata_agent
from neutron.agent.ovn.metadata import server as metadata_server
from neutron.agent.ovn.metadata import server_socket as metadata_server
from neutron.common.ovn import constants as ovn_const
from neutron.conf.agent.database import agents_db
from neutron.conf.agent.metadata import config as meta_conf
@ -178,5 +178,9 @@ class MetadataExtension(extension_manager.OVNAgentExtension,
# Register the agent with its corresponding Chassis
self.register_metadata_agent()
# Start the metadata server.
proxy_thread = threading.Thread(target=self._proxy.wait)
proxy_thread.start()
# Raise the "is_started" flag.
self._is_started = True

View File

@ -14,16 +14,17 @@
import threading
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent.ovn.metadata import ovsdb
from neutron.common.ovn import constants as ovn_const
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent.ovn.metadata import ovsdb
from neutron.common.ovn import constants as ovn_const
LOG = logging.getLogger(__name__)

View File

@ -0,0 +1,218 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import socketserver
import urllib
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
import requests
import webob
from webob import exc as webob_exc
from neutron._i18n import _
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.common import ipv6_utils
from neutron.common.ovn import constants as ovn_const
from neutron.common import utils as common_utils
LOG = logging.getLogger(__name__)
RESPONSE = jinja2.Template("""HTTP/1.1 {{ http_code }}
Content-Type: text/plain; charset=UTF-8
Connection: close
Content-Length: {{ len }}
<html>
<head>
<title>{{ title }}</title>
</head>
<body>
<h1>{{ body_title }}</h1>
{{ body }}<br /><br />
</body>
</html>""")
RESPONSE_LENGHT = 40
class MetadataProxyHandlerBaseSocketServer(
proxy_base.MetadataProxyHandlerBase):
@staticmethod
def _http_response(http_response, request):
_res = webob.Response(
body=http_response.content,
headerlist=list(http_response.headers.items()),
status=http_response.status_code,
content_type=http_response.headers['content-type'],
charset=http_response.encoding)
# NOTE(ralonsoh): there should be a better way to format the HTTP
# response, adding the HTTP version to the ``webob.Response``
# output string.
out = request.http_version + ' ' + str(_res)
return out.encode(http_response.encoding)
def _proxy_request(self, instance_id, project_id, req):
headers = {
'X-Forwarded-For': req.headers.get('X-Forwarded-For'),
'X-Instance-ID': instance_id,
'X-Tenant-ID': project_id,
'X-Instance-ID-Signature': common_utils.sign_instance_id(
self.conf, instance_id)
}
nova_host_port = ipv6_utils.valid_ipv6_url(
self.conf.nova_metadata_host,
self.conf.nova_metadata_port)
url = urllib.parse.urlunsplit((
self.conf.nova_metadata_protocol,
nova_host_port,
req.path_info,
req.query_string,
''))
disable_ssl_certificate_validation = self.conf.nova_metadata_insecure
if self.conf.auth_ca_cert and not disable_ssl_certificate_validation:
verify_cert = self.conf.auth_ca_cert
else:
verify_cert = not disable_ssl_certificate_validation
client_cert = None
if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
client_cert = (self.conf.nova_client_cert,
self.conf.nova_client_priv_key)
try:
resp = requests.request(method=req.method, url=url,
headers=headers,
data=req.body,
cert=client_cert,
verify=verify_cert,
timeout=60)
except requests.ConnectionError:
msg = _('The remote metadata server is temporarily unavailable. '
'Please try again later.')
LOG.warning(msg)
title = '503 Service Unavailable'
length = RESPONSE_LENGHT + len(title) * 2 + len(msg)
reponse = RESPONSE.render(http_code=title, title=title,
body_title=title, body=title, len=length)
return encodeutils.to_utf8(reponse)
if resp.status_code == 200:
return self._http_response(resp, req)
if resp.status_code == 403:
LOG.warning(
'The remote metadata server responded with Forbidden. This '
'response usually occurs when shared secrets do not match.'
)
# TODO(ralonsoh): add info in the returned HTTP message to the VM.
return self._http_response(resp, req)
if resp.status_code == 500:
msg = _(
'Remote metadata server experienced an internal server error.'
)
LOG.warning(msg)
# TODO(ralonsoh): add info in the returned HTTP message to the VM.
return self._http_response(resp, req)
if resp.status_code in (400, 404, 409, 502, 503, 504):
# TODO(ralonsoh): add info in the returned HTTP message to the VM.
return self._http_response(resp, req)
raise Exception(_('Unexpected response code: %s') % resp.status_code)
class MetadataProxyHandler(MetadataProxyHandlerBaseSocketServer,
socketserver.StreamRequestHandler):
NETWORK_ID_HEADER = 'X-OVN-Network-ID'
ROUTER_ID_HEADER = ''
_conf = None
_chassis = None
_sb_idl = None
def __init__(self, request, client_address, server):
super().__init__(self._conf, has_cache=False, request=request,
client_address=client_address, server=server)
def handle(self):
try:
request = self.request.recv(4096)
LOG.debug('Request: %s', request.decode('utf-8'))
f_request = io.BytesIO(request)
req = webob.Request.from_file(f_request)
instance_id, project_id = self._get_instance_and_project_id(req)
if instance_id:
res = self._proxy_request(instance_id, project_id, req)
self.wfile.write(res)
return
# TODO(ralonsoh): change this return to be a formatted Request
# and added to self.wfile
return webob_exc.HTTPNotFound()
except Exception as exc:
LOG.exception('Error while receiving data.')
raise exc
@property
def sb_idl(self):
return self._sb_idl
def get_port(self, remote_address, network_id=None, remote_mac=None,
router_id=None, skip_cache=False):
ports = self.sb_idl.get_network_port_bindings_by_ip(network_id,
remote_address,
mac=remote_mac)
num_ports = len(ports)
if num_ports == 1:
external_ids = ports[0].external_ids
return (external_ids[ovn_const.OVN_DEVID_EXT_ID_KEY],
external_ids[ovn_const.OVN_PROJID_EXT_ID_KEY])
if num_ports == 0:
LOG.error("No port found in network %s with IP address %s",
network_id, remote_address)
elif num_ports > 1:
port_uuids = ', '.join([str(port.uuid) for port in ports])
LOG.error("More than one port found in network %s with IP address "
"%s. Please run the neutron-ovn-db-sync-util script as "
"there seems to be inconsistent data between Neutron "
"and OVN databases. OVN Port uuids: %s", network_id,
remote_address, port_uuids)
return None, None
class UnixDomainMetadataProxy(proxy_base.UnixDomainMetadataProxyBase):
def __init__(self, conf, chassis, sb_idl=None):
super().__init__(conf)
self.chassis = chassis
self.sb_idl = sb_idl
agent_utils.ensure_directory_exists_without_file(
cfg.CONF.metadata_proxy_socket)
self._server = None
def run(self):
file_socket = cfg.CONF.metadata_proxy_socket
self._server = socketserver.ThreadingUnixStreamServer(
file_socket, MetadataProxyHandler)
MetadataProxyHandler._conf = self.conf
MetadataProxyHandler._chassis = self.chassis
MetadataProxyHandler._sb_idl = self.sb_idl
def wait(self):
self._server.serve_forever()

View File

@ -20,7 +20,6 @@ from neutron.common import utils
from neutron.conf import common as common_config
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service
from neutron.agent.ovn.agent import ovn_neutron_agent
from neutron.conf.agent.ovn.ovn_neutron_agent import config as config_ovn_agent
@ -44,5 +43,9 @@ def main():
ovn_agent = ovn_neutron_agent.OVNNeutronAgent(cfg.CONF)
LOG.info('OVN Neutron Agent initialized successfully, now running... ')
launcher = service.launch(cfg.CONF, ovn_agent, restart_method='mutate')
launcher.wait()
# NOTE(ralonsoh): restore the oslo_service.service implementation once
# the eventlet removal is done; it handles the system signals properly
# (TERM, HUP, INT).
# launcher = service.launch(cfg.CONF, ovn_agent, restart_method='mutate')
# launcher.wait()
ovn_agent.start()

View File

View File

@ -22,6 +22,7 @@ from oslo_utils import uuidutils
from neutron.agent.ovn.agent import ovn_neutron_agent
from neutron.agent.ovn.agent import ovsdb as agent_ovsdb
from neutron.agent.ovn.metadata import agent as metadata_agent
from neutron.agent.ovn.metadata import server_socket
from neutron.common.ovn import constants as ovn_const
from neutron.common import utils as n_utils
from neutron.tests.common import net_helpers
@ -69,7 +70,13 @@ class TestOVNNeutronAgentBase(base.TestOVNFunctionalBase):
agt.test_ovs_idl = []
agt.test_ovn_sb_idl = []
agt.test_ovn_nb_idl = []
agt.start()
# NOTE(ralonsoh): it is needed to ``UnixDomainMetadataProxy.wait``
# method in eventlet environments in order not to block the execution.
# Once eventlet is completely removed, this mock can be deleted.
with mock.patch.object(ovn_neutron_agent.OVNNeutronAgent, 'wait'), \
mock.patch.object(server_socket.UnixDomainMetadataProxy,
'wait'):
agt.start()
self._check_loaded_and_started_extensions(agt)
self.addCleanup(agt.ext_manager_api.ovs_idl.ovsdb_connection.stop)
@ -127,4 +134,4 @@ class TestOVNNeutronAgentMetadataExtension(TestOVNNeutronAgentBase):
# Check Unix proxy is running.
metadata_extension = self.ovn_agent[METADATA_EXTENSION]
self.assertIsNotNone(metadata_extension._proxy.server)
self.assertIsNotNone(metadata_extension._proxy._server)

View File

@ -56,7 +56,7 @@ console_scripts =
neutron-sanity-check = neutron.cmd.sanity_check:main
neutron-periodic-workers = neutron.cmd.eventlet.server:main_periodic_eventlet
neutron-status = neutron.cmd.status:main
neutron-ovn-agent = neutron.cmd.eventlet.agents.ovn_neutron_agent:main
neutron-ovn-agent = neutron.cmd.agents.ovn_neutron_agent:main
neutron-ovn-maintenance-worker = neutron.cmd.eventlet.server:main_ovn_maintenance_eventlet
neutron-ovn-metadata-agent = neutron.cmd.eventlet.agents.ovn_metadata:main
neutron-ovn-migration-mtu = neutron.cmd.ovn.migration_mtu:main