Add multi-process support for API services
Implements blueprint multi-process-api-service This is based on Huang Zhiteng's patch. This patch adds support for running services as multiple processes. This is primarily intended to be used with the API service as a way to provide more concurrency than eventlet can sometimes provide. A SIGTERM or SIGINT signal will cause the parent process to gracefully terminate the child processes, allowing them to finish processing the requests currently being processed. The parent will wait for the children to finish before exiting. Change-Id: Ie6d6802626eb42d5e64c4167be363fbf6cea2a1b
This commit is contained in:
parent
2264c1c0b6
commit
46c1b6eaee
15
bin/nova-all
15
bin/nova-all
@ -28,7 +28,7 @@ continue attempting to launch the rest of the services.
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
@ -54,25 +54,26 @@ if __name__ == '__main__':
|
||||
flags.parse_args(sys.argv)
|
||||
logging.setup()
|
||||
utils.monkey_patch()
|
||||
servers = []
|
||||
launcher = service.ProcessLauncher()
|
||||
|
||||
# nova-api
|
||||
for api in flags.FLAGS.enabled_apis:
|
||||
try:
|
||||
servers.append(service.WSGIService(api))
|
||||
server = service.WSGIService(api)
|
||||
launcher.launch_server(server, workers=server.workers or 1)
|
||||
except (Exception, SystemExit):
|
||||
LOG.exception(_('Failed to load %s') % '%s-api' % api)
|
||||
|
||||
for mod in [s3server, xvp_proxy]:
|
||||
try:
|
||||
servers.append(mod.get_wsgi_server())
|
||||
launcher.launch_server(mod.get_wsgi_server())
|
||||
except (Exception, SystemExit):
|
||||
LOG.exception(_('Failed to load %s') % mod.__name__)
|
||||
|
||||
for binary in ['nova-compute', 'nova-volume',
|
||||
'nova-network', 'nova-scheduler', 'nova-cert']:
|
||||
try:
|
||||
servers.append(service.Service.create(binary=binary))
|
||||
launcher.launch_server(service.Service.create(binary=binary))
|
||||
except (Exception, SystemExit):
|
||||
LOG.exception(_('Failed to load %s'), binary)
|
||||
service.serve(*servers)
|
||||
service.wait()
|
||||
launcher.wait()
|
||||
|
10
bin/nova-api
10
bin/nova-api
@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads.
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
@ -45,8 +45,8 @@ if __name__ == '__main__':
|
||||
flags.parse_args(sys.argv)
|
||||
logging.setup()
|
||||
utils.monkey_patch()
|
||||
servers = []
|
||||
launcher = service.ProcessLauncher()
|
||||
for api in flags.FLAGS.enabled_apis:
|
||||
servers.append(service.WSGIService(api))
|
||||
service.serve(*servers)
|
||||
service.wait()
|
||||
server = service.WSGIService(api)
|
||||
launcher.launch_server(server, workers=server.workers or 1)
|
||||
launcher.wait()
|
||||
|
@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova EC2 API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
@ -42,5 +42,5 @@ if __name__ == '__main__':
|
||||
logging.setup()
|
||||
utils.monkey_patch()
|
||||
server = service.WSGIService('ec2')
|
||||
service.serve(server)
|
||||
service.serve(server, workers=server.workers)
|
||||
service.wait()
|
||||
|
@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova Metadata API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
@ -42,5 +42,5 @@ if __name__ == '__main__':
|
||||
logging.setup()
|
||||
utils.monkey_patch()
|
||||
server = service.WSGIService('metadata')
|
||||
service.serve(server)
|
||||
service.serve(server, workers=server.workers)
|
||||
service.wait()
|
||||
|
@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova OS API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
@ -42,5 +42,5 @@ if __name__ == '__main__':
|
||||
logging.setup()
|
||||
utils.monkey_patch()
|
||||
server = service.WSGIService('osapi_compute')
|
||||
service.serve(server)
|
||||
service.serve(server, workers=server.workers)
|
||||
service.wait()
|
||||
|
@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova OS API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
@ -42,5 +42,5 @@ if __name__ == '__main__':
|
||||
logging.setup()
|
||||
utils.monkey_patch()
|
||||
server = service.WSGIService('osapi_volume')
|
||||
service.serve(server)
|
||||
service.serve(server, workers=server.workers)
|
||||
service.wait()
|
||||
|
@ -489,12 +489,18 @@
|
||||
# ec2_listen_port=8773
|
||||
#### (IntOpt) port for ec2 api to listen
|
||||
|
||||
# ec2_workers=0
|
||||
#### (IntOpt) Number of EC2 API workers
|
||||
|
||||
# osapi_compute_listen=0.0.0.0
|
||||
#### (StrOpt) IP address for OpenStack API to listen
|
||||
|
||||
# osapi_compute_listen_port=8774
|
||||
#### (IntOpt) list port for osapi compute
|
||||
|
||||
# osapi_compute_workers=0
|
||||
#### (IntOpt) Number of workers for OpenStack API
|
||||
|
||||
# metadata_manager=nova.api.manager.MetadataManager
|
||||
#### (StrOpt) OpenStack metadata service manager
|
||||
|
||||
@ -504,12 +510,18 @@
|
||||
# metadata_listen_port=8775
|
||||
#### (IntOpt) port for metadata api to listen
|
||||
|
||||
# metadata_workers=0
|
||||
#### (IntOpt) Number of workers for metadata API
|
||||
|
||||
# osapi_volume_listen=0.0.0.0
|
||||
#### (StrOpt) IP address for OpenStack Volume API to listen
|
||||
|
||||
# osapi_volume_listen_port=8776
|
||||
#### (IntOpt) port for os volume api to listen
|
||||
|
||||
# osapi_volume_workers=0
|
||||
#### (IntOpt) Number of workers for OpenStack Volume API
|
||||
|
||||
|
||||
######## defined in nova.test ########
|
||||
|
||||
|
253
nova/service.py
253
nova/service.py
@ -19,10 +19,13 @@
|
||||
|
||||
"""Generic Node base class for all workers that run on hosts."""
|
||||
|
||||
import errno
|
||||
import inspect
|
||||
import os
|
||||
import random
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
@ -61,12 +64,18 @@ service_opts = [
|
||||
cfg.IntOpt('ec2_listen_port',
|
||||
default=8773,
|
||||
help='port for ec2 api to listen'),
|
||||
cfg.IntOpt('ec2_workers',
|
||||
default=None,
|
||||
help='Number of workers for EC2 API service'),
|
||||
cfg.StrOpt('osapi_compute_listen',
|
||||
default="0.0.0.0",
|
||||
help='IP address for OpenStack API to listen'),
|
||||
cfg.IntOpt('osapi_compute_listen_port',
|
||||
default=8774,
|
||||
help='list port for osapi compute'),
|
||||
cfg.IntOpt('osapi_compute_workers',
|
||||
default=None,
|
||||
help='Number of workers for OpenStack API service'),
|
||||
cfg.StrOpt('metadata_manager',
|
||||
default='nova.api.manager.MetadataManager',
|
||||
help='OpenStack metadata service manager'),
|
||||
@ -76,12 +85,18 @@ service_opts = [
|
||||
cfg.IntOpt('metadata_listen_port',
|
||||
default=8775,
|
||||
help='port for metadata api to listen'),
|
||||
cfg.IntOpt('metadata_workers',
|
||||
default=None,
|
||||
help='Number of workers for metadata service'),
|
||||
cfg.StrOpt('osapi_volume_listen',
|
||||
default="0.0.0.0",
|
||||
help='IP address for OpenStack Volume API to listen'),
|
||||
cfg.IntOpt('osapi_volume_listen_port',
|
||||
default=8776,
|
||||
help='port for os volume api to listen')
|
||||
help='port for os volume api to listen'),
|
||||
cfg.IntOpt('osapi_volume_workers',
|
||||
default=None,
|
||||
help='Number of workers for OpenStack Volume API service'),
|
||||
]
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
@ -98,6 +113,7 @@ class Launcher(object):
|
||||
|
||||
"""
|
||||
self._services = []
|
||||
eventlet_backdoor.initialize_if_enabled()
|
||||
|
||||
@staticmethod
|
||||
def run_server(server):
|
||||
@ -135,15 +151,6 @@ class Launcher(object):
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
def sigterm(sig, frame):
|
||||
LOG.audit(_("SIGTERM received"))
|
||||
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
|
||||
# shuts down the service. This does not yet handle eventlet
|
||||
# threads.
|
||||
raise KeyboardInterrupt
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
for service in self._services:
|
||||
try:
|
||||
service.wait()
|
||||
@ -151,6 +158,198 @@ class Launcher(object):
|
||||
pass
|
||||
|
||||
|
||||
class ServiceLauncher(Launcher):
|
||||
def _handle_signal(self, signo, frame):
|
||||
signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
|
||||
LOG.info(_('Caught %s, exiting'), signame)
|
||||
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
def wait(self):
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
|
||||
LOG.debug(_('Full set of FLAGS:'))
|
||||
for flag in FLAGS:
|
||||
flag_get = FLAGS.get(flag, None)
|
||||
# hide flag contents from log if contains a password
|
||||
# should use secret flag when switch over to openstack-common
|
||||
if ("_password" in flag or "_key" in flag or
|
||||
(flag == "sql_connection" and "mysql:" in flag_get)):
|
||||
LOG.debug(_('%(flag)s : FLAG SET ') % locals())
|
||||
else:
|
||||
LOG.debug('%(flag)s : %(flag_get)s' % locals())
|
||||
|
||||
status = None
|
||||
try:
|
||||
super(ServiceLauncher, self).wait()
|
||||
except SystemExit as exc:
|
||||
status = exc.code
|
||||
self.stop()
|
||||
rpc.cleanup()
|
||||
|
||||
if status is not None:
|
||||
sys.exit(status)
|
||||
|
||||
|
||||
class ServerWrapper(object):
|
||||
def __init__(self, server, workers):
|
||||
self.server = server
|
||||
self.workers = workers
|
||||
self.children = set()
|
||||
self.forktimes = []
|
||||
|
||||
|
||||
class ProcessLauncher(object):
|
||||
def __init__(self):
|
||||
self.children = {}
|
||||
self.running = True
|
||||
rfd, self.writepipe = os.pipe()
|
||||
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
|
||||
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
|
||||
def _handle_signal(self, signo, frame):
|
||||
signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
|
||||
LOG.info(_('Caught %s, stopping children'), signame)
|
||||
|
||||
self.running = False
|
||||
for pid in self.children:
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except OSError as exc:
|
||||
if exc.errno != errno.ESRCH:
|
||||
raise
|
||||
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
|
||||
def _pipe_watcher(self):
|
||||
# This will block until the write end is closed when the parent
|
||||
# dies unexpectedly
|
||||
self.readpipe.read()
|
||||
|
||||
LOG.info(_('Parent process has died unexpectedly, exiting'))
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
def _child_process(self, server):
|
||||
# Setup child signal handlers differently
|
||||
def _sigterm(*args):
|
||||
LOG.info(_('Received SIGTERM, stopping'))
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
server.stop()
|
||||
|
||||
signal.signal(signal.SIGTERM, _sigterm)
|
||||
# Block SIGINT and let the parent send us a SIGTERM
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
# Reopen the eventlet hub to make sure we don't share an epoll
|
||||
# fd with parent and/or siblings, which would be bad
|
||||
eventlet.hubs.use_hub()
|
||||
|
||||
# Close write to ensure only parent has it open
|
||||
os.close(self.writepipe)
|
||||
# Create greenthread to watch for parent to close pipe
|
||||
eventlet.spawn(self._pipe_watcher)
|
||||
|
||||
# Reseed random number generator
|
||||
random.seed()
|
||||
|
||||
launcher = Launcher()
|
||||
launcher.run_server(server)
|
||||
|
||||
def _start_child(self, wrap):
|
||||
if len(wrap.forktimes) > wrap.workers:
|
||||
# Limit ourselves to one process a second (over the period of
|
||||
# number of workers * 1 second). This will allow workers to
|
||||
# start up quickly but ensure we don't fork off children that
|
||||
# die instantly too quickly.
|
||||
if time.time() - wrap.forktimes[0] < wrap.workers:
|
||||
LOG.info(_('Forking too fast, sleeping'))
|
||||
time.sleep(1)
|
||||
|
||||
wrap.forktimes.pop(0)
|
||||
|
||||
wrap.forktimes.append(time.time())
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# NOTE(johannes): All exceptions are caught to ensure this
|
||||
# doesn't fallback into the loop spawning children. It would
|
||||
# be bad for a child to spawn more children.
|
||||
status = 0
|
||||
try:
|
||||
self._child_process(wrap.server)
|
||||
except SystemExit as exc:
|
||||
status = exc.code
|
||||
except BaseException:
|
||||
LOG.exception(_('Unhandled exception'))
|
||||
status = 2
|
||||
|
||||
os._exit(status)
|
||||
|
||||
LOG.info(_('Started child %d'), pid)
|
||||
|
||||
wrap.children.add(pid)
|
||||
self.children[pid] = wrap
|
||||
|
||||
return pid
|
||||
|
||||
def launch_server(self, server, workers=1):
|
||||
wrap = ServerWrapper(server, workers)
|
||||
|
||||
LOG.info(_('Starting %d workers'), wrap.workers)
|
||||
while self.running and len(wrap.children) < wrap.workers:
|
||||
self._start_child(wrap)
|
||||
|
||||
def _wait_child(self):
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
except OSError as exc:
|
||||
if exc.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
return None
|
||||
|
||||
if os.WIFSIGNALED(status):
|
||||
sig = os.WTERMSIG(status)
|
||||
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
|
||||
else:
|
||||
code = os.WEXITSTATUS(status)
|
||||
LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
|
||||
|
||||
if pid not in self.children:
|
||||
LOG.warning(_('pid %d not in child list'), pid)
|
||||
return None
|
||||
|
||||
wrap = self.children.pop(pid)
|
||||
wrap.children.remove(pid)
|
||||
return wrap
|
||||
|
||||
def wait(self):
|
||||
"""Loop waiting on children to die and respawning as necessary"""
|
||||
# Loop calling wait and respawning as necessary
|
||||
while self.running:
|
||||
wrap = self._wait_child()
|
||||
if not wrap:
|
||||
continue
|
||||
|
||||
while self.running and len(wrap.children) < wrap.workers:
|
||||
self._start_child(wrap)
|
||||
|
||||
# Wait for children to die
|
||||
if self.children:
|
||||
LOG.info(_('Waiting on %d children to exit'), len(self.children))
|
||||
while self.children:
|
||||
self._wait_child()
|
||||
|
||||
|
||||
class Service(object):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
@ -170,7 +369,6 @@ class Service(object):
|
||||
self.report_interval = report_interval
|
||||
self.periodic_interval = periodic_interval
|
||||
self.periodic_fuzzy_delay = periodic_fuzzy_delay
|
||||
super(Service, self).__init__(*args, **kwargs)
|
||||
self.saved_args, self.saved_kwargs = args, kwargs
|
||||
self.timers = []
|
||||
|
||||
@ -361,10 +559,13 @@ class WSGIService(object):
|
||||
self.app = self.loader.load_app(name)
|
||||
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
|
||||
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
|
||||
self.workers = getattr(FLAGS, '%s_workers' % name, None)
|
||||
self.server = wsgi.Server(name,
|
||||
self.app,
|
||||
host=self.host,
|
||||
port=self.port)
|
||||
# Pull back actual port used
|
||||
self.port = self.server.port
|
||||
|
||||
def _get_manager(self):
|
||||
"""Initialize a Manager object appropriate for this service.
|
||||
@ -400,7 +601,6 @@ class WSGIService(object):
|
||||
if self.manager:
|
||||
self.manager.init_host()
|
||||
self.server.start()
|
||||
self.port = self.server.port
|
||||
|
||||
def stop(self):
|
||||
"""Stop serving this API.
|
||||
@ -425,29 +625,18 @@ class WSGIService(object):
|
||||
_launcher = None
|
||||
|
||||
|
||||
def serve(*servers):
|
||||
def serve(server, workers=None):
|
||||
global _launcher
|
||||
if not _launcher:
|
||||
_launcher = Launcher()
|
||||
for server in servers:
|
||||
_launcher.launch_server(server)
|
||||
if _launcher:
|
||||
raise RuntimeError(_('serve() can only be called once'))
|
||||
|
||||
eventlet_backdoor.initialize_if_enabled()
|
||||
if workers:
|
||||
_launcher = ProcessLauncher()
|
||||
_launcher.launch_server(server, workers=workers)
|
||||
else:
|
||||
_launcher = ServiceLauncher()
|
||||
_launcher.launch_server(server)
|
||||
|
||||
|
||||
def wait():
|
||||
LOG.debug(_('Full set of FLAGS:'))
|
||||
for flag in FLAGS:
|
||||
flag_get = FLAGS.get(flag, None)
|
||||
# hide flag contents from log if contains a password
|
||||
# should use secret flag when switch over to openstack-common
|
||||
if ("_password" in flag or "_key" in flag or
|
||||
(flag == "sql_connection" and "mysql:" in flag_get)):
|
||||
LOG.debug(_('%(flag)s : FLAG SET ') % locals())
|
||||
else:
|
||||
LOG.debug('%(flag)s : %(flag_get)s' % locals())
|
||||
try:
|
||||
_launcher.wait()
|
||||
except KeyboardInterrupt:
|
||||
_launcher.stop()
|
||||
rpc.cleanup()
|
||||
|
@ -44,7 +44,7 @@ from nova import log as logging
|
||||
import eventlet
|
||||
|
||||
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
FLAGS.use_stderr = False
|
||||
|
169
nova/tests/integrated/test_multiprocess_api.py
Normal file
169
nova/tests/integrated/test_multiprocess_api.py
Normal file
@ -0,0 +1,169 @@
|
||||
# Copyright (c) 2012 Intel, LLC
|
||||
# Copyright (c) 2012 OpenStack, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Test multiprocess enabled API service.
|
||||
"""
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from nova import flags
|
||||
from nova.log import logging
|
||||
from nova import service
|
||||
from nova.tests.integrated import integrated_helpers
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase):
|
||||
def _start_api_service(self):
|
||||
# Process will be started in _spawn()
|
||||
self.osapi = service.WSGIService("osapi_compute")
|
||||
self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port)
|
||||
LOG.info('auth_url = %s' % self.auth_url)
|
||||
|
||||
def _get_flags(self):
|
||||
self.workers = 2
|
||||
f = super(MultiprocessWSGITest, self)._get_flags()
|
||||
f['osapi_compute_workers'] = self.workers
|
||||
return f
|
||||
|
||||
def _spawn(self):
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# NOTE(johannes): We can't let the child processes exit back
|
||||
# into the unit test framework since then we'll have multiple
|
||||
# processes running the same tests (and possibly forking more
|
||||
# processes that end up in the same situation). So we need
|
||||
# to catch all exceptions and make sure nothing leaks out, in
|
||||
# particlar SystemExit, which is raised by sys.exit(). We use
|
||||
# os._exit() which doesn't have this problem.
|
||||
status = 0
|
||||
try:
|
||||
launcher = service.ProcessLauncher()
|
||||
launcher.launch_server(self.osapi, workers=self.osapi.workers)
|
||||
launcher.wait()
|
||||
except SystemExit as exc:
|
||||
status = exc.code
|
||||
except BaseException:
|
||||
# We need to be defensive here too
|
||||
try:
|
||||
traceback.print_exc()
|
||||
except BaseException:
|
||||
print "Couldn't print traceback"
|
||||
status = 2
|
||||
|
||||
# Really exit
|
||||
os._exit(status)
|
||||
|
||||
self.pid = pid
|
||||
|
||||
# Wait for up to a second for workers to get started
|
||||
start = time.time()
|
||||
while time.time() - start < 1:
|
||||
workers = self._get_workers()
|
||||
if len(workers) == self.workers:
|
||||
break
|
||||
|
||||
time.sleep(.1)
|
||||
|
||||
self.assertEqual(len(workers), self.workers)
|
||||
return workers
|
||||
|
||||
def tearDown(self):
|
||||
if self.pid:
|
||||
# Make sure all processes are stopped
|
||||
os.kill(self.pid, signal.SIGTERM)
|
||||
|
||||
# Make sure we reap our test process
|
||||
self._reap_test()
|
||||
|
||||
super(MultiprocessWSGITest, self).tearDown()
|
||||
|
||||
def _reap_test(self):
|
||||
pid, status = os.waitpid(self.pid, 0)
|
||||
self.pid = None
|
||||
return status
|
||||
|
||||
def _get_workers(self):
|
||||
f = os.popen('ps ax -o pid,ppid,command')
|
||||
# Skip ps header
|
||||
f.readline()
|
||||
|
||||
processes = [tuple(int(p) for p in l.strip().split()[:2])
|
||||
for l in f.readlines()]
|
||||
return [p for p, pp in processes if pp == self.pid]
|
||||
|
||||
def test_killed_worker_recover(self):
|
||||
start_workers = self._spawn()
|
||||
|
||||
# kill one worker and check if new worker can come up
|
||||
LOG.info('pid of first child is %s' % start_workers[0])
|
||||
os.kill(start_workers[0], signal.SIGTERM)
|
||||
|
||||
# loop and check if new worker is spawned (for 1 second max)
|
||||
start = time.time()
|
||||
while time.time() - start < 1:
|
||||
end_workers = self._get_workers()
|
||||
LOG.info('workers: %r' % end_workers)
|
||||
|
||||
if start_workers != end_workers:
|
||||
break
|
||||
|
||||
time.sleep(.1)
|
||||
|
||||
# Make sure worker pids don't match
|
||||
self.assertNotEqual(start_workers, end_workers)
|
||||
|
||||
# check if api service still works
|
||||
flavors = self.api.get_flavors()
|
||||
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
|
||||
|
||||
def _terminate_with_signal(self, sig):
|
||||
self._spawn()
|
||||
|
||||
# check if api service is working
|
||||
flavors = self.api.get_flavors()
|
||||
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
|
||||
|
||||
os.kill(self.pid, sig)
|
||||
|
||||
# loop and check if all processes are killed (for 1 second max)
|
||||
start = time.time()
|
||||
while time.time() - start < 1:
|
||||
workers = self._get_workers()
|
||||
LOG.info('workers: %r' % workers)
|
||||
|
||||
if not workers:
|
||||
break
|
||||
|
||||
time.sleep(.1)
|
||||
|
||||
self.assertFalse(workers, 'No OS processes left.')
|
||||
|
||||
def test_terminate_sigkill(self):
|
||||
self._terminate_with_signal(signal.SIGKILL)
|
||||
status = self._reap_test()
|
||||
self.assertTrue(os.WIFSIGNALED(status))
|
||||
self.assertEqual(os.WTERMSIG(status), signal.SIGKILL)
|
||||
|
||||
def test_terminate_sigterm(self):
|
||||
self._terminate_with_signal(signal.SIGTERM)
|
||||
status = self._reap_test()
|
||||
self.assertTrue(os.WIFEXITED(status))
|
||||
self.assertEqual(os.WEXITSTATUS(status), 0)
|
@ -40,7 +40,7 @@ test_service_opts = [
|
||||
default="nova.tests.test_service.FakeManager",
|
||||
help="Manager for testing"),
|
||||
cfg.StrOpt("test_service_listen",
|
||||
default=None,
|
||||
default='127.0.0.1',
|
||||
help="Host to bind test service to"),
|
||||
cfg.IntOpt("test_service_listen_port",
|
||||
default=0,
|
||||
@ -202,7 +202,6 @@ class TestWSGIService(test.TestCase):
|
||||
|
||||
def test_service_random_port(self):
|
||||
test_service = service.WSGIService("test_service")
|
||||
self.assertEquals(0, test_service.port)
|
||||
test_service.start()
|
||||
self.assertNotEqual(0, test_service.port)
|
||||
test_service.stop()
|
||||
@ -216,10 +215,7 @@ class TestLauncher(test.TestCase):
|
||||
self.service = service.WSGIService("test_service")
|
||||
|
||||
def test_launch_app(self):
|
||||
self.assertEquals(0, self.service.port)
|
||||
launcher = service.Launcher()
|
||||
launcher.launch_server(self.service)
|
||||
# Give spawned thread a chance to execute
|
||||
greenthread.sleep(0)
|
||||
self.assertNotEquals(0, self.service.port)
|
||||
launcher.stop()
|
||||
|
@ -84,8 +84,8 @@ class TestWSGIServer(unittest.TestCase):
|
||||
self.assertEquals("test_app", server.name)
|
||||
|
||||
def test_start_random_port(self):
|
||||
server = nova.wsgi.Server("test_random_port", None, host="127.0.0.1")
|
||||
self.assertEqual(0, server.port)
|
||||
server = nova.wsgi.Server("test_random_port", None,
|
||||
host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
self.assertNotEqual(0, server.port)
|
||||
server.stop()
|
||||
|
48
nova/wsgi.py
48
nova/wsgi.py
@ -44,8 +44,8 @@ class Server(object):
|
||||
|
||||
default_pool_size = 1000
|
||||
|
||||
def __init__(self, name, app, host=None, port=None, pool_size=None,
|
||||
protocol=eventlet.wsgi.HttpProtocol):
|
||||
def __init__(self, name, app, host='0.0.0.0', port=0, pool_size=None,
|
||||
protocol=eventlet.wsgi.HttpProtocol, backlog=128):
|
||||
"""Initialize, but do not start, a WSGI server.
|
||||
|
||||
:param name: Pretty name for logging.
|
||||
@ -53,48 +53,38 @@ class Server(object):
|
||||
:param host: IP address to serve the application.
|
||||
:param port: Port number to server the application.
|
||||
:param pool_size: Maximum number of eventlets to spawn concurrently.
|
||||
:param backlog: Maximum number of queued connections.
|
||||
:returns: None
|
||||
|
||||
:raises: nova.exception.InvalidInput
|
||||
"""
|
||||
self.name = name
|
||||
self.app = app
|
||||
self.host = host or "0.0.0.0"
|
||||
self.port = port or 0
|
||||
self._server = None
|
||||
self._socket = None
|
||||
self._protocol = protocol
|
||||
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
|
||||
self._logger = logging.getLogger("eventlet.wsgi.server")
|
||||
self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
|
||||
self._wsgi_logger = logging.WritableLogger(self._logger)
|
||||
|
||||
def _start(self):
|
||||
"""Run the blocking eventlet WSGI server.
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
|
||||
self._socket = eventlet.listen((host, port), backlog=backlog)
|
||||
(self.host, self.port) = self._socket.getsockname()
|
||||
LOG.info(_("%(name)s listening on %(host)s:%(port)s") % self.__dict__)
|
||||
|
||||
def start(self):
|
||||
"""Start serving a WSGI application.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
eventlet.wsgi.server(self._socket,
|
||||
self._server = eventlet.spawn(eventlet.wsgi.server,
|
||||
self._socket,
|
||||
self.app,
|
||||
protocol=self._protocol,
|
||||
custom_pool=self._pool,
|
||||
log=self._wsgi_logger)
|
||||
|
||||
def start(self, backlog=128):
|
||||
"""Start serving a WSGI application.
|
||||
|
||||
:param backlog: Maximum number of queued connections.
|
||||
:returns: None
|
||||
:raises: nova.exception.InvalidInput
|
||||
|
||||
"""
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
|
||||
self._server = eventlet.spawn(self._start)
|
||||
(self.host, self.port) = self._socket.getsockname()
|
||||
LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
|
||||
|
||||
def stop(self):
|
||||
"""Stop this server.
|
||||
|
||||
@ -105,6 +95,10 @@ class Server(object):
|
||||
|
||||
"""
|
||||
LOG.info(_("Stopping WSGI server."))
|
||||
|
||||
if self._server is not None:
|
||||
# Resize pool to stop new requests from being processed
|
||||
self._pool.resize(0)
|
||||
self._server.kill()
|
||||
|
||||
def wait(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user