def create_polling_service(worker_id, conf=None, queue=None):
    """Build the polling agent service for a worker.

    :param worker_id: identifier of the worker, passed through to
        ``manager.AgentManager``.
    :param conf: configuration object; when ``None`` a fresh one is built
        with ``_prepare_config()`` and its option values are logged at
        debug level.
    :param queue: optional queue handed to the agent manager for
        heartbeat updates; ``None`` leaves heartbeats disabled.
    :returns: a ``manager.AgentManager`` instance.
    """
    if conf is None:
        conf = _prepare_config()
        conf.log_opt_values(LOG, log.DEBUG)
    return manager.AgentManager(worker_id, conf,
                                conf.polling_namespaces, queue)


def create_heartbeat_service(worker_id, conf=None, queue=None):
    """Build the heartbeat child service for a worker.

    ``conf`` defaults to ``None`` so that the signature agrees with
    ``create_polling_service``; the body already handled a missing
    configuration, but the parameter was declared as required.

    :param worker_id: identifier of the worker, passed through to
        ``manager.AgentHeartBeatManager``.
    :param conf: configuration object; when ``None`` a fresh one is built
        with ``_prepare_config()`` and its option values are logged at
        debug level.
    :param queue: queue the heartbeat manager reads updates from.
    :returns: a ``manager.AgentHeartBeatManager`` instance.
    """
    if conf is None:
        conf = _prepare_config()
        conf.log_opt_values(LOG, log.DEBUG)
    return manager.AgentHeartBeatManager(worker_id, conf,
                                         conf.polling_namespaces, queue)
class AgentHeartBeatManager(cotyledon.Service):
    """Child service that collects pollster heartbeats and serves them.

    Heartbeat records (dicts with ``pollster`` and ``timestamp`` keys) are
    read from ``queue`` and the most recent timestamp per pollster is kept
    in memory. Every client that connects to the UNIX stream socket created
    under ``conf.polling.heartbeat_socket_dir`` is sent one
    ``"<pollster> <timestamp>"`` line per known pollster, after which the
    connection is closed.
    """

    def __init__(self, worker_id, conf, namespaces=None, queue=None):
        """Create the listening socket for heartbeat status reports.

        :param worker_id: worker identifier passed to ``cotyledon.Service``.
        :param conf: configuration object; ``conf.polling
            .heartbeat_socket_dir`` must be set.
        :param namespaces: polling namespace or list of namespaces; they
            are sorted and joined with ``-`` to build the socket file name
            (``None`` is treated as a single empty namespace).
        :param queue: queue from which heartbeat records are read.
        :raises HeartBeatException: if no socket directory is configured
            or the socket file cannot be bound.
        """
        super(AgentHeartBeatManager, self).__init__(worker_id)
        self.conf = conf
        # Set up front so that terminate() is safe even if run() was never
        # reached.
        self._tpe = None

        if conf.polling.heartbeat_socket_dir is None:
            raise HeartBeatException("path to a directory containing "
                                     "heart beat sockets is required", conf)

        if not isinstance(namespaces, list):
            if namespaces is None:
                namespaces = ""
            namespaces = [namespaces]

        self._lock = threading.Lock()
        self._queue = queue
        self._status = dict()
        self._sock_pth = os.path.join(
            conf.polling.heartbeat_socket_dir,
            f"ceilometer-{'-'.join(sorted(namespaces))}.socket"
        )

        # A stale socket file left by a previous run would make bind() fail.
        self._delete_socket()
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self._sock.bind(self._sock_pth)
            self._sock.listen(1)
        except socket.error as err:
            # Do not leak the descriptor when the service cannot start.
            self._sock.close()
            raise HeartBeatException("Failed to open socket file "
                                     f"({self._sock_pth}): {err}",
                                     conf) from err

        LOG.info("Starting heartbeat child service. Listening"
                 f" on {self._sock_pth}")

    def _delete_socket(self):
        """Remove the socket file, ignoring any failure (best effort)."""
        try:
            os.remove(self._sock_pth)
        except OSError:
            pass

    def terminate(self):
        """Stop the worker pool, close the socket and remove its file."""
        if self._tpe is not None:
            self._tpe.shutdown(wait=False, cancel_futures=True)
        self._sock.close()
        self._delete_socket()

    def _update_status(self):
        """Block for one heartbeat record and store its timestamp."""
        hb = self._queue.get()
        with self._lock:
            self._status[hb['pollster']] = hb['timestamp']
        LOG.debug(f"Updated heartbeat for {hb['pollster']} "
                  f"({hb['timestamp']})")

    def _send_heartbeat(self):
        """Accept one client and send it the current heartbeat status.

        The client connection is always closed. A failure while sending
        (for example a client that disconnected early) is logged and
        swallowed so that the reporting loop keeps serving later clients.
        """
        s, _ = self._sock.accept()
        try:
            LOG.debug("Heartbeat status report requested "
                      f"at {self._sock_pth}")
            # Snapshot under the lock; send outside of it so a slow client
            # cannot stall heartbeat updates.
            with self._lock:
                out = '\n'.join([f"{k} {v}"
                                 for k, v in self._status.items()])
            try:
                s.sendall(out.encode('utf-8'))
            except OSError as err:
                LOG.warning("Failed to report heartbeat status "
                            f"at {self._sock_pth}: {err}")
                return
        finally:
            s.close()
        LOG.debug(f"Reported heartbeat status:\n{out}")

    def run(self):
        """Start the queue-reader and status-reporter worker threads."""
        super(AgentHeartBeatManager, self).run()

        LOG.debug("Started heartbeat child process.")

        def _read_queue():
            LOG.debug("Started heartbeat update thread")
            while True:
                self._update_status()

        def _report_status():
            LOG.debug("Started heartbeat reporting thread")
            while True:
                self._send_heartbeat()

        # Leaving the ``with`` block waits for the submitted tasks, so this
        # call blocks for as long as the two loops are running.
        with futures.ThreadPoolExecutor(max_workers=2) as executor:
            self._tpe = executor
            executor.submit(_read_queue)
            executor.submit(_report_status)
