Adds a Zaqar publisher

This adds a new publisher supporting samples and events which can be
used in the pipelines.

Change-Id: I5f95228c38656522a14b20370e2cfd67cb911f80
This commit is contained in:
Thomas Herve 2017-04-25 15:23:20 +02:00
parent 7290b4ec8c
commit c1c56d6aae
7 changed files with 219 additions and 3 deletions

View File

@ -23,7 +23,7 @@ DEFAULT_GROUP = "service_credentials"
# List of group that can set auth_section to use a different
# credentials section
OVERRIDABLE_GROUPS = ['dispatcher_gnocchi']
OVERRIDABLE_GROUPS = ['dispatcher_gnocchi', 'zaqar']
def get_session(conf, requests_session=None, group=None, timeout=None):

View File

@ -0,0 +1,83 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves.urllib import parse as urlparse
from ceilometer import keystone_client
from ceilometer import publisher
from zaqarclient.queues.v2 import client as zaqarclient
DEFAULT_TTL = 3600
class ZaqarPublisher(publisher.ConfigPublisherBase):
"""Publish metering data to a Zaqar queue.
The target queue name must be configured in the ceilometer pipeline
configuration file. The TTL can also optionally be specified as a query
argument::
meter:
- name: meter_zaqar
interval: 600
counters:
- "*"
transformers:
sinks:
- zaqar_sink
sinks:
- name: zaqar_sink
transformers:
publishers:
- zaqar://?queue=meter_queue&ttl=1200
The credentials to access Zaqar must be set in the [zaqar] section in the
configuration.
"""
def __init__(self, conf, parsed_url):
super(ZaqarPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
self.queue_name = options.get('queue', [None])[0]
if not self.queue_name:
raise ValueError('Must specify a queue in the zaqar publisher')
self.ttl = int(options.pop('ttl', [DEFAULT_TTL])[0])
self._client = None
@property
def client(self):
if self._client is None:
session = keystone_client.get_session(
self.conf, group=self.conf.zaqar.auth_section)
self._client = zaqarclient.Client(session=session)
return self._client
def publish_samples(self, samples):
"""Send a metering message for publishing
:param samples: Samples from pipeline after transformation
"""
queue = self.client.queue(self.queue_name)
messages = [{'body': sample.as_dict(), 'ttl': self.ttl}
for sample in samples]
queue.post(messages)
def publish_events(self, events):
"""Send an event message for publishing
:param events: events from pipeline after transformation
"""
queue = self.client.queue(self.queue_name)
messages = [{'body': event.serialize(), 'ttl': self.ttl}
for event in events]
queue.post(messages)

View File

@ -0,0 +1,126 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslotest import base
from six.moves.urllib import parse as urlparse
import uuid
from ceilometer.event.storage import models as event
from ceilometer.publisher import zaqar
from ceilometer import sample
from ceilometer import service
class TestZaqarPublisher(base.BaseTestCase):
resource_id = str(uuid.uuid4())
sample_data = [
sample.Sample(
name='alpha',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='beta',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='gamma',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.now().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
]
event_data = [event.Event(
message_id=str(uuid.uuid4()), event_type='event_%d' % i,
generated=datetime.datetime.utcnow().isoformat(),
traits=[], raw={'payload': {'some': 'aa'}}) for i in range(3)]
def setUp(self):
super(TestZaqarPublisher, self).setUp()
self.CONF = service.prepare_service([], [])
def test_zaqar_publisher_config(self):
"""Test publisher config parameters."""
parsed_url = urlparse.urlparse('zaqar://')
self.assertRaises(ValueError, zaqar.ZaqarPublisher,
self.CONF, parsed_url)
parsed_url = urlparse.urlparse('zaqar://?queue=foo&ttl=bar')
self.assertRaises(ValueError, zaqar.ZaqarPublisher,
self.CONF, parsed_url)
parsed_url = urlparse.urlparse('zaqar://?queue=foo&ttl=60')
publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url)
self.assertEqual(60, publisher.ttl)
parsed_url = urlparse.urlparse('zaqar://?queue=foo')
publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url)
self.assertEqual(3600, publisher.ttl)
self.assertEqual('foo', publisher.queue_name)
@mock.patch('zaqarclient.queues.v2.queues.Queue')
def test_zaqar_post_samples(self, mock_queue):
"""Test publisher post."""
parsed_url = urlparse.urlparse('zaqar://?queue=foo')
publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url)
mock_post = mock.Mock()
mock_queue.return_value = mock_post
publisher.publish_samples(self.sample_data)
mock_queue.assert_called_once_with(mock.ANY, 'foo')
self.assertEqual(
3, len(mock_post.post.call_args_list[0][0][0]))
self.assertEqual(
mock_post.post.call_args_list[0][0][0][0]['body'],
self.sample_data[0].as_dict())
@mock.patch('zaqarclient.queues.v2.queues.Queue')
def test_zaqar_post_events(self, mock_queue):
"""Test publisher post."""
parsed_url = urlparse.urlparse('zaqar://?queue=foo')
publisher = zaqar.ZaqarPublisher(self.CONF, parsed_url)
mock_post = mock.Mock()
mock_queue.return_value = mock_post
publisher.publish_events(self.event_data)
mock_queue.assert_called_once_with(mock.ANY, 'foo')
self.assertEqual(
3, len(mock_post.post.call_args_list[0][0][0]))
self.assertEqual(
mock_post.post.call_args_list[0][0][0][0]['body'],
self.event_data[0].serialize())

View File

@ -0,0 +1,3 @@
---
features:
- Add a new publisher for pushing samples or events to a Zaqar queue.

View File

@ -39,6 +39,8 @@ postgresql =
psycopg2>=2.5 # LGPL/ZPL
mysql =
PyMySQL>=0.6.2 # MIT License
zaqar =
python-zaqarclient>=1.0.0 # Apache-2.0
[entry_points]
ceilometer.notification =
@ -236,6 +238,7 @@ ceilometer.sample.publisher =
database = ceilometer.publisher.direct:DirectPublisher
file_alt = ceilometer.publisher.direct:DirectPublisher
http_alt = ceilometer.publisher.direct:DirectPublisher
zaqar = ceilometer.publisher.zaqar:ZaqarPublisher
ceilometer.event.publisher =
test = ceilometer.publisher.test:TestPublisher
@ -248,6 +251,7 @@ ceilometer.event.publisher =
database = ceilometer.publisher.direct:DirectPublisher
file_alt = ceilometer.publisher.direct:DirectPublisher
http_alt = ceilometer.publisher.direct:DirectPublisher
zaqar = ceilometer.publisher.zaqar:ZaqarPublisher
ceilometer.event.trait_plugin =
split = ceilometer.event.trait_plugins:SplitterTraitPlugin

View File

@ -28,4 +28,4 @@ os-testr>=0.4.1 # Apache-2.0
tempest>=14.0.0 # Apache-2.0
WebTest>=2.0 # MIT
pifpaf>=0.0.11 # Apache-2.0
os-api-ref>=0.1.0 # Apache-2.0s
os-api-ref>=0.1.0 # Apache-2.0

View File

@ -4,7 +4,7 @@ skipsdist = True
envlist = py{27,35},{debug,py,py27,py35}-{mongodb,mysql,postgresql,functional},pep8
[testenv]
deps = .[mongo,mysql,postgresql,gnocchi]
deps = .[mongo,mysql,postgresql,gnocchi,zaqar]
-r{toxinidir}/test-requirements.txt
# NOTE(tonyb): This project has chosen to *NOT* consume upper-constraints.txt
install_command = pip install -U {opts} {packages}