diff --git a/ceilometer/keystone_client.py b/ceilometer/keystone_client.py index 6b36f2d00d..f6b7559b8b 100644 --- a/ceilometer/keystone_client.py +++ b/ceilometer/keystone_client.py @@ -23,7 +23,7 @@ DEFAULT_GROUP = "service_credentials" # List of group that can set auth_section to use a different # credentials section -OVERRIDABLE_GROUPS = ['dispatcher_gnocchi'] +OVERRIDABLE_GROUPS = ['dispatcher_gnocchi', 'zaqar'] def get_session(conf, requests_session=None, group=None, timeout=None): diff --git a/ceilometer/publisher/zaqar.py b/ceilometer/publisher/zaqar.py new file mode 100644 index 0000000000..aee72c5cc1 --- /dev/null +++ b/ceilometer/publisher/zaqar.py @@ -0,0 +1,83 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six.moves.urllib import parse as urlparse + +from ceilometer import keystone_client +from ceilometer import publisher + +from zaqarclient.queues.v2 import client as zaqarclient + +DEFAULT_TTL = 3600 + + +class ZaqarPublisher(publisher.ConfigPublisherBase): + """Publish metering data to a Zaqar queue. + + The target queue name must be configured in the ceilometer pipeline + configuration file. The TTL can also optionally be specified as a query + argument:: + + meter: + - name: meter_zaqar + interval: 600 + counters: + - "*" + transformers: + sinks: + - zaqar_sink + sinks: + - name: zaqar_sink + transformers: + publishers: + - zaqar://?queue=meter_queue&ttl=1200 + + The credentials to access Zaqar must be set in the [zaqar] section in the + configuration. + """ + def __init__(self, conf, parsed_url): + super(ZaqarPublisher, self).__init__(conf, parsed_url) + options = urlparse.parse_qs(parsed_url.query) + self.queue_name = options.get('queue', [None])[0] + if not self.queue_name: + raise ValueError('Must specify a queue in the zaqar publisher') + self.ttl = int(options.pop('ttl', [DEFAULT_TTL])[0]) + self._client = None + + @property + def client(self): + if self._client is None: + session = keystone_client.get_session( + self.conf, group=self.conf.zaqar.auth_section) + self._client = zaqarclient.Client(session=session) + return self._client + + def publish_samples(self, samples): + """Send a metering message for publishing + + :param samples: Samples from pipeline after transformation + """ + queue = self.client.queue(self.queue_name) + messages = [{'body': sample.as_dict(), 'ttl': self.ttl} + for sample in samples] + queue.post(messages) + + def publish_events(self, events): + """Send an event message for publishing + + :param events: events from pipeline after transformation + """ + queue = self.client.queue(self.queue_name) + messages = [{'body': event.serialize(), 'ttl': self.ttl} + for event in events] + queue.post(messages) diff --git a/ceilometer/tests/unit/publisher/test_zaqar.py b/ceilometer/tests/unit/publisher/test_zaqar.py new file mode 100644 index 0000000000..88a9dc10e0 --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_zaqar.py @@ -0,0 +1,126 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import mock +from oslotest import base +from six.moves.urllib import parse as urlparse +import uuid + +from ceilometer.event.storage import models as event +from ceilometer.publisher import zaqar +from ceilometer import sample +from ceilometer import service + + +class TestZaqarPublisher(base.BaseTestCase): + + resource_id = str(uuid.uuid4()) + + sample_data = [ + sample.Sample( + name='alpha', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='beta', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='gamma', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=datetime.datetime.now().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + ] + + event_data = [event.Event( + message_id=str(uuid.uuid4()), event_type='event_%d' % i, + generated=datetime.datetime.utcnow().isoformat(), + traits=[], raw={'payload': {'some': 'aa'}}) for i in range(3)] + + def setUp(self): + super(TestZaqarPublisher, self).setUp() + self.CONF = service.prepare_service([], []) + + def test_zaqar_publisher_config(self): + """Test publisher config parameters.""" + parsed_url = urlparse.urlparse('zaqar://') + self.assertRaises(ValueError, zaqar.ZaqarPublisher, + self.CONF, parsed_url) + + parsed_url = urlparse.urlparse('zaqar://?queue=foo&ttl=bar') + self.assertRaises(ValueError, zaqar.ZaqarPublisher, + self.CONF, parsed_url) + + parsed_url = urlparse.urlparse('zaqar://?queue=foo&ttl=60') + publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url) + self.assertEqual(60, publisher.ttl) + + parsed_url = urlparse.urlparse('zaqar://?queue=foo') + publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url) + self.assertEqual(3600, publisher.ttl) + self.assertEqual('foo', publisher.queue_name) + + @mock.patch('zaqarclient.queues.v2.queues.Queue') + def test_zaqar_post_samples(self, mock_queue): + """Test publisher post.""" + parsed_url = urlparse.urlparse('zaqar://?queue=foo') + publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url) + mock_post = mock.Mock() + mock_queue.return_value = mock_post + + publisher.publish_samples(self.sample_data) + + mock_queue.assert_called_once_with(mock.ANY, 'foo') + self.assertEqual( + 3, len(mock_post.post.call_args_list[0][0][0])) + self.assertEqual( + mock_post.post.call_args_list[0][0][0][0]['body'], + self.sample_data[0].as_dict()) + + @mock.patch('zaqarclient.queues.v2.queues.Queue') + def test_zaqar_post_events(self, mock_queue): + """Test publisher post.""" + parsed_url = urlparse.urlparse('zaqar://?queue=foo') + publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url) + mock_post = mock.Mock() + mock_queue.return_value = mock_post + + publisher.publish_events(self.event_data) + + mock_queue.assert_called_once_with(mock.ANY, 'foo') + self.assertEqual( + 3, len(mock_post.post.call_args_list[0][0][0])) + self.assertEqual( + mock_post.post.call_args_list[0][0][0][0]['body'], + self.event_data[0].serialize()) diff --git a/releasenotes/notes/zaqar-publisher-f7efa030b71731f4.yaml b/releasenotes/notes/zaqar-publisher-f7efa030b71731f4.yaml new file mode 100644 index 0000000000..eb390b4937 --- /dev/null +++ b/releasenotes/notes/zaqar-publisher-f7efa030b71731f4.yaml @@ -0,0 +1,3 @@ +--- +features: + - Add a new publisher for pushing samples or events to a Zaqar queue. diff --git a/setup.cfg b/setup.cfg index 5403c242c0..da188c47a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,6 +39,8 @@ postgresql = psycopg2>=2.5 # LGPL/ZPL mysql = PyMySQL>=0.6.2 # MIT License +zaqar = + python-zaqarclient>=1.0.0 # Apache-2.0 [entry_points] ceilometer.notification = @@ -236,6 +238,7 @@ ceilometer.sample.publisher = database = ceilometer.publisher.direct:DirectPublisher file_alt = ceilometer.publisher.direct:DirectPublisher http_alt = ceilometer.publisher.direct:DirectPublisher + zaqar = ceilometer.publisher.zaqar:ZaqarPublisher ceilometer.event.publisher = test = ceilometer.publisher.test:TestPublisher @@ -248,6 +251,7 @@ ceilometer.event.publisher = database = ceilometer.publisher.direct:DirectPublisher file_alt = ceilometer.publisher.direct:DirectPublisher http_alt = ceilometer.publisher.direct:DirectPublisher + zaqar = ceilometer.publisher.zaqar:ZaqarPublisher ceilometer.event.trait_plugin = split = ceilometer.event.trait_plugins:SplitterTraitPlugin diff --git a/test-requirements.txt b/test-requirements.txt index 11c46e4394..f7f93a27a1 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -28,4 +28,4 @@ os-testr>=0.4.1 # Apache-2.0 tempest>=14.0.0 # Apache-2.0 WebTest>=2.0 # MIT pifpaf>=0.0.11 # Apache-2.0 -os-api-ref>=0.1.0 # Apache-2.0s +os-api-ref>=0.1.0 # Apache-2.0 diff --git a/tox.ini b/tox.ini index 9abbb85790..b363d9e862 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ skipsdist = True envlist = py{27,35},{debug,py,py27,py35}-{mongodb,mysql,postgresql,functional},pep8 [testenv] -deps = .[mongo,mysql,postgresql,gnocchi] +deps = .[mongo,mysql,postgresql,gnocchi,zaqar] -r{toxinidir}/test-requirements.txt # NOTE(tonyb): This project has chosen to *NOT* consume upper-constraints.txt install_command = pip install -U {opts} {packages}