Refactor th to have one gearman worker thread

Change-Id: I78f95a67b80ce0627b4a3bbb20578f3d16028714
This commit is contained in:
Joshua Hesketh 2013-11-20 12:11:55 +11:00
parent 527966b2e4
commit 4343b956c5
8 changed files with 116 additions and 95 deletions

View File

@ -67,10 +67,11 @@ Modify the config.json appropriately::
for projects. This is the cache directory used by pip. for projects. This is the cache directory used by pip.
**plugins** **plugins**
A list of enabled plugins and their settings in a dictionary. A list of enabled plugins and their settings in a dictionary.
The only required parameter is *name* which should be the The only required parameters are *name*, which should be the
same as the folder containing the plugin module. Any other same as the folder containing the plugin module, and
parameters are specified by the plugin themselves as *function*, which is the function registered with zuul.
required. Any other parameters are specified by the plugin themselves
as required.
**publish_logs** **publish_logs**
Log results from plugins can be published using multiple Log results from plugins can be published using multiple
methods. Currently only a local copy is fully implemented. methods. Currently only a local copy is fully implemented.

View File

@ -11,8 +11,8 @@
"plugins": [ "plugins": [
{ {
"name": "gate_real_db_upgrade", "name": "gate_real_db_upgrade",
"datasets_dir": "/var/lib/turbo-hipster/datasets", "function": "build:gate-real-db-upgrade_nova_mysql",
"job": "gate-real-db-upgrade_nova_mysql" "datasets_dir": "/var/lib/turbo-hipster/datasets"
} }
], ],
"publish_logs": { "publish_logs": {

View File

@ -21,15 +21,15 @@ import os
import re import re
import time import time
from turbo_hipster.worker_manager import GearmanManager from turbo_hipster.worker_manager import ZuulManager
from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\ from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\
as RealDbUpgradeRunner as RealDbUpgradeRunner
class FakeGearmanManager(GearmanManager): class FakeZuulManager(ZuulManager):
def __init__(self, config, tasks, test): def __init__(self, config, tasks, test):
self.test = test self.test = test
super(FakeGearmanManager, self).__init__(config, tasks) super(FakeZuulManager, self).__init__(config, tasks)
def setup_gearman(self): def setup_gearman(self):
hostname = os.uname()[1] hostname = os.uname()[1]
@ -160,16 +160,6 @@ class FakeRealDbUpgradeRunner(RealDbUpgradeRunner):
plugin_config, plugin_config,
worker_name) worker_name)
def setup_gearman(self):
self.log.debug("Set up real_db gearman worker")
self.gearman_worker = FakeWorker('FakeRealDbUpgradeRunner_worker',
self.test)
self.gearman_worker.addServer(
self.global_config['zuul_server']['gearman_host'],
self.global_config['zuul_server']['gearman_port']
)
self.register_functions()
class BuildHistory(object): class BuildHistory(object):
def __init__(self, **kw): def __init__(self, **kw):

View File

@ -19,7 +19,7 @@ import json
import os import os
import testtools import testtools
import time import time
from fakes import FakeGearmanManager, FakeGearmanServer,\ from fakes import FakeZuulManager, FakeGearmanServer,\
FakeRealDbUpgradeRunner FakeRealDbUpgradeRunner
CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc') CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc')
@ -40,9 +40,7 @@ class TestGearmanManager(testtools.TestCase):
'test-worker-1', self) 'test-worker-1', self)
self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task) self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task)
self.gearman_manager = FakeGearmanManager(self.config, self.gearman_manager = FakeZuulManager(self.config, self.tasks, self)
self.tasks,
self)
def test_manager_function_registered(self): def test_manager_function_registered(self):
""" Check the manager is set up correctly and registered with the """ Check the manager is set up correctly and registered with the

View File

