diff --git a/README.rst b/README.rst
index 5343de67b..16b0c2575 100644
--- a/README.rst
+++ b/README.rst
@@ -112,40 +112,45 @@ is used for all requests.
 
 Run the benchmarking tool using the following command::
 
-    $ zaqar-bench-pc
+    $ zaqar-bench
 
-By default, the command will run a performance test for 3 seconds, using one
-consumer and one producer for each CPU on the system, with 2 greenlet workers
-per CPU per process. You can override these defaults in the config file or on
-the command line using a variety of options. For example, the following
-command runs a performance test for 10 seconds using 4 producer processes with
-20 workers each, plus 1 consumer process with 4 workers::
+By default, the command will run a performance test for 5 seconds, using one
+producer process with 10 greenlet workers, and one observer process with
+5 workers. The consumer role is disabled by default.
 
-    $ zaqar-bench-pc -pp 4 -pw 20 -cp 1 -cw 4 -t 10
+You can override these defaults in the config file or on the command line
+using a variety of options. For example, the following command runs a
+performance test for 30 seconds using 4 producer processes with
+20 workers each, plus 4 consumer processes with 20 workers each. Note that
+the observer role is also disabled in this example by setting its number of
+workers to zero::
 
-By default, the results are in JSON. For more human-readable output add the ``--verbose`` flag.
-Verbose output looks similar to the following::
+    $ zaqar-bench -pp 4 -pw 20 -cp 4 -cw 20 -ow 0 -t 30
 
-    Starting Producer...
+By default, the results are in JSON. For more human-readable output, add
+the ``--verbose`` flag. Verbose output looks similar to the following::
 
-    Starting Consumer...
+    $ zaqar-bench --verbose
 
-    Consumer
-    ========
-    duration_sec: 10.2
-    ms_per_claim: 37.6
-    ms_per_delete: 11.8
-    reqs_per_sec: 82.0
-    successful_reqs: 833.0
-    total_reqs: 833.0
+    Starting producer (pp=1, pw=10)...
+
+    Starting observer (op=1, ow=5)...
 
     Producer
     ========
-    duration_sec: 10.2
-    ms_per_req: 3.8
-    reqs_per_sec: 1033.6
-    successful_reqs: 10523.0
-    total_reqs: 10523.0
+    duration_sec: 5.1
+    ms_per_req: 2.9
+    reqs_per_sec: 344.5
+    successful_reqs: 1742.0
+    total_reqs: 1742.0
+
+    Observer
+    ========
+    duration_sec: 5.0
+    ms_per_req: 2.9
+    reqs_per_sec: 339.3
+    successful_reqs: 1706.0
+    total_reqs: 1706.0
 
 .. _`OpenStack` : http://openstack.org/
diff --git a/bench-requirements.txt b/bench-requirements.txt
index 939afe982..7b0d1dd25 100644
--- a/bench-requirements.txt
+++ b/bench-requirements.txt
@@ -1,5 +1,4 @@
 argparse>=1.2.1
 gevent>=1.0.1
 marktime>=0.2.0
-psutil>=2.1.1
 python-zaqarclient>=0.0.2
diff --git a/setup.cfg b/setup.cfg
index ac53d0c05..7e507b675 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -32,7 +32,7 @@ source-dir = doc/source
 
 [entry_points]
 console_scripts =
-    zaqar-bench-pc = zaqar.bench.conductor:main
+    zaqar-bench = zaqar.bench.conductor:main
     zaqar-server = zaqar.cmd.server:run
     marconi-server = zaqar.cmd.server:run
diff --git a/zaqar/bench/conductor.py b/zaqar/bench/conductor.py
index 24ac8fea5..ba92fcd6a 100644
--- a/zaqar/bench/conductor.py
+++ b/zaqar/bench/conductor.py
@@ -17,17 +17,52 @@ from __future__ import print_function
 
 import json
 import multiprocessing as mp
 
-from zaqar.bench.config import conf
+from zaqarclient.queues.v1 import client
+
+from zaqar.bench import config
 from zaqar.bench import consumer
+from zaqar.bench import observer
 from zaqar.bench import producer
 
+CONF = config.conf
+
+
+def _print_verbose_stats(name, stats):
+    print(name.capitalize())
+    print('=' * len(name))
+
+    values = sorted(stats.items(), key=lambda v: v[0])
+    formatted_vals = ['{}: {:.1f}'.format(*v) for v in values]
+
+    print('\n'.join(formatted_vals))
+    print()  # Blank line
+
+
+def _reset_queues():
+    cli = client.Client(CONF.server_url)
+
+    for i in range(CONF.num_queues):
+        # TODO(kgriffs): DRY up name generation so it is done
+        # in a helper, vs. being copy-pasted everywhere.
+        queue = cli.queue(CONF.queue_prefix + '-' + str(i))
+        queue.delete()
+
 
 def main():
-    conf(project='zaqar', prog='zaqar-benchmark')
+    CONF(project='zaqar', prog='zaqar-benchmark')
+
+    # NOTE(kgriffs): Reset queues since last time. We don't
+    # clean them up after the performance test, in case
+    # the user wants to examine the state of the system.
+    if not CONF.skip_queue_reset:
+        if CONF.verbose:
+            print('Resetting queues...')
+
+        _reset_queues()
+
     downstream_queue = mp.Queue()
     procs = [mp.Process(target=worker.run, args=(downstream_queue,))
-             for worker in [producer, consumer]]
+             for worker in [producer, consumer, observer]]
 
     for each_proc in procs:
         each_proc.start()
@@ -39,29 +74,32 @@ def main():
     for each_proc in procs:
         stats.update(downstream_queue.get_nowait())
 
-    if conf.verbose:
+    if CONF.verbose:
         print()
 
-        for name, stats_group in stats.items():
-            print(name.capitalize())
-            print('=' * len(name))
+        for name in ('producer', 'observer', 'consumer'):
+            stats_group = stats[name]
 
-            values = sorted(stats_group.items(), key=lambda v: v[0])
-            formatted_vals = ["{}: {:.1f}".format(*v) for v in values]
+            # Skip disabled workers
+            if not stats_group['duration_sec']:
+                continue
 
-            print("\n".join(formatted_vals))
-            print('')  # Blank line
+            _print_verbose_stats(name, stats_group)
 
     else:
         stats['params'] = {
             'producer': {
-                'processes': conf.producer_processes,
-                'workers': conf.producer_workers
+                'processes': CONF.producer_processes,
+                'workers': CONF.producer_workers
             },
             'consumer': {
-                'processes': conf.consumer_processes,
-                'workers': conf.consumer_workers
-            }
+                'processes': CONF.consumer_processes,
+                'workers': CONF.consumer_workers
+            },
+            'observer': {
+                'processes': CONF.observer_processes,
+                'workers': CONF.observer_workers
+            },
         }
 
         print(json.dumps(stats))
diff --git a/zaqar/bench/config.py b/zaqar/bench/config.py
index 03dcc108b..e2f7fc76a 100644
--- a/zaqar/bench/config.py
+++ b/zaqar/bench/config.py
@@ -13,38 +13,61 @@
 # limitations under the License.
 
 from oslo.config import cfg
-import psutil
 
 conf = cfg.CONF
 _CLI_OPTIONS = (
     cfg.IntOpt(
         'producer_processes',
         short='pp',
-        default=psutil.NUM_CPUS,
+        default=1,
         help='Number of Producer Processes'),
     cfg.IntOpt(
         'producer_workers',
         short='pw',
-        default=psutil.NUM_CPUS * 2,
+        default=10,
         help='Number of Producer Workers'),
+
     cfg.IntOpt(
         'consumer_processes',
         short='cp',
-        default=psutil.NUM_CPUS,
+        default=1,
         help='Number of Consumer Processes'),
     cfg.IntOpt(
         'consumer_workers',
         short='cw',
-        default=psutil.NUM_CPUS * 2,
+        default=0,
         help='Number of Consumer Workers'),
+
+    cfg.IntOpt(
+        'observer_processes',
+        short='op',
+        default=1,
+        help='Number of Observer Processes'),
+    cfg.IntOpt(
+        'observer_workers',
+        short='ow',
+        default=5,
+        help='Number of Observer Workers'),
+
     cfg.IntOpt('messages_per_claim', short='cno', default=5,
                help=('Number of messages the consumer will attempt to '
                      'claim at a time')),
-    cfg.IntOpt('time', short='t', default=3,
+    cfg.IntOpt('messages_per_list', short='lno', default=5,
+               help=('Number of messages the observer will attempt to '
+                     'list at a time')),
+
+    cfg.IntOpt('time', short='t', default=5,
                help="Duration of the performance test, in seconds"),
+
     cfg.StrOpt('server_url', short='s', default='http://localhost:8888'),
     cfg.StrOpt('queue_prefix', short='q', default='ogre-test-queue'),
     cfg.IntOpt('num_queues', short='qno', default=4),
-    cfg.StrOpt('messages_path', short='m')
+
+    cfg.StrOpt('messages_path', short='m'),
+
+    cfg.BoolOpt('skip_queue_reset', default=False,
+                help=('Do not reset queues before running '
+                      'the performance test')),
 )
 
 conf.register_cli_opts(_CLI_OPTIONS)
diff --git a/zaqar/bench/consumer.py b/zaqar/bench/consumer.py
index 44da977a2..68d3e2658 100644
--- a/zaqar/bench/consumer.py
+++ b/zaqar/bench/consumer.py
@@ -27,7 +27,9 @@ import marktime
 from zaqarclient.queues.v1 import client
 from zaqarclient.transport.errors import TransportError
 
-from zaqar.bench.config import conf
+from zaqar.bench import config
+
+CONF = config.conf
 
 
 def claim_delete(queues, stats, test_duration, ttl, grace, limit):
@@ -93,8 +95,8 @@ def claim_delete(queues, stats, test_duration, ttl, grace, limit):
 
 def load_generator(stats, num_workers, num_queues, test_duration,
                    url, ttl, grace, limit):
-    cli = client.Client(conf.server_url)
-    queues = [cli.queue(conf.queue_prefix + '-' + str(i))
+    cli = client.Client(CONF.server_url)
+    queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
               for i in range(num_queues)]
 
     gevent.joinall([
@@ -125,9 +127,9 @@ def crunch(stats):
 
 
 def run(upstream_queue):
-    num_procs = conf.consumer_processes
-    num_workers = conf.consumer_workers
-    num_queues = conf.num_queues
+    num_procs = CONF.consumer_processes
+    num_workers = CONF.consumer_workers
+    num_queues = CONF.num_queues
 
     # Stats that will be reported
     duration = 0
@@ -141,17 +143,18 @@ def run(upstream_queue):
 
     # Performance test
    if num_procs and num_workers:
-        test_duration = conf.time
+        test_duration = CONF.time
         stats = mp.Queue()
 
         # TODO(TheSriram) : Make ttl and grace configurable
         args = (stats, num_workers, num_queues, test_duration,
-                conf.server_url, 300, 200, conf.messages_per_claim)
+                CONF.server_url, 300, 200, CONF.messages_per_claim)
 
         procs = [mp.Process(target=load_generator, args=args)
                  for _ in range(num_procs)]
 
-        if conf.verbose:
-            print("\nStarting Consumer...")
+        if CONF.verbose:
+            print('\nStarting consumers (cp={0}, cw={1})...'.format(
+                num_procs, num_workers))
 
         start = time.time()
diff --git a/zaqar/bench/observer.py b/zaqar/bench/observer.py
new file mode 100644
index 000000000..d49a12a41
--- /dev/null
+++ b/zaqar/bench/observer.py
@@ -0,0 +1,178 @@
+# Copyright (c) 2014 Rackspace, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+from __future__ import print_function
+
+import multiprocessing as mp
+import random
+import sys
+import time
+
+from gevent import monkey as curious_george
+curious_george.patch_all(thread=False, select=False)
+import gevent
+import marktime
+from six.moves import urllib
+from zaqarclient.queues.v1 import client
+from zaqarclient.transport.errors import TransportError
+
+from zaqar.bench import config
+
+CONF = config.conf
+
+
+#
+# TODO(kgriffs): Factor out the common code from producer, consumer
+# and worker (DRY all the things!)
+#
+
+
+def _extract_marker(links):
+    for link in links:
+        if link['rel'] == 'next':
+            href = link['href']
+            break
+
+    query = urllib.parse.urlparse(href).query
+    params = urllib.parse.parse_qs(query)
+    return params['marker'][0]
+
+
+def observer(queues, stats, test_duration, limit):
+    """Observer Worker
+
+    The observer lists messages without claiming them.
+    """
+
+    end = time.time() + test_duration
+
+    total_elapsed = 0
+    total_succeeded = 0
+    total_failed = 0
+
+    queues = [{'q': q, 'm': None} for q in queues]
+
+    while time.time() < end:
+        # NOTE(kgriffs): Distribute requests across all queues evenly.
+        queue = random.choice(queues)
+
+        try:
+            marktime.start('list_messages')
+            cursor = queue['q'].messages(limit=limit, marker=queue['m'])
+            total_elapsed += marktime.stop('list_messages').seconds
+            total_succeeded += 1
+
+            messages = list(cursor)
+
+            if messages:
+                # TODO(kgriffs): Figure out a less hacky way to do this
+                # while preserving the ability to measure elapsed time
+                # per request.
+                queue['m'] = _extract_marker(cursor._links)
+
+        except TransportError as ex:
+            sys.stderr.write("Could not list messages : {0}\n".format(ex))
+            total_failed += 1
+
+    total_requests = total_succeeded + total_failed
+
+    stats.put({
+        'total_requests': total_requests,
+        'total_succeeded': total_succeeded,
+        'total_elapsed': total_elapsed,
+    })
+
+
+def load_generator(stats, num_workers, num_queues,
+                   test_duration, limit):
+
+    cli = client.Client(CONF.server_url)
+    queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
+              for i in range(num_queues)]
+
+    gevent.joinall([
+        gevent.spawn(observer,
+                     queues, stats, test_duration, limit)
+
+        for _ in range(num_workers)
+    ])
+
+
+def crunch(stats):
+    total_requests = 0
+    total_succeeded = 0
+    total_elapsed = 0.0
+
+    while not stats.empty():
+        entry = stats.get_nowait()
+        total_requests += entry['total_requests']
+        total_succeeded += entry['total_succeeded']
+        total_elapsed += entry['total_elapsed']
+
+    return (total_requests, total_succeeded, total_elapsed)
+
+
+def run(upstream_queue):
+    num_procs = CONF.observer_processes
+    num_workers = CONF.observer_workers
+    num_queues = CONF.num_queues
+
+    # Stats that will be reported
+    duration = 0
+    total_requests = 0
+    total_succeeded = 0
+    throughput = 0
+    latency = 0
+
+    # Performance test
+    if num_procs and num_workers:
+        test_duration = CONF.time
+        stats = mp.Queue()
+        args = (stats, num_workers, num_queues, test_duration,
+                CONF.messages_per_list)
+
+        procs = [mp.Process(target=load_generator, args=args)
+                 for _ in range(num_procs)]
+
+        if CONF.verbose:
+            print('\nStarting observer (op={0}, ow={1})...'.format(
+                num_procs, num_workers))
+
+        start = time.time()
+
+        for each_proc in procs:
+            each_proc.start()
+
+        for each_proc in procs:
+            each_proc.join()
+
+        (total_requests, total_succeeded, total_elapsed) = crunch(stats)
+
+        duration = time.time() - start
+
+        throughput = total_succeeded / duration
+
+        if total_succeeded:
+            latency = (1000 * total_elapsed / total_succeeded)
+
+    upstream_queue.put({
+        'observer': {
+            'duration_sec': duration,
+            'total_reqs': total_requests,
+            'successful_reqs': total_succeeded,
+            'reqs_per_sec': throughput,
+            'ms_per_req': latency,
+        }
+    })
diff --git a/zaqar/bench/producer.py b/zaqar/bench/producer.py
index 38ec7e015..961a1ca8b 100644
--- a/zaqar/bench/producer.py
+++ b/zaqar/bench/producer.py
@@ -28,7 +28,9 @@ import marktime
 from zaqarclient.queues.v1 import client
 from zaqarclient.transport.errors import TransportError
 
-from zaqar.bench.config import conf
+from zaqar.bench import config
+
+CONF = config.conf
 
 
 def choose_message(message_pool):
@@ -48,7 +50,7 @@ def choose_message(message_pool):
 
 def load_messages():
     default_file_name = 'zaqar-benchmark-messages.json'
-    messages_path = conf.messages_path or conf.find_file(default_file_name)
+    messages_path = CONF.messages_path or CONF.find_file(default_file_name)
     if messages_path:
         with open(messages_path) as f:
             message_pool = json.load(f)
@@ -102,8 +104,8 @@ def producer(queues, message_pool, stats, test_duration):
 # weight them, so can have some busy queues, some not.)
 def load_generator(stats, num_workers, num_queues, test_duration):
-    cli = client.Client(conf.server_url)
-    queues = [cli.queue(conf.queue_prefix + '-' + str(i))
+    cli = client.Client(CONF.server_url)
+    queues = [cli.queue(CONF.queue_prefix + '-' + str(i))
               for i in range(num_queues)]
 
     message_pool = load_messages()
@@ -131,9 +133,9 @@ def crunch(stats):
 
 
 def run(upstream_queue):
-    num_procs = conf.producer_processes
-    num_workers = conf.producer_workers
-    num_queues = conf.num_queues
+    num_procs = CONF.producer_processes
+    num_workers = CONF.producer_workers
+    num_queues = CONF.num_queues
 
     duration = 0
     total_requests = 0
@@ -142,7 +144,7 @@ def run(upstream_queue):
     latency = 0
 
     if num_procs and num_workers:
-        test_duration = conf.time
+        test_duration = CONF.time
         stats = mp.Queue()
         args = (stats, num_workers, num_queues, test_duration)
 
@@ -155,8 +157,9 @@ def run(upstream_queue):
 
             for _ in range(num_procs)
         ]
 
-        if conf.verbose:
-            print('\nStarting Producer...')
+        if CONF.verbose:
+            print('\nStarting producer (pp={0}, pw={1})...'.format(
+                num_procs, num_workers))
 
         start = time.time()
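In non-verbose mode the conductor prints a single JSON document (the ``stats`` dict assembled in ``conductor.main()`` above, keyed by ``producer``, ``consumer``, ``observer``, and ``params``), which makes the tool easy to drive from a script. The following is a minimal sketch and not part of the patch: it assumes a Zaqar server is already listening on the default ``server_url``, reads only keys shown in the README example above, and uses illustrative flag values and an illustrative pass/fail rule::

    #!/usr/bin/env python
    """Illustrative wrapper (not part of this patch): run zaqar-bench
    and summarize its non-verbose JSON output."""

    import json
    import subprocess
    import sys

    # Producer-only run; the flag values below are arbitrary examples.
    cmd = ['zaqar-bench', '-pp', '1', '-pw', '10', '-ow', '0', '-t', '10']
    output = subprocess.check_output(cmd)

    # Non-verbose mode emits exactly one JSON object on stdout.
    stats = json.loads(output.decode('utf-8'))

    producer = stats['producer']
    print('{0:.1f} reqs/sec at {1:.1f} ms/req'.format(
        producer['reqs_per_sec'], producer['ms_per_req']))

    # Assumed policy: treat any failed request as a benchmark failure.
    if producer['successful_reqs'] < producer['total_reqs']:
        sys.exit(1)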