self._queue.put_nowait(hb) + LOG.debug(f"Polster heartbeat update: {name}") + except queue.Full: + LOG.warning(f"Heartbeat queue full. Update failed: {hb}") + def create_dynamic_pollsters(self, namespaces): """Creates dynamic pollsters diff --git a/ceilometer/tests/unit/polling/test_heartbeat.py b/ceilometer/tests/unit/polling/test_heartbeat.py new file mode 100644 index 0000000000..b172de7742 --- /dev/null +++ b/ceilometer/tests/unit/polling/test_heartbeat.py @@ -0,0 +1,113 @@ +# +# Copyright 2024 Red Hat, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
class TestHeartBeatManagert(base.BaseTestCase):
    """Unit tests for ``manager.AgentHeartBeatManager``."""

    def setUp(self):
        super(TestHeartBeatManagert, self).setUp()
        self.conf = service.prepare_service([], [])
        self.tmpdir = tempfile.mkdtemp()
        # Registered immediately so the directory is removed even when the
        # rest of setUp() raises (tearDown() is not run in that case).
        self.addCleanup(shutil.rmtree, self.tmpdir)

        self.queue = multiprocessing.Queue()
        self.mgr = manager.AgentManager(0, self.conf, namespaces='central',
                                        queue=self.queue)

    def _create_hb_manager(self, namespaces):
        """Create a heartbeat manager bound to a socket in the temp dir.

        The listening socket is closed at the end of the test so no file
        descriptor outlives it.
        """
        self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
                               group='polling')
        hb = manager.AgentHeartBeatManager(0, self.conf,
                                           namespaces=namespaces,
                                           queue=self.queue)
        self.addCleanup(hb._sock.close)
        return hb

    def setup_polling(self, poll_cfg=None):
        # NOTE(review): relies on self.cfg2file and self.polling_cfg, which
        # are not defined in this class and are not used by the tests
        # below -- confirm they are provided by the base class.
        name = self.cfg2file(poll_cfg or self.polling_cfg)
        self.conf.set_override('cfg_file', name, group='polling')
        self.mgr.polling_manager = manager.PollingManager(self.conf)

    def test_hb_not_configured(self):
        # Without heartbeat_socket_dir the service must refuse to start.
        self.assertRaises(manager.HeartBeatException,
                          manager.AgentHeartBeatManager,
                          0, self.conf,
                          namespaces='ipmi',
                          queue=self.queue)

    @mock.patch('ceilometer.polling.manager.LOG')
    def test_hb_startup(self, LOG):
        # activate heartbeat agent
        self._create_hb_manager('compute')
        calls = [mock.call("Starting heartbeat child service. Listening"
                           f" on {self.tmpdir}/ceilometer-compute.socket")]
        LOG.info.assert_has_calls(calls)

    @mock.patch('ceilometer.polling.manager.LOG')
    def test_hb_update(self, LOG):
        hb = self._create_hb_manager('central')

        timestamp = timeutils.utcnow().isoformat()
        self.queue.put_nowait({'timestamp': timestamp, 'pollster': 'test'})

        hb._update_status()
        calls = [mock.call(f"Updated heartbeat for test ({timestamp})")]
        LOG.debug.assert_has_calls(calls)

    @mock.patch('ceilometer.polling.manager.LOG')
    def test_hb_send(self, LOG):
        with mock.patch('socket.socket') as FakeSocket:
            # Client-side socket returned by accept().
            sub_skt = mock.Mock()
            sub_skt.sendall.return_value = None
            sub_skt.close.return_value = None

            skt = FakeSocket.return_value
            skt.bind.return_value = mock.Mock()
            skt.listen.return_value = mock.Mock()
            skt.accept.return_value = (sub_skt, "")

            hb = self._create_hb_manager('central')
            timestamp = timeutils.utcnow().isoformat()
            self.queue.put_nowait({'timestamp': timestamp,
                                   'pollster': 'test1'})
            hb._update_status()
            self.queue.put_nowait({'timestamp': timestamp,
                                   'pollster': 'test2'})
            hb._update_status()

            # test status report
            hb._send_heartbeat()
            report = f"test1 {timestamp}\ntest2 {timestamp}"
            # The payload itself, not only the log line, must be correct
            # and the client connection must be released.
            sub_skt.sendall.assert_called_once_with(report.encode('utf-8'))
            sub_skt.close.assert_called_once_with()
            calls = [mock.call("Heartbeat status report requested "
                               f"at {self.tmpdir}/ceilometer-central.socket"),
                     mock.call("Reported heartbeat status:\n"
                               f"test1 {timestamp}\n"
                               f"test2 {timestamp}")]
            LOG.debug.assert_has_calls(calls)
def test_load_plugins(self):
    """An agent built with the default namespaces exposes extensions."""
    heartbeat_queue = multiprocessing.Queue()
    agent = manager.AgentManager(0, self.conf, queue=heartbeat_queue)
    loaded = list(agent.extensions)
    self.assertIsNotNone(loaded)
def create_manager(self):
    """Return an agent manager wired to a fresh heartbeat queue."""
    return manager.AgentManager(0, self.CONF,
                                queue=multiprocessing.Queue())