@ -224,6 +224,8 @@ def scp_push_file(job_log_dir, file_path, local_config):
def determine_job_identifier(zuul_arguments, job, unique): def determine_job_identifier(zuul_arguments, job, unique):
if 'build:' in job:
job = job.split('build:')[1]
return os.path.join(zuul_arguments['ZUUL_CHANGE'][:2], return os.path.join(zuul_arguments['ZUUL_CHANGE'][:2],
zuul_arguments['ZUUL_CHANGE'], zuul_arguments['ZUUL_CHANGE'],
zuul_arguments['ZUUL_PATCHSET'], zuul_arguments['ZUUL_PATCHSET'],

View File

@ -13,12 +13,10 @@
# under the License. # under the License.
import gear
import json import json
import logging import logging
import os import os
import re import re
import threading
from turbo_hipster.lib import utils from turbo_hipster.lib import utils
@ -31,7 +29,7 @@ MIGRATION_START_RE = re.compile('([0-9]+) -> ([0-9]+)\.\.\.$')
MIGRATION_END_RE = re.compile('^done$') MIGRATION_END_RE = re.compile('^done$')
class Runner(threading.Thread): class Runner(object):
""" This thread handles the actual sql-migration tests. """ This thread handles the actual sql-migration tests.
It pulls in a gearman job from the build:gate-real-db-upgrade It pulls in a gearman job from the build:gate-real-db-upgrade
@ -39,16 +37,12 @@ class Runner(threading.Thread):
log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner") log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner")
def __init__(self, global_config, plugin_config, worker_name): def __init__(self, global_config, plugin_config, job_name):
super(Runner, self).__init__()
self._stop = threading.Event()
self.global_config = global_config self.global_config = global_config
self.plugin_config = plugin_config self.plugin_config = plugin_config
self.job_name = job_name
self.worker_name = worker_name
# Set up the runner worker # Set up the runner worker
self.gearman_worker = None
self.datasets = [] self.datasets = []
self.job = None self.job = None
@ -61,31 +55,6 @@ class Runner(threading.Thread):
self.current_step = 0 self.current_step = 0
self.total_steps = 4 self.total_steps = 4
self.setup_gearman()
def setup_gearman(self):
self.log.debug("Set up real_db gearman worker")
self.gearman_worker = gear.Worker(self.worker_name)
self.gearman_worker.addServer(
self.global_config['zuul_server']['gearman_host'],
self.global_config['zuul_server']['gearman_port']
)
self.register_functions()
def register_functions(self):
self.gearman_worker.registerFunction(
'build:' + self.plugin_config['job'])
def stop(self):
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def stop_worker(self, number): def stop_worker(self, number):
# Check the number is for this job instance # Check the number is for this job instance
# (makes it possible to run multiple workers with this task # (makes it possible to run multiple workers with this task
@ -93,22 +62,10 @@ class Runner(threading.Thread):
if number == self.job.unique: if number == self.job.unique:
self.log.debug("We've been asked to stop by our gearman manager") self.log.debug("We've been asked to stop by our gearman manager")
self.cancelled = True self.cancelled = True
# TODO: Work out how to kill current step
def run(self): def start_job(self, job):
while True and not self.stopped(): self.job = job
try:
# Reset job information:
self.current_step = 0
self.cancelled = False
self.work_data = None
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for job")
self.job = self.gearman_worker.getJob()
self._handle_job()
except:
self.log.exception('Exception retrieving log event.')
def _handle_job(self):
if self.job is not None: if self.job is not None:
try: try:
self.job_arguments = \ self.job_arguments = \
@ -215,7 +172,7 @@ class Runner(threading.Thread):
dataset['config']['project'] and dataset['config']['project'] and
self._get_project_command(dataset['config']['type'])): self._get_project_command(dataset['config']['type'])):
dataset['determined_path'] = utils.determine_job_identifier( dataset['determined_path'] = utils.determine_job_identifier(
self.job_arguments, self.plugin_config['job'], self.job_arguments, self.plugin_config['function'],
self.job.unique self.job.unique
) )
dataset['job_log_file_path'] = os.path.join( dataset['job_log_file_path'] = os.path.join(
@ -315,7 +272,7 @@ class Runner(threading.Thread):
project_name + '/.git', project_name + '/.git',
os.path.join( os.path.join(
self.global_config['git_working_dir'], self.global_config['git_working_dir'],
self.worker_name, self.job_name,
project_name project_name
) )
) )
@ -333,7 +290,7 @@ class Runner(threading.Thread):
if self.work_data is None: if self.work_data is None:
hostname = os.uname()[1] hostname = os.uname()[1]
self.work_data = dict( self.work_data = dict(
name=self.worker_name, name=self.job_name,
number=self.job.unique, number=self.job.unique,
manager='turbo-hipster-manager-%s' % hostname, manager='turbo-hipster-manager-%s' % hostname,
url='http://localhost', url='http://localhost',

View File

@ -20,7 +20,7 @@ import os
import threading import threading
class GearmanManager(threading.Thread): class ZuulManager(threading.Thread):
""" This thread manages all of the launched gearman workers. """ This thread manages all of the launched gearman workers.
As required by the zuul protocol it handles stopping builds when they As required by the zuul protocol it handles stopping builds when they
@ -31,7 +31,7 @@ class GearmanManager(threading.Thread):
log = logging.getLogger("worker_manager.GearmanManager") log = logging.getLogger("worker_manager.GearmanManager")
def __init__(self, config, tasks): def __init__(self, config, tasks):
super(GearmanManager, self).__init__() super(ZuulManager, self).__init__()
self._stop = threading.Event() self._stop = threading.Event()
self.config = config self.config = config
self.tasks = tasks self.tasks = tasks
@ -81,3 +81,67 @@ class GearmanManager(threading.Thread):
except Exception as e: except Exception as e:
self.log.exception('Exception handling log event.') self.log.exception('Exception handling log event.')
job.sendWorkException(str(e).encode('utf-8')) job.sendWorkException(str(e).encode('utf-8'))
class ZuulClient(threading.Thread):
""" ..."""
log = logging.getLogger("worker_manager.ZuulClient")
def __init__(self, global_config, worker_name):
super(ZuulClient, self).__init__()
self._stop = threading.Event()
self.global_config = global_config
self.worker_name = worker_name
# Set up the runner worker
self.gearman_worker = None
self.functions = {}
self.job = None
self.cancelled = False
self.setup_gearman()
def setup_gearman(self):
self.log.debug("Set up gearman worker")
self.gearman_worker = gear.Worker(self.worker_name)
self.gearman_worker.addServer(
self.global_config['zuul_server']['gearman_host'],
self.global_config['zuul_server']['gearman_port']
)
self.register_functions()
def register_functions(self):
for function_name, plugin in self.functions.items():
self.gearman_worker.registerFunction(function_name)
def add_function(self, function_name, plugin):
self.functions[function_name] = plugin
def stop(self):
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
while True and not self.stopped():
try:
self.cancelled = False
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for job")
self.job = self.gearman_worker.getJob()
self._handle_job()
except:
self.log.exception('Exception retrieving log event.')
def _handle_job(self):
""" We have a job, give it to the right plugin """
self.functions[self.job.name].start_job(self.job)

View File

@ -40,16 +40,18 @@ class Server(object):
log = logging.getLogger("worker_server.Server") log = logging.getLogger("worker_server.Server")
def __init__(self, config): def __init__(self, config):
# Config init
self.config = config
self.manager = None
self.plugins = []
self.load_plugins()
# Python logging output file. # Python logging output file.
self.debug_log = self.config['debug_log'] self.debug_log = self.config['debug_log']
# Config init
self.config = config
self.zuul_manager = None
self.zuul_client = None
self.plugins = []
self.worker_name = os.uname()[1]
self.tasks = {} self.tasks = {}
self.load_plugins()
def setup_logging(self): def setup_logging(self):
if self.debug_log: if self.debug_log:
@ -74,23 +76,30 @@ class Server(object):
'plugin_config': plugin 'plugin_config': plugin
}) })
def run_tasks(self): def start_gearman_workers(self):
""" Run the tasks """ """ Run the tasks """
for thread_number, plugin in enumerate(self.plugins): self.zuul_client = worker_manager.ZuulClient(self.config,
self.worker_name)
for task_number, plugin in enumerate(self.plugins):
module = plugin['module'] module = plugin['module']
worker_name = '%s-%s-%s' % (plugin['plugin_config']['name'], job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
os.uname()[1], thread_number) self.worker_name, task_number)
self.tasks[worker_name] = module.Runner( self.tasks[job_name] = module.Runner(
self.config, self.config,
plugin['plugin_config'], plugin['plugin_config'],
worker_name job_name
) )
self.tasks[worker_name].daemon = True self.zuul_client.add_function(plugin['plugin_config']['function'],
self.tasks[worker_name].start() self.tasks[job_name])
self.manager = worker_manager.GearmanManager(self.config, self.tasks) self.zuul_client.register_functions()
self.manager.daemon = True self.zuul_client.daemon = True
self.manager.start() self.zuul_client.start()
self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
self.zuul_manager.daemon = True
self.zuul_manager.start()
def exit_handler(self, signum): def exit_handler(self, signum):
signal.signal(signal.SIGUSR1, signal.SIG_IGN) signal.signal(signal.SIGUSR1, signal.SIG_IGN)
@ -101,7 +110,7 @@ class Server(object):
def main(self): def main(self):
self.setup_logging() self.setup_logging()
self.run_tasks() self.start_gearman_workers()
while True: while True:
try: try: