diff --git a/charms/cinder-ceph-k8s/config.yaml b/charms/cinder-ceph-k8s/config.yaml index e1254b13..ce1469e4 100644 --- a/charms/cinder-ceph-k8s/config.yaml +++ b/charms/cinder-ceph-k8s/config.yaml @@ -1,5 +1,236 @@ options: - debug: + ceph-osd-replication-count: + default: 3 + type: int + description: | + This value dictates the number of replicas ceph must make of any + object it stores within the cinder rbd pool. Of course, this only + applies if using Ceph as a backend store. Note that once the cinder + rbd pool has been created, changing this value will not have any + effect (although it can be changed in ceph by manually configuring + your ceph cluster). + ceph-pool-weight: + type: int + default: 40 + description: | + Defines a relative weighting of the pool as a percentage of the total + amount of data in the Ceph cluster. This effectively weights the number + of placement groups for the pool created to be appropriately portioned + to the amount of data expected. For example, if the ephemeral volumes + for the OpenStack compute instances are expected to take up 20% of the + overall configuration then this value would be specified as 20. Note - + it is important to choose an appropriate value for the pool weight as + this directly affects the number of placement groups which will be + created for the pool. The number of placement groups for a pool can + only be increased, never decreased - so it is important to identify the + percent of data that will likely reside in the pool. + volume-backend-name: + default: + type: string + description: | + Volume backend name for the backend. The default value is the + application name in the Juju model, e.g. "cinder-ceph-mybackend" + if it's deployed as `juju deploy cinder-ceph cinder-ceph-mybackend`. + A common backend name can be set to multiple backends with the + same characters so that those can be treated as a single virtual + backend associated with a single volume type. + backend-availability-zone: + default: + type: string + description: | + Availability zone name of this volume backend. If set, it will + override the default availability zone. Supported for Pike or + newer releases. + use-syslog: + type: boolean default: False - description: Enable debug logging. - type: boolean \ No newline at end of file + description: | + Setting this to True will configure services to log to syslog. + restrict-ceph-pools: + default: False + type: boolean + description: | + Optionally restrict Ceph key permissions to access pools as required. + rbd-pool-name: + default: + type: string + description: | + Optionally specify an existing rbd pool that cinder should map to. + rbd-flatten-volume-from-snapshot: + default: + type: boolean + default: False + description: | + Flatten volumes created from snapshots to remove dependency from + volume to snapshot. Supported on Queens+ + rbd-mirroring-mode: + type: string + default: pool + description: | + The RBD mirroring mode used for the Ceph pool. This option is only used + with 'replicated' pool type, as it's not supported for 'erasure-coded' + pool type - valid values: 'pool' and 'image' + pool-type: + type: string + default: replicated + description: | + Ceph pool type to use for storage - valid values include ‘replicated’ + and ‘erasure-coded’. + ec-profile-name: + type: string + default: + description: | + Name for the EC profile to be created for the EC pools. If not defined + a profile name will be generated based on the name of the pool used by + the application. + ec-rbd-metadata-pool: + type: string + default: + description: | + Name of the metadata pool to be created (for RBD use-cases). If not + defined a metadata pool name will be generated based on the name of + the data pool used by the application. The metadata pool is always + replicated, not erasure coded. + ec-profile-k: + type: int + default: 1 + description: | + Number of data chunks that will be used for EC data pool. K+M factors + should never be greater than the number of available zones (or hosts) + for balancing. + ec-profile-m: + type: int + default: 2 + description: | + Number of coding chunks that will be used for EC data pool. K+M factors + should never be greater than the number of available zones (or hosts) + for balancing. + ec-profile-locality: + type: int + default: + description: | + (lrc plugin - l) Group the coding and data chunks into sets of size l. + For instance, for k=4 and m=2, when l=3 two groups of three are created. + Each set can be recovered without reading chunks from another set. Note + that using the lrc plugin does incur more raw storage usage than isa or + jerasure in order to reduce the cost of recovery operations. + ec-profile-crush-locality: + type: string + default: + description: | + (lrc plugin) The type of the crush bucket in which each set of chunks + defined by l will be stored. For instance, if it is set to rack, each + group of l chunks will be placed in a different rack. It is used to + create a CRUSH rule step such as step choose rack. If it is not set, + no such grouping is done. + ec-profile-durability-estimator: + type: int + default: + description: | + (shec plugin - c) The number of parity chunks each of which includes + each data chunk in its calculation range. The number is used as a + durability estimator. For instance, if c=2, 2 OSDs can be down + without losing data. + ec-profile-helper-chunks: + type: int + default: + description: | + (clay plugin - d) Number of OSDs requested to send data during + recovery of a single chunk. d needs to be chosen such that + k+1 <= d <= k+m-1. Larger the d, the better the savings. + ec-profile-scalar-mds: + type: string + default: + description: | + (clay plugin) specifies the plugin that is used as a building + block in the layered construction. It can be one of jerasure, + isa, shec (defaults to jerasure). + ec-profile-plugin: + type: string + default: jerasure + description: | + EC plugin to use for this applications pool. The following list of + plugins acceptable - jerasure, lrc, isa, shec, clay. + ec-profile-technique: + type: string + default: + description: | + EC profile technique used for this applications pool - will be + validated based on the plugin configured via ec-profile-plugin. + Supported techniques are ‘reed_sol_van’, ‘reed_sol_r6_op’, + ‘cauchy_orig’, ‘cauchy_good’, ‘liber8tion’ for jerasure, + ‘reed_sol_van’, ‘cauchy’ for isa and ‘single’, ‘multiple’ + for shec. + ec-profile-device-class: + type: string + default: + description: | + Device class from CRUSH map to use for placement groups for + erasure profile - valid values: ssd, hdd or nvme (or leave + unset to not use a device class). + bluestore-compression-algorithm: + type: string + default: + description: | + Compressor to use (if any) for pools requested by this charm. + . + NOTE: The ceph-osd charm sets a global default for this value (defaults + to 'lz4' unless configured by the end user) which will be used unless + specified for individual pools. + bluestore-compression-mode: + type: string + default: + description: | + Policy for using compression on pools requested by this charm. + . + 'none' means never use compression. + 'passive' means use compression when clients hint that data is + compressible. + 'aggressive' means use compression unless clients hint that + data is not compressible. + 'force' means use compression under all circumstances even if the clients + hint that the data is not compressible. + bluestore-compression-required-ratio: + type: float + default: + description: | + The ratio of the size of the data chunk after compression relative to the + original size must be at least this small in order to store the + compressed version on pools requested by this charm. + bluestore-compression-min-blob-size: + type: int + default: + description: | + Chunks smaller than this are never compressed on pools requested by + this charm. + bluestore-compression-min-blob-size-hdd: + type: int + default: + description: | + Value of bluestore compression min blob size for rotational media on + pools requested by this charm. + bluestore-compression-min-blob-size-ssd: + type: int + default: + description: | + Value of bluestore compression min blob size for solid state media on + pools requested by this charm. + bluestore-compression-max-blob-size: + type: int + default: + description: | + Chunks larger than this are broken into smaller blobs sizing bluestore + compression max blob size before being compressed on pools requested by + this charm. + bluestore-compression-max-blob-size-hdd: + type: int + default: + description: | + Value of bluestore compression max blob size for rotational media on + pools requested by this charm. + bluestore-compression-max-blob-size-ssd: + type: int + default: + description: | + Value of bluestore compression max blob size for solid state media on + pools requested by this charm. diff --git a/charms/cinder-ceph-k8s/lib/charms/ceph/v0/ceph_client.py b/charms/cinder-ceph-k8s/lib/charms/ceph/v0/ceph_client.py new file mode 100755 index 00000000..04b5797d --- /dev/null +++ b/charms/cinder-ceph-k8s/lib/charms/ceph/v0/ceph_client.py @@ -0,0 +1,522 @@ +#!/usr/bin/env python3 + +import logging +import json +import sys +sys.path.append('lib') # noqa + +import charmhelpers.contrib.storage.linux.ceph as ch_ceph +import charmhelpers.contrib.network.ip as ch_ip + +from ops.framework import ( + StoredState, + EventBase, + ObjectEvents, + EventSource, + Object +) + +logger = logging.getLogger(__name__) + + +class BrokerAvailableEvent(EventBase): + pass + + +class PoolAvailableEvent(EventBase): + pass + + +class CephClientEvents(ObjectEvents): + broker_available = EventSource(BrokerAvailableEvent) + pools_available = EventSource(PoolAvailableEvent) + + +class CephClientRequires(Object): + + on = CephClientEvents() + _stored = StoredState() + + def __init__(self, charm, relation_name): + super().__init__(charm, relation_name) + self.name = relation_name + self.this_unit = self.model.unit + self.relation_name = relation_name + self._stored.set_default( + pools_available=False, + broker_available=False, + broker_req={}) + self.framework.observe( + charm.on[relation_name].relation_joined, + self.on_joined) + self.framework.observe( + charm.on[relation_name].relation_changed, + self.on_changed) + self.new_request = ch_ceph.CephBrokerRq() + self.previous_requests = self.get_previous_requests_from_relations() + + def on_joined(self, event): + relation = self.model.relations[self.relation_name] + if relation: + logging.info("emiting broker_available") + self._stored.broker_available = True + self.on.broker_available.emit() + + def request_osd_settings(self, settings): + for relation in self.model.relations[self.relation_name]: + relation.data[self.model.unit]['osd-settings'] = json.dumps( + settings, + sort_keys=True) + + @property + def pools_available(self): + return self._stored.pools_available + + @property + def broker_available(self): + return self._stored.broker_available + + def mon_hosts(self, mon_ips): + """List of all monitor host public addresses""" + hosts = [] + for ceph_addrs in mon_ips: + # NOTE(jamespage): This looks odd but deals with + # use with ceph-proxy which + # presents all monitors in + # a single space delimited field. + for addr in ceph_addrs.split(' '): + hosts.append(ch_ip.format_ipv6_addr(addr) or addr) + hosts.sort() + return hosts + + def get_relation_data(self): + data = {} + mon_ips = [] + for relation in self.model.relations[self.relation_name]: + for unit in relation.units: + _data = { + 'key': relation.data[unit].get('key'), + 'auth': relation.data[unit].get('auth')} + mon_ip = relation.data[unit].get('ceph-public-address') + if mon_ip: + mon_ips.append(mon_ip) + if all(_data.values()): + data = _data + if data: + data['mon_hosts'] = self.mon_hosts(mon_ips) + return data + + def existing_request_complete(self): + rq = self.get_existing_request() + if rq and self.is_request_complete(rq, + self.model.relations[self.name]): + return True + return False + + def on_changed(self, event): + logging.info("ceph client on_changed") + relation_data = self.get_relation_data() + if relation_data: + if self.existing_request_complete(): + logging.info("emiting pools available") + self._stored.pools_available = True + self.on.pools_available.emit() + else: + logging.info("incomplete request. broker_req not found") + + def get_broker_rsp_key(self): + return 'broker-rsp-{}'.format(self.this_unit.name.replace('/', '-')) + + def get_existing_request(self): + logging.info("get_existing_request") + # json.dumps of the CephBrokerRq() + rq = ch_ceph.CephBrokerRq() + + if self._stored.broker_req: + try: + j = json.loads(self._stored.broker_req) + logging.info( + "Json request: {}".format(self._stored.broker_req)) + rq.set_ops(j['ops']) + except ValueError as err: + logging.info( + "Unable to decode broker_req: %s. Error %s", + self._stored.broker_req, + err) + return rq + + def _handle_broker_request(self, request_method, **kwargs): + """Handle a broker request + + Add a ceph broker request using `request_method` and the provided + `kwargs`. + + :param request_method: ch_ceph.CephBrokerRq method name to use for + request. + :type request_method,: str + """ + relations = self.model.relations[self.name] + logging.info("%s: %s", request_method, relations) + if not relations: + return + rq = self.new_request + logging.info("Adding %s request", request_method) + getattr(rq, request_method)(**kwargs) + logging.info("Storing request") + self._stored.broker_req = rq.request + logging.info("Calling send_request_if_needed") + self.send_request_if_needed(rq, relations) + + def _handle_pool_create_broker_request(self, request_method, **kwargs): + """Process request to create a pool. + + :param request_method: ch_ceph.CephBrokerRq method name to use for + request. + :type request_method: str + :param app_name: Tag pool with application name. Note that there is + certain protocols emerging upstream with regard to + meaningful application names to use. + Examples are 'rbd' and 'rgw'. + :type app_name: Optional[str] + :param compression_algorithm: Compressor to use, one of: + ('lz4', 'snappy', 'zlib', 'zstd') + :type compression_algorithm: Optional[str] + :param compression_mode: When to compress data, one of: + ('none', 'passive', 'aggressive', 'force') + :type compression_mode: Optional[str] + :param compression_required_ratio: Minimum compression ratio for data + chunk, if the requested ratio is not + achieved the compressed version will + be thrown away and the original + stored. + :type compression_required_ratio: Optional[float] + :param compression_min_blob_size: Chunks smaller than this are never + compressed (unit: bytes). + :type compression_min_blob_size: Optional[int] + :param compression_min_blob_size_hdd: Chunks smaller than this are not + compressed when destined to + rotational media (unit: bytes). + :type compression_min_blob_size_hdd: Optional[int] + :param compression_min_blob_size_ssd: Chunks smaller than this are not + compressed when destined to flash + media (unit: bytes). + :type compression_min_blob_size_ssd: Optional[int] + :param compression_max_blob_size: Chunks larger than this are broken + into N * compression_max_blob_size + chunks before being compressed + (unit: bytes). + :type compression_max_blob_size: Optional[int] + :param compression_max_blob_size_hdd: Chunks larger than this are + broken into + N * compression_max_blob_size_hdd + chunks before being compressed + when destined for rotational + media (unit: bytes) + :type compression_max_blob_size_hdd: Optional[int] + :param compression_max_blob_size_ssd: Chunks larger than this are + broken into + N * compression_max_blob_size_ssd + chunks before being compressed + when destined for flash media + (unit: bytes). + :type compression_max_blob_size_ssd: Optional[int] + :param group: Group to add pool to + :type group: Optional[str] + :param max_bytes: Maximum bytes quota to apply + :type max_bytes: Optional[int] + :param max_objects: Maximum objects quota to apply + :type max_objects: Optional[int] + :param namespace: Group namespace + :type namespace: Optional[str] + :param weight: The percentage of data that is expected to be contained + in the pool from the total available space on the OSDs. + Used to calculate number of Placement Groups to create + for pool. + :type weight: Optional[float] + :raises: AssertionError + """ + self._handle_broker_request( + request_method, + **kwargs) + + def create_replicated_pool(self, name, replicas=3, pg_num=None, + **kwargs): + """Adds an operation to create a replicated pool. + + See docstring of `_handle_pool_create_broker_request` for additional + common pool creation arguments. + + :param name: Name of pool to create + :type name: str + :param replicas: Number of copies Ceph should keep of your data. + :type replicas: int + :param pg_num: Request specific number of Placement Groups to create + for pool. + :type pg_num: int + :raises: AssertionError if provided data is of invalid type/range + """ + self._handle_pool_create_broker_request( + 'add_op_create_replicated_pool', + name=name, + replica_count=replicas, + pg_num=pg_num, + **kwargs) + + def create_erasure_pool(self, name, erasure_profile=None, + allow_ec_overwrites=False, **kwargs): + """Adds an operation to create a erasure coded pool. + + See docstring of `_handle_pool_create_broker_request` for additional + common pool creation arguments. + + :param name: Name of pool to create + :type name: str + :param erasure_profile: Name of erasure code profile to use. If not + set the ceph-mon unit handling the broker + request will set its default value. + :type erasure_profile: str + :param allow_ec_overwrites: allow EC pools to be overriden + :type allow_ec_overwrites: bool + :raises: AssertionError if provided data is of invalid type/range + """ + self._handle_pool_create_broker_request( + 'add_op_create_erasure_pool', + name=name, + erasure_profile=erasure_profile, + allow_ec_overwrites=allow_ec_overwrites, + **kwargs) + + def create_erasure_profile(self, name, + erasure_type='jerasure', + erasure_technique=None, + k=None, m=None, + failure_domain=None, + lrc_locality=None, + shec_durability_estimator=None, + clay_helper_chunks=None, + device_class=None, + clay_scalar_mds=None, + lrc_crush_locality=None): + """Adds an operation to create a erasure coding profile. + + :param name: Name of profile to create + :type name: str + :param erasure_type: Which of the erasure coding plugins should be used + :type erasure_type: string + :param erasure_technique: EC plugin technique to use + :type erasure_technique: string + :param k: Number of data chunks + :type k: int + :param m: Number of coding chunks + :type m: int + :param lrc_locality: Group the coding and data chunks into sets of size + locality (lrc plugin) + :type lrc_locality: int + :param durability_estimator: The number of parity chuncks each of which + includes a data chunk in its calculation + range (shec plugin) + :type durability_estimator: int + :param helper_chunks: The number of helper chunks to use for recovery + operations (clay plugin) + :type: helper_chunks: int + :param failure_domain: Type of failure domain from Ceph bucket types + to be used + :type failure_domain: string + :param device_class: Device class to use for profile (ssd, hdd) + :type device_class: string + :param clay_scalar_mds: Plugin to use for CLAY layered construction + (jerasure|isa|shec) + :type clay_scaler_mds: string + :param lrc_crush_locality: Type of crush bucket in which set of chunks + defined by lrc_locality will be stored. + :type lrc_crush_locality: string + """ + self._handle_broker_request( + 'add_op_create_erasure_profile', + name=name, + erasure_type=erasure_type, + erasure_technique=erasure_technique, + k=k, m=m, + failure_domain=failure_domain, + lrc_locality=lrc_locality, + shec_durability_estimator=shec_durability_estimator, + clay_helper_chunks=clay_helper_chunks, + device_class=device_class, + clay_scalar_mds=clay_scalar_mds, + lrc_crush_locality=lrc_crush_locality + ) + + def request_ceph_permissions(self, client_name, permissions): + logging.info("request_ceph_permissions") + relations = self.model.relations[self.name] + if not relations: + return + rq = self.new_request + rq.add_op({'op': 'set-key-permissions', + 'permissions': permissions, + 'client': client_name}) + self._stored.broker_req = rq.request + # ch_ceph.send_request_if_needed(rq, relation=self.name) + self.send_request_if_needed(rq, relations) + + def request_access_to_group(self, + name, + object_prefix_permissions, + permission): + """Request access to a specific group of pools + + :param name: the group name to request access for + :type name: string + :param object_prefix_permissions: any hierarchy permissions neded + :type object_prefix_permissions: dict + :param permission: permissions for the specificed group of pools + :type permission: string + """ + logging.info("request_access_to_group") + self._handle_broker_request( + 'add_op_request_access_to_group', + name, object_prefix_permissions, permission + ) + + def get_previous_requests_from_relations(self): + """Get the previous requests. + + :returns: The previous ceph requests. + :rtype: Dict[str, ch_ceph.CephBrokerRq] + """ + requests = {} + for relation in self.model.relations[self.relation_name]: + broker_req = relation.data[self.this_unit].get('broker_req') + rid = "{}:{}".format(relation.name, relation.id) + if broker_req: + request_data = json.loads(broker_req) + request = ch_ceph.CephBrokerRq( + api_version=request_data['api-version'], + request_id=request_data['request-id']) + request.set_ops(request_data['ops']) + requests[rid] = request + return requests + + def get_request_states(self, request, relations): + """Get the existing requests and their states. + + :param request: A CephBrokerRq object + :type request: ch_ceph.CephBrokerRq + :param relations: List of relations to check for existing request. + :type relations: [ops.model.Relation, ...] + :returns: Whether request is complete. + :rtype: bool + """ + complete = [] + requests = {} + for relation in relations: + rid = "{}:{}".format(relation.name, relation.id) + complete = False + previous_request = self.previous_requests.get( + rid, + ch_ceph.CephBrokerRq()) + if request == previous_request: + sent = True + complete = self.is_request_complete_for_relation( + previous_request, + relation) + else: + sent = False + complete = False + + requests[rid] = { + 'sent': sent, + 'complete': complete, + } + + return requests + + def is_request_complete_for_relation(self, request, relation): + """Check if a given request has been completed on the given relation + + :param request: A CephBrokerRq object + :type request: ch_ceph.CephBrokerRq + :param relation: A relation to check for an existing request. + :type relation: ops.model.Relation + :returns: Whether request is complete. + :rtype: bool + """ + broker_key = self.get_broker_rsp_key() + for unit in relation.units: + if relation.data[unit].get(broker_key): + rsp = ch_ceph.CephBrokerRsp(relation.data[unit][broker_key]) + if rsp.request_id == request.request_id: + if not rsp.exit_code: + return True + else: + if relation.data[unit].get('broker_rsp'): + logging.info('No response for this unit yet') + return False + + def is_request_complete(self, request, relations): + """Check a functionally equivalent request has already been completed + + Returns True if a similair request has been completed + + :param request: A CephBrokerRq object + :type request: ch_ceph.CephBrokerRq + :param relations: List of relations to check for existing request. + :type relations: [ops.model.Relation, ...] + :returns: Whether request is complete. + :rtype: bool + """ + states = self.get_request_states(request, relations) + for rid in states.keys(): + if not states[rid]['complete']: + return False + + return True + + def is_request_sent(self, request, relations): + """Check if a functionally equivalent request has already been sent + + Returns True if a similair request has been sent + + :param request: A CephBrokerRq object + :type request: ch_ceph.CephBrokerRq + :param relations: List of relations to check for existing request. + :type relations: [ops.model.Relation, ...] + :returns: Whether equivalent request has been sent. + :rtype: bool + """ + states = self.get_request_states(request, relations) + for rid in states.keys(): + if not states[rid]['sent']: + return False + + return True + + def send_request_if_needed(self, request, relations): + """Send request if an equivalent request has not already been sent + + :param request: A CephBrokerRq object + :type request: ch_ceph.CephBrokerRq + :param relations: List of relations to check for existing request. + :type relations: [ops.model.Relation, ...] + """ + states = self.get_request_states(request, relations) + for relation in relations: + rid = "{}:{}".format(relation.name, relation.id) + if states[rid]['sent']: + logging.debug( + ('Request %s is a duplicate of the previous broker request' + ' %s. Restoring previous broker request'), + request.request_id, + self.previous_requests[rid].request_id) + # The previous request was stored at the beggining. The ops of + # the new request match that of the old. But as the new request + # was constructed broker data may have been set on the relation + # during the construction of this request. This is because the + # interface has no explicit commit method. Every op request has + # in implicit send which updates the relation data. So make + # sure # the relation data matches the data at the beggining so + # that a new request is not triggered. + request = self.previous_requests[rid] + else: + logging.debug('Sending request %s', request.request_id) + relation.data[self.this_unit]['broker_req'] = request.request diff --git a/charms/cinder-ceph-k8s/lib/charms/sunbeam_rabbitmq_operator/v0/amqp.py b/charms/cinder-ceph-k8s/lib/charms/sunbeam_rabbitmq_operator/v0/amqp.py index 1d09f4a7..df585d5a 100644 --- a/charms/cinder-ceph-k8s/lib/charms/sunbeam_rabbitmq_operator/v0/amqp.py +++ b/charms/cinder-ceph-k8s/lib/charms/sunbeam_rabbitmq_operator/v0/amqp.py @@ -1,10 +1,10 @@ -"""RabbitMQAMQPProvides and Requires module. +"""AMQPProvides and Requires module. This library contains the Requires and Provides classes for handling the amqp interface. -Import `RabbitMQAMQPRequires` in your charm, with the charm object and the +Import `AMQPRequires` in your charm, with the charm object and the relation name: - self - "amqp" @@ -14,43 +14,56 @@ Also provide two additional parameters to the charm object: - vhost Two events are also available to respond to: - - has_amqp_servers - - ready_amqp_servers + - connected + - ready + - goneaway A basic example showing the usage of this relation follows: ``` -from charms.sunbeam_rabbitmq_operator.v0.amqp import RabbitMQAMQPRequires +from charms.sunbeam_rabbitmq_operator.v0.amqp import AMQPRequires class AMQPClientCharm(CharmBase): def __init__(self, *args): super().__init__(*args) # AMQP Requires - self.amqp_requires = RabbitMQAMQPRequires( + self.amqp = AMQPRequires( self, "amqp", - username = "amqp-client", - vhost = "amqp-client-vhost" + username="myusername", + vhost="vhostname" ) self.framework.observe( - self.amqp_requires.on.has_amqp_servers, self._on_has_amqp_servers) + self.amqp.on.connected, self._on_amqp_connected) self.framework.observe( - self.amqp_requires.on.ready_amqp_servers, self._on_ready_amqp_servers) + self.amqp.on.ready, self._on_amqp_ready) + self.framework.observe( + self.amqp.on.goneaway, self._on_amqp_goneaway) - def _on_has_amqp_servers(self, event): - '''React to the AMQP relation joined. + def _on_amqp_connected(self, event): + '''React to the AMQP connected event. - The AMQP interface will use the provided username and vhost to commuicate - with the. + This event happens when n AMQP relation is added to the + model before credentials etc have been provided. ''' # Do something before the relation is complete + pass - def _on_ready_amqp_servers(self, event): - '''React to the AMQP relation joined. + def _on_amqp_ready(self, event): + '''React to the AMQP ready event. The AMQP interface will use the provided username and vhost for the request to the rabbitmq server. ''' # AMQP Relation is ready. Do something with the completed relation. + pass + + def _on_amqp_goneaway(self, event): + '''React to the AMQP goneaway event. + + This event happens when an AMQP relation is removed. + ''' + # AMQP Relation has goneaway. shutdown services or suchlike + pass ``` """ @@ -82,31 +95,37 @@ from typing import List logger = logging.getLogger(__name__) -class HasAMQPServersEvent(EventBase): - """Has AMQPServers Event.""" +class AMQPConnectedEvent(EventBase): + """AMQP connected Event.""" pass -class ReadyAMQPServersEvent(EventBase): - """Ready AMQPServers Event.""" +class AMQPReadyEvent(EventBase): + """AMQP ready for use Event.""" pass -class RabbitMQAMQPServerEvents(ObjectEvents): +class AMQPGoneAwayEvent(EventBase): + """AMQP relation has gone-away Event""" + + pass + +class AMQPServerEvents(ObjectEvents): """Events class for `on`""" - has_amqp_servers = EventSource(HasAMQPServersEvent) - ready_amqp_servers = EventSource(ReadyAMQPServersEvent) + connected = EventSource(AMQPConnectedEvent) + ready = EventSource(AMQPReadyEvent) + goneaway = EventSource(AMQPGoneAwayEvent) -class RabbitMQAMQPRequires(Object): +class AMQPRequires(Object): """ - RabbitMQAMQPRequires class + AMQPRequires class """ - on = RabbitMQAMQPServerEvents() + on = AMQPServerEvents() _stored = StoredState() def __init__(self, charm, relation_name: str, username: str, vhost: str): @@ -123,6 +142,10 @@ class RabbitMQAMQPRequires(Object): self.charm.on[relation_name].relation_changed, self._on_amqp_relation_changed, ) + self.framework.observe( + self.charm.on[relation_name].relation_departed, + self._on_amqp_relation_changed, + ) self.framework.observe( self.charm.on[relation_name].relation_broken, self._on_amqp_relation_broken, @@ -131,19 +154,19 @@ class RabbitMQAMQPRequires(Object): def _on_amqp_relation_joined(self, event): """AMQP relation joined.""" logging.debug("RabbitMQAMQPRequires on_joined") - self.on.has_amqp_servers.emit() + self.on.connected.emit() self.request_access(self.username, self.vhost) def _on_amqp_relation_changed(self, event): """AMQP relation changed.""" logging.debug("RabbitMQAMQPRequires on_changed") if self.password: - self.on.ready_amqp_servers.emit() + self.on.ready.emit() def _on_amqp_relation_broken(self, event): """AMQP relation broken.""" - # TODO clear data on the relation - logging.debug("RabbitMQAMQPRequires on_departed") + logging.debug("RabbitMQAMQPRequires on_broken") + self.on.goneaway.emit() @property def _amqp_rel(self) -> Relation: @@ -188,19 +211,19 @@ class ReadyAMQPClientsEvent(EventBase): pass -class RabbitMQAMQPClientEvents(ObjectEvents): +class AMQPClientEvents(ObjectEvents): """Events class for `on`""" has_amqp_clients = EventSource(HasAMQPClientsEvent) ready_amqp_clients = EventSource(ReadyAMQPClientsEvent) -class RabbitMQAMQPProvides(Object): +class AMQPProvides(Object): """ - RabbitMQAMQPProvides class + AMQPProvides class """ - on = RabbitMQAMQPClientEvents() + on = AMQPClientEvents() _stored = StoredState() def __init__(self, charm, relation_name): diff --git a/charms/cinder-ceph-k8s/requirements.txt b/charms/cinder-ceph-k8s/requirements.txt index a4a5aac3..b95ea7e1 100644 --- a/charms/cinder-ceph-k8s/requirements.txt +++ b/charms/cinder-ceph-k8s/requirements.txt @@ -2,3 +2,4 @@ jinja2 git+https://github.com/canonical/operator@2875e73e#egg=ops git+https://opendev.org/openstack/charm-ops-openstack#egg=ops_openstack +git+https://github.com/openstack-charmers/advanced-sunbeam-openstack#egg=advanced_sunbeam_openstack diff --git a/charms/cinder-ceph-k8s/src/charm.py b/charms/cinder-ceph-k8s/src/charm.py index 6fea2acc..6b299a11 100755 --- a/charms/cinder-ceph-k8s/src/charm.py +++ b/charms/cinder-ceph-k8s/src/charm.py @@ -9,75 +9,78 @@ import logging from ops.charm import CharmBase from ops.framework import StoredState from ops.main import main -from ops.model import ActiveStatus +from ops.model import ActiveStatus, BlockedStatus, WaitingStatus -from charms.sunbeam_rabbitmq_operator.v0.amqp import RabbitMQAMQPRequires +from charms.sunbeam_rabbitmq_operator.v0.amqp import AMQPRequires +from charms.ceph.v0.ceph_client import CephClientRequires + +from typing import List + +# NOTE: rename sometime +import advanced_sunbeam_openstack.core as core +import advanced_sunbeam_openstack.adapters as adapters logger = logging.getLogger(__name__) +CINDER_VOLUME_CONTAINER = 'cinder-volume' -class CinderCephOperatorCharm(CharmBase): - """Charm the service.""" - _stored = StoredState() - - def __init__(self, *args): - super().__init__(*args) - - self.framework.observe( - self.on.cinder_volume_pebble_ready, - self._on_cinder_volume_pebble_ready, - ) - - self.framework.observe(self.on.config_changed, self._on_config_changed) - - self._stored.set_default(amqp_ready=False) - self._stored.set_default(ceph_ready=False) - - self.amqp = RabbitMQAMQPRequires( - self, "amqp", username="cinder", vhost="openstack" - ) - self.framework.observe( - self.amqp.on.ready_amqp_servers, self._on_amqp_ready - ) - # TODO - # State modelling - # AMQP + Ceph -> +Volume +class CinderCephAdapters(adapters.OPSRelationAdapters): @property - def _pebble_cinder_volume_layer(self): - """Pebble layer for Cinder volume""" - return { - "summary": "cinder layer", - "description": "pebble configuration for cinder services", - "services": { - "cinder-volume": { - "override": "replace", - "summary": "Cinder Volume", - "command": "cinder-volume --use-syslog", - "startup": "enabled", - } - }, - } + def interface_map(self): + _map = super().interface_map + _map.update({ + 'rabbitmq': adapters.AMQPAdapter}) + return _map - def _on_cinder_volume_pebble_ready(self, event): - """Define and start a workload using the Pebble API.""" - container = event.workload - container.add_layer( - "cinder-volume", self._pebble_cinder_volume_layer, combine=True + +class CinderCephOperatorCharm(core.OSBaseOperatorCharm): + """Cinder/Ceph Operator charm""" + + # NOTE: service_name == container_name + service_name = 'cinder-volume' + + service_user = 'cinder' + service_group = 'cinder' + + cinder_conf = '/etc/cinder/cinder.conf' + + def __init__(self, framework): + super().__init__( + framework, + adapters=CinderCephAdapters(self) ) - container.autostart() - def _on_config_changed(self, _): - """Just an example to show how to deal with changed configuration""" - # TODO - # Set debug logging and restart services + def get_relation_handlers(self) -> List[core.RelationHandler]: + """Relation handlers for the service.""" + self.amqp = core.AMQPHandler( + self, "amqp", self.configure_charm + ) + return [self.amqp] + + @property + def container_configs(self) -> List[core.ContainerConfigFile]: + _cconfigs = super().container_configs + _cconfigs.extend([ + core.ContainerConfigFile( + [self.service_name], + self.cinder_conf, + self.service_user, + self.service_group + ) + ]) + + def _do_bootstrap(self): + """No-op the bootstrap method as none required""" pass - def _on_amqp_ready(self, event): - """AMQP service ready for use""" - self._stored.amqp_ready = True + + +class CinderCephVictoriaOperatorCharm(CinderCephOperatorCharm): + + openstack_relesae = 'victoria' if __name__ == "__main__": - main(CinderCephOperatorCharm) + main(CinderCephVictoriaOperatorCharm, use_juju_for_storage=True)