[eventlet-removal] Remove eventlet in the Metadata Agent
This patch removes the usage of eventlet in the Metadata Agent. The new metadata proxy is based on the implementation done in [1]. The new implementation mimics the ``UnixDomainWSGIServer`` class: the server listens on a Unix file socket that is populated by the HAProxy instance running inside the network metadata namespace. HAProxy writes the virtual machines' requests to the Unix socket and the metadata proxy forwards them to the Nova metadata server.

This implementation doesn't use ``oslo.service`` to spawn processes or local threads depending on the ``metadata_workers`` configuration option. Right now only the embedded form (a local thread) is implemented (``metadata_workers=0``, the default value). Future implementations will enable this configuration option again.

[1] https://review.opendev.org/c/openstack/neutron/+/937545

Closes-Bug: #2099896
Change-Id: Ifadef192ef766a4f291643868686a98b591ddb40
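For context, the pattern the new proxy follows is the standard library ``socketserver`` Unix-socket server: a handler class is instantiated per accepted connection, and per-process configuration is attached to the class itself. The following is a minimal, self-contained sketch of that pattern, not the Neutron code; the socket path, handler name and config dict are made up for illustration.

import contextlib
import os
import socketserver

SOCKET_PATH = '/tmp/metadata-demo.sock'   # arbitrary path for this sketch


class DemoHandler(socketserver.StreamRequestHandler):
    # A new instance of this class is created for every accepted connection,
    # so per-process state (such as configuration) lives on the class.
    _conf = None

    def handle(self):
        # Read the raw HTTP request written by the client (HAProxy in the
        # Neutron case) and write back a complete HTTP response.
        self.request.recv(4096)
        body = ('hello, conf=%s\n' % (self._conf,)).encode()
        self.wfile.write(b'HTTP/1.1 200 OK\r\n'
                         b'Content-Type: text/plain\r\n'
                         b'Content-Length: ' + str(len(body)).encode() +
                         b'\r\n\r\n' + body)


if __name__ == '__main__':
    with contextlib.suppress(FileNotFoundError):
        os.unlink(SOCKET_PATH)             # remove a stale socket file
    DemoHandler._conf = {'example': True}  # stand-in for the agent config
    server = socketserver.ThreadingUnixStreamServer(SOCKET_PATH, DemoHandler)
    server.serve_forever()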
This commit is contained in:
parent 7804950a59
commit bc0741e689
@@ -74,6 +74,17 @@ The OVN metadata agent uses the same implementation as the OVN agent. The same
limitations apply.

Metadata agent
--------------

The Metadata agent uses the same implementation as the OVN agent and the same
limitations apply. The ``MetadataProxyHandler`` class is now instantiated every
time a new request is received; once the request has been served, the instance
is destroyed. The cache used to store the results of previous RPC calls is
therefore no longer relevant and has been removed. If an RPC cache is needed,
it should be implemented outside the mentioned class.


Neutron API
-----------
@@ -12,23 +12,51 @@
# License for the specific language governing permissions and limitations
# under the License.

import io
import socketserver
import urllib

import jinja2
from neutron_lib.agent import topics
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.utils import host
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import encodeutils
import requests
import webob
from webob import exc as webob_exc

from neutron._i18n import _
from neutron.agent.common import base_agent_rpc
from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import proxy_base
from neutron.agent import rpc as agent_rpc
from neutron.common import cache_utils as cache
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils


LOG = logging.getLogger(__name__)

RESPONSE = jinja2.Template("""HTTP/1.1 {{ http_code }}
Content-Type: text/plain; charset=UTF-8
Connection: close
Content-Length: {{ len }}

<html>
 <head>
  <title>{{ title }}</title>
 </head>
 <body>
  <h1>{{ body_title }}</h1>
  {{ body }}<br /><br />


 </body>
</html>""")
RESPONSE_LENGHT = 40


class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
    """Agent-side RPC for metadata agent-to-plugin interaction.
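For illustration, this is roughly how the module-level ``RESPONSE`` template is rendered, following the 503 branch added later in this patch; it assumes ``RESPONSE`` and ``RESPONSE_LENGHT`` from the code above are in scope, and the message text is made up.

title = '503 Service Unavailable'
msg = 'The remote metadata server is temporarily unavailable.'
# Content-Length is derived from the RESPONSE_LENGHT constant plus the
# variable fragments interpolated into the template.
length = RESPONSE_LENGHT + len(title) * 2 + len(msg)
page = RESPONSE.render(http_code=title, title=title,
                       body_title=title, body=msg, len=length)
print(page)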
@@ -43,7 +71,6 @@ class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
    API version history:
        1.0 - Initial version.
    """

    def __init__(self, topic):
        super().__init__(
            topic=topic,
@@ -51,25 +78,128 @@ class MetadataPluginAPI(base_agent_rpc.BasePluginApi):
            version='1.0')


class MetadataProxyHandler(proxy_base.MetadataProxyHandlerBase):
class MetadataProxyHandlerBaseSocketServer(
        proxy_base.MetadataProxyHandlerBase):
    @staticmethod
    def _http_response(http_response, request):
        _res = webob.Response(
            body=http_response.content,
            status=http_response.status_code,
            content_type=http_response.headers['content-type'],
            charset=http_response.encoding)
        # NOTE(ralonsoh): there should be a better way to format the HTTP
        # response, adding the HTTP version to the ``webob.Response``
        # output string.
        out = request.http_version + ' ' + str(_res)
        if (int(_res.headers['content-length']) == 0 and
                _res.status_code == 200):
            # Add 2 extra \r\n to the result. HAProxy is also expecting
            # it even when the body is empty.
            out += '\r\n\r\n'
        return out.encode(http_response.encoding)

    def _proxy_request(self, instance_id, project_id, req):
        headers = {
            'X-Forwarded-For': req.headers.get('X-Forwarded-For'),
            'X-Instance-ID': instance_id,
            'X-Tenant-ID': project_id,
            'X-Instance-ID-Signature': common_utils.sign_instance_id(
                self.conf, instance_id)
        }

        nova_host_port = ipv6_utils.valid_ipv6_url(
            self.conf.nova_metadata_host,
            self.conf.nova_metadata_port)

        url = urllib.parse.urlunsplit((
            self.conf.nova_metadata_protocol,
            nova_host_port,
            req.path_info,
            req.query_string,
            ''))

        disable_ssl_certificate_validation = self.conf.nova_metadata_insecure
        if self.conf.auth_ca_cert and not disable_ssl_certificate_validation:
            verify_cert = self.conf.auth_ca_cert
        else:
            verify_cert = not disable_ssl_certificate_validation

        client_cert = None
        if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
            client_cert = (self.conf.nova_client_cert,
                           self.conf.nova_client_priv_key)

        try:
            resp = requests.request(method=req.method, url=url,
                                    headers=headers,
                                    data=req.body,
                                    cert=client_cert,
                                    verify=verify_cert,
                                    timeout=60)
        except requests.ConnectionError:
            msg = _('The remote metadata server is temporarily unavailable. '
                    'Please try again later.')
            LOG.warning(msg)
            title = '503 Service Unavailable'
            length = RESPONSE_LENGHT + len(title) * 2 + len(msg)
            reponse = RESPONSE.render(http_code=title, title=title,
                                      body_title=title, body=title,
                                      len=length)
            return encodeutils.to_utf8(reponse)

        if resp.status_code == 200:
            return self._http_response(resp, req)
        if resp.status_code == 403:
            LOG.warning(
                'The remote metadata server responded with Forbidden. This '
                'response usually occurs when shared secrets do not match.'
            )
            # TODO(ralonsoh): add info in the returned HTTP message to the VM.
            return self._http_response(resp, req)
        if resp.status_code == 500:
            msg = _(
                'Remote metadata server experienced an internal server error.'
            )
            LOG.warning(msg)
            # TODO(ralonsoh): add info in the returned HTTP message to the VM.
            return self._http_response(resp, req)
        if resp.status_code in (400, 404, 409, 502, 503, 504):
            # TODO(ralonsoh): add info in the returned HTTP message to the VM.
            return self._http_response(resp, req)
        raise Exception(_('Unexpected response code: %s') % resp.status_code)


class MetadataProxyHandler(MetadataProxyHandlerBaseSocketServer,
                           socketserver.StreamRequestHandler):
    NETWORK_ID_HEADER = 'X-Neutron-Network-ID'
    ROUTER_ID_HEADER = 'X-Neutron-Router-ID'
    _conf = None

    def __init__(self, conf):
        self._cache = cache.get_cache(conf)
        super().__init__(conf, has_cache=True)

    def __init__(self, request, client_address, server):
        self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)
        self.context = context.get_admin_context_without_session()
        super().__init__(self._conf, has_cache=False, request=request,
                         client_address=client_address, server=server)

    def _get_ports_from_server(self, router_id=None, ip_address=None,
                               networks=None, mac_address=None):
        """Get ports from server."""
        filters = self._get_port_filters(
            router_id, ip_address, networks, mac_address)
        return self.plugin_rpc.get_ports(self.context, filters)

    def handle(self):
        try:
            request = self.request.recv(4096)
            LOG.debug('Request: %s', request.decode('utf-8'))
            f_request = io.BytesIO(request)
            req = webob.Request.from_file(f_request)
            instance_id, project_id = self._get_instance_and_project_id(req)
            if instance_id:
                res = self._proxy_request(instance_id, project_id, req)
                self.wfile.write(res)
                return
            # TODO(ralonsoh): change this return to be a formatted Request
            # and added to self.wfile
            return webob_exc.HTTPNotFound()
        except Exception as exc:
            LOG.exception('Error while receiving data.')
            raise exc

    def _get_port_filters(self, router_id=None, ip_address=None,
    @staticmethod
    def _get_port_filters(router_id=None, ip_address=None,
                          networks=None, mac_address=None):
        filters = {}
        if router_id:
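The ``X-Instance-ID-Signature`` header set in ``_proxy_request`` lets Nova verify that the request really came from the Neutron metadata proxy: both sides share a secret (``metadata_proxy_shared_secret``) and compare an HMAC over the instance ID. A generic sketch of that computation, not necessarily byte-for-byte identical to Neutron's ``sign_instance_id`` helper:

import hashlib
import hmac


def sign_instance_id(shared_secret, instance_id):
    # HMAC-SHA256 over the instance UUID; the verifier recomputes the digest
    # with its own copy of the shared secret and compares the two values.
    return hmac.new(shared_secret.encode('utf-8'),
                    instance_id.encode('utf-8'),
                    hashlib.sha256).hexdigest()


# Example: sign_instance_id('s3cret', 'instance-uuid') -> hex digest string.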
@@ -89,13 +219,18 @@ class MetadataProxyHandler(proxy_base.MetadataProxyHandlerBase):

        return filters

    @cache.cache_method_results
    def _get_ports_from_server(self, router_id=None, ip_address=None,
                               networks=None, mac_address=None):
        """Get ports from server."""
        filters = self._get_port_filters(
            router_id, ip_address, networks, mac_address)
        return self.plugin_rpc.get_ports(self.context, filters)

    def _get_router_networks(self, router_id, skip_cache=False):
        """Find all networks connected to given router."""
        internal_ports = self._get_ports_from_server(router_id=router_id)
        return tuple(p['network_id'] for p in internal_ports)

    @cache.cache_method_results
    def _get_ports_for_remote_address(self, remote_address, networks,
                                      remote_mac=None,
                                      skip_cache=False):
@@ -196,16 +331,8 @@ class UnixDomainMetadataProxy(proxy_base.UnixDomainMetadataProxyBase):
        self.agent_state.pop('start_flag', None)

    def run(self):
        server = agent_utils.UnixDomainWSGIServer(
            constants.AGENT_PROCESS_METADATA)
        # Set the default metadata_workers if not yet set in the config file
        md_workers = self.conf.metadata_workers
        if md_workers is None:
            md_workers = host.cpu_count() // 2
        server.start(MetadataProxyHandler(self.conf),
                     self.conf.metadata_proxy_socket,
                     workers=md_workers,
                     backlog=self.conf.metadata_backlog,
                     mode=self._get_socket_mode())
        self._init_state_reporting()
        server.wait()
        file_socket = cfg.CONF.metadata_proxy_socket
        self._server = socketserver.ThreadingUnixStreamServer(
            file_socket, MetadataProxyHandler)
        MetadataProxyHandler._conf = self.conf
        self._server.serve_forever()
neutron/cmd/agents/metadata.py (new file, 24 lines)
@@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import setproctitle

from neutron.agent import metadata_agent
from neutron_lib import constants


def main():
    proctitle = "{} ({})".format(
        constants.AGENT_PROCESS_METADATA, setproctitle.getproctitle())
    setproctitle.setproctitle(proctitle)

    metadata_agent.main()
@@ -25,7 +25,6 @@ from oslo_config import fixture as config_fixture
from oslo_utils import fileutils
from oslo_utils import netutils

from neutron.agent.linux import utils as agent_utils
from neutron.agent.metadata import agent
from neutron.agent.metadata import proxy_base
from neutron.agent import metadata_agent

@@ -41,16 +40,6 @@ class ConfFixture(config_fixture.Config):
        cache.register_oslo_configs(self.conf)


class NewCacheConfFixture(ConfFixture):
    def setUp(self):
        super().setUp()
        self.config(
            group='cache',
            enabled=True,
            backend='oslo_cache.dict',
            expiration_time=5)


class TestMetadataProxyHandlerBase(base.BaseTestCase):
    fake_conf = cfg.CONF
    fake_conf_fixture = ConfFixture(fake_conf)

@@ -60,7 +49,10 @@ class TestMetadataProxyHandlerBase(base.BaseTestCase):
        self.useFixture(self.fake_conf_fixture)
        self.log_p = mock.patch.object(proxy_base, 'LOG')
        self.log = self.log_p.start()
        self.handler = agent.MetadataProxyHandler(self.fake_conf)
        agent.MetadataProxyHandler._conf = self.fake_conf
        with mock.patch.object(agent.MetadataProxyHandler, 'handle'):
            self.handler = agent.MetadataProxyHandler(
                mock.Mock(), mock.Mock(), mock.Mock())
        self.handler.plugin_rpc = mock.Mock()
        self.handler.context = mock.Mock()

@@ -122,16 +114,6 @@ class _TestMetadataProxyHandlerCacheMixin:
        retval = self.handler(req)
        self.assertEqual('value', retval)

    def test_call_skip_cache(self):
        req = mock.Mock()
        with mock.patch.object(self.handler,
                               '_get_instance_and_project_id') as get_ids:
            get_ids.return_value = ('instance_id', 'tenant_id')
            with mock.patch.object(self.handler, '_proxy_request') as proxy:
                proxy.return_value = webob.exc.HTTPNotFound()
                self.handler(req)
                get_ids.assert_called_with(req, skip_cache=True)

    def test_call_no_instance_match(self):
        req = mock.Mock()
        with mock.patch.object(self.handler,

@@ -405,12 +387,6 @@ class _TestMetadataProxyHandlerCacheMixin:
        )


class TestMetadataProxyHandlerNewCache(TestMetadataProxyHandlerBase,
                                       _TestMetadataProxyHandlerCacheMixin):
    fake_conf = cfg.CONF
    fake_conf_fixture = NewCacheConfFixture(fake_conf)


class TestUnixDomainMetadataProxy(base.BaseTestCase):
    def setUp(self):
        super().setUp()

@@ -459,25 +435,6 @@ class TestUnixDomainMetadataProxy(base.BaseTestCase):
        agent.UnixDomainMetadataProxy(mock.Mock())
        unlink.assert_called_once_with('/the/path')

    @mock.patch.object(agent, 'MetadataProxyHandler')
    @mock.patch.object(agent_utils, 'UnixDomainWSGIServer')
    @mock.patch.object(fileutils, 'ensure_tree')
    def test_run(self, ensure_dir, server, handler):
        p = agent.UnixDomainMetadataProxy(self.cfg.CONF)
        p.run()

        ensure_dir.assert_called_once_with('/the', mode=0o755)
        server.assert_has_calls([
            mock.call(n_const.AGENT_PROCESS_METADATA),
            mock.call().start(handler.return_value,
                              '/the/path', workers=0,
                              backlog=128, mode=0o644),
            mock.call().wait()]
        )
        self.looping_mock.assert_called_once_with(p._report_state)
        self.looping_mock.return_value.start.assert_called_once_with(
            interval=mock.ANY)

    def test_main(self):
        with mock.patch.object(agent, 'UnixDomainMetadataProxy') as proxy:
            with mock.patch.object(metadata_agent, 'config') as config:
@@ -39,7 +39,7 @@ console_scripts =
    neutron-ipset-cleanup = neutron.cmd.ipset_cleanup:main
    neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
    neutron-macvtap-agent = neutron.cmd.eventlet.plugins.macvtap_neutron_agent:main
    neutron-metadata-agent = neutron.cmd.eventlet.agents.metadata:main
    neutron-metadata-agent = neutron.cmd.agents.metadata:main
    neutron-netns-cleanup = neutron.cmd.netns_cleanup:main
    neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
    neutron-ovs-cleanup = neutron.cmd.ovs_cleanup:main
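The entry-point change above means the installed ``neutron-metadata-agent`` console script now resolves to the non-eventlet module. Roughly, the generated wrapper is equivalent to the following; this is illustrative only, since the real script is produced by the packaging tooling.

import sys

from neutron.cmd.agents import metadata

if __name__ == '__main__':
    sys.exit(metadata.main())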