Deprecate kafka publisher

Original kafka publisher has some restrictions on demand. oslo.messaging
has introduced kafka driver and make it scalable, so it's recommended to
integrate kafka driver with NotifierPublisher and push events to broker.
And this patch also implements a generic NotifierPublisher to push data
to external server with customized transport driver.

Closes-Bug: #1649947
Change-Id: I3eecd74b61a42ba1c760b53032ac6e80e85d1dda
This commit is contained in:
Hanxi Liu 2017-07-20 17:57:49 +08:00 committed by gordon chung
parent 0f3a86232b
commit af23b6eeaf
4 changed files with 99 additions and 5 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from debtcollector import removals
import kafka
from oslo_log import log
from oslo_serialization import jsonutils
@ -24,6 +25,9 @@ from ceilometer.publisher import messaging
LOG = log.getLogger(__name__)
@removals.removed_class("KafkaBrokerPublisher",
message="use NotifierPublisher instead",
removal_version='10.0')
class KafkaBrokerPublisher(messaging.MessagingPublisher):
"""Publish metering data to kafka broker.

View File

@ -186,11 +186,49 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
class NotifierPublisher(MessagingPublisher):
"""Publish metering data from notifer publisher.
The ip address and port number of notifer can be configured in
ceilometer pipeline configuration file.
User can customize the transport driver such as rabbit, kafka and
so on. The Notifer uses `sample` method as default method to send
notifications.
This publisher has transmit options such as queue, drop, and
retry. These options are specified using policy field of URL parameter.
When queue option could be selected, local queue length can be determined
using max_queue_length field as well. When the transfer fails with retry
option, try to resend the data as many times as specified in max_retry
field. If max_retry is not specified, by default the number of retry
is 100.
To enable this publisher, add the following section to the
/etc/ceilometer/pipeline.yaml file or simply add it to an existing
pipeline::
meter:
- name: meter_notifier
meters:
- "*"
sinks:
- notifier_sink
sinks:
- name: notifier_sink
transformers:
publishers:
- notifer://[notifier_ip]:[notifier_port]?topic=[topic]&
driver=driver&max_retry=100
"""
def __init__(self, conf, parsed_url, default_topic):
super(NotifierPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
topic = options.pop('topic', [default_topic])
topics = options.pop('topic', [default_topic])
driver = options.pop('driver', ['rabbit'])[0]
self.max_retry = int(options.get('max_retry', [100])[-1])
url = None
if parsed_url.netloc != '':
url = urlparse.urlunsplit([driver, parsed_url.netloc,
@ -201,7 +239,7 @@ class NotifierPublisher(MessagingPublisher):
messaging.get_transport(self.conf, url),
driver=self.conf.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % self.conf.host,
topics=topic,
topics=topics,
retry=self.retry
)

View File

@ -18,6 +18,8 @@ import datetime
import uuid
import mock
import oslo_messaging
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_utils import netutils
import testscenarios.testcase
@ -147,6 +149,42 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
'?amqp_auto_delete=true')
@mock.patch('ceilometer.messaging.get_transport')
def test_publish_with_none_rabbit_driver(self, cgt):
sample_publisher = msg_publisher.SampleNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
transport = oslo_messaging.get_transport(self.CONF,
'kafka://127.0.0.1:9092')
self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(sample_publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
msg_publisher.DeliveryFailure,
sample_publisher.publish_samples,
self.test_sample_data)
self.assertEqual(0, len(sample_publisher.local_queue))
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with('metering', mock.ANY)
event_publisher = msg_publisher.EventNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
with mock.patch.object(event_publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
msg_publisher.DeliveryFailure,
event_publisher.publish_events,
self.test_event_data)
self.assertEqual(0, len(event_publisher.local_queue))
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with('event', mock.ANY)
class TestPublisher(testscenarios.testcase.WithScenarios,
BasePublisherTestCase):
@ -186,7 +224,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with(
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
@ -203,7 +242,8 @@ class TestPublisherPolicy(TestPublisher):
self.test_data)
self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with(
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
@ -221,7 +261,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.warning.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
self.assertEqual(100, len(fake_send.mock_calls))
fake_send.assert_called_with(
self.topic, mock.ANY)

View File

@ -0,0 +1,11 @@
---
features:
- |
Ceilometer supports generic notifier to publish data and allow user to
customize parameters such as topic, transport driver and priority. The
publisher configuration in pipeline.yaml can be
notifer://[notifier_ip]:[notifier_port]?topic=[topic]&driver=driver&max_retry=100
Not only rabbit driver, but also other driver like kafka can be used.
deprecations:
- |
Kafka publisher is deprecated to use generic notifier instead.