# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # import logging import string from neutronclient.common import utils from neutronclient.neutron import v2_0 as neutronV20 from neutronclient.openstack.common.gettextutils import _ from oslo.serialization import jsonutils def _format_network_service_params(net_svc_policy): try: return '\n'.join([jsonutils.dumps(param) for param in net_svc_policy['network_service_params']]) except (TypeError, KeyError): return '' class ListPolicyTarget(neutronV20.ListCommand): """List policy_targets that belong to a given tenant.""" resource = 'policy_target' log = logging.getLogger(__name__ + '.ListPolicyTarget') _formatters = {} list_columns = ['id', 'name', 'description', 'policy_target_group_id'] pagination_support = True sorting_support = True class ShowPolicyTarget(neutronV20.ShowCommand): """Show information of a given policy_target.""" resource = 'policy_target' log = logging.getLogger(__name__ + '.ShowPolicyTarget') class CreatePolicyTarget(neutronV20.CreateCommand): """Create a policy_target for a given tenant.""" resource = 'policy_target' log = logging.getLogger(__name__ + '.CreatePolicyTarget') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the Policy Target')) parser.add_argument( '--policy-target-group', metavar='PTG', default='', help=_('Policy Target Group uuid')) parser.add_argument( '--port', default='', help=_('Neutron Port')) parser.add_argument( 'name', metavar='NAME', help=_('Name of policy target to create')) def args2body(self, parsed_args): body = {self.resource: {}, } neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description']) if parsed_args.policy_target_group: body[self.resource]['policy_target_group_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_target_group', parsed_args.policy_target_group) if parsed_args.port: body[self.resource]['port_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'port', parsed_args.port) return body class DeletePolicyTarget(neutronV20.DeleteCommand): """Delete a given Policy Target.""" resource = 'policy_target' log = logging.getLogger(__name__ + '.DeletePolicyTarget') class UpdatePolicyTarget(neutronV20.UpdateCommand): """Update Policy Target's information.""" resource = 'policy_target' log = logging.getLogger(__name__ + '.UpdatePolicyTarget') class ListPolicyTargetGroup(neutronV20.ListCommand): """List Policy Target Groups that belong to a given tenant.""" resource = 'policy_target_group' log = logging.getLogger(__name__ + '.ListPolicyTargetGroup') list_columns = ['id', 'name', 'description'] pagination_support = True sorting_support = True class ShowPolicyTargetGroup(neutronV20.ShowCommand): """Show information of a given Policy Target Group.""" resource = 'policy_target_group' log = logging.getLogger(__name__ + '.ShowPolicyTargetGroup') class CreatePolicyTargetGroup(neutronV20.CreateCommand): """Create a Policy Target Group for a given tenant.""" resource = 'policy_target_group' log = logging.getLogger(__name__ + '.CreatePolicyTargetGroup') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the Policy Target Group')) parser.add_argument( 'name', metavar='NAME', help=_('Name of Policy Target Group to create')) parser.add_argument( '--l2-policy', metavar='L2_POLICY', default='', help=_('L2 policy uuid')) parser.add_argument( '--provided-policy-rule-sets', type=utils.str2dict, # default={}, help=_('Dictionary of provided policy rule set uuids')) parser.add_argument( '--consumed-policy-rule-sets', type=utils.str2dict, # default={}, help=_('Dictionary of consumed policy rule set uuids')) parser.add_argument( '--network-service-policy', metavar='NETWORK_SERVICE_POLICY', default='', help=_('Network service policy uuid')) parser.add_argument( '--subnets', type=string.split, help=_('List of neutron subnet uuids')) def args2body(self, parsed_args): body = {self.resource: {}, } if parsed_args.l2_policy: body[self.resource]['l2_policy_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'l2_policy', parsed_args.l2_policy) if parsed_args.network_service_policy: body[self.resource]['network_service_policy_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'network_service_policy', parsed_args.network_service_policy) if parsed_args.provided_policy_rule_sets: for key in parsed_args.provided_policy_rule_sets.keys(): id_key = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule_set', key) parsed_args.provided_policy_rule_sets[id_key] = \ parsed_args.provided_policy_rule_sets.pop(key) if parsed_args.consumed_policy_rule_sets: for key in parsed_args.consumed_policy_rule_sets.keys(): id_key = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule_set', key) parsed_args.consumed_policy_rule_sets[id_key] = \ parsed_args.consumed_policy_rule_sets.pop(key) if parsed_args.subnets: for subnet in parsed_args.subnets: subnet_id = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'subnet', subnet) parsed_args.subnets.remove(subnet) parsed_args.subnets.append(subnet_id) neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'provided_policy_rule_sets', 'subnets', 'consumed_policy_rule_sets']) return body class DeletePolicyTargetGroup(neutronV20.DeleteCommand): """Delete a given Policy Target Group.""" resource = 'policy_target_group' log = logging.getLogger(__name__ + '.DeletePolicyTargetGroup') class UpdatePolicyTargetGroup(neutronV20.UpdateCommand): """Update Policy Target Group's information.""" resource = 'policy_target_group' log = logging.getLogger(__name__ + '.UpdatePolicyTargetGroup') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the Policy Target Group')) parser.add_argument( '--l2-policy', metavar='L2_POLICY', help=_('L2 policy uuid')) parser.add_argument( '--network-service-policy', metavar='NETWORK_SERVICE_POLICY', help=_('Network Service Policy uuid')) parser.add_argument( '--provided-policy-rule-sets', type=utils.str2dict, help=_('Dictionary of provided policy rule set uuids')) parser.add_argument( '--consumed-policy-rule-sets', type=utils.str2dict, help=_('Dictionary of consumed policy rule set uuids')) parser.add_argument( '--subnets', type=string.split, help=_('Subnet for the Policy Target Group')) def args2body(self, parsed_args): body = {self.resource: {}, } if parsed_args.l2_policy: body[self.resource]['l2_policy_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'l2_policy', parsed_args.l2_policy) if parsed_args.network_service_policy: body[self.resource]['network_service_policy_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'network_service_policy', parsed_args.l2_policy) if parsed_args.provided_policy_rule_sets: for key in parsed_args.provided_policy_rule_sets.keys(): id_key = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule_set', key) parsed_args.provided_policy_rule_sets[id_key] = \ parsed_args.provided_policy_rule_sets.pop(key) if parsed_args.consumed_policy_rule_sets: for key in parsed_args.consumed_policy_rule_sets.keys(): id_key = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule_set', key) parsed_args.consumed_policy_rule_sets[id_key] = \ parsed_args.consumed_policy_rule_sets.pop(key) if parsed_args.subnets: for subnet in parsed_args.subnets: subnet_id = neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'subnet', subnet) parsed_args.subnets.remove(subnet) parsed_args.subnets.append(subnet_id) neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'provided_policy_rule_sets', 'subnets', 'consumed_policy_rule_sets']) return body class ListL2Policy(neutronV20.ListCommand): """List L2 Policies that belong to a given tenant.""" resource = 'l2_policy' log = logging.getLogger(__name__ + '.ListL2Policy') _formatters = {} list_columns = ['id', 'name', 'description', 'l3_policy_id'] pagination_support = True sorting_support = True class ShowL2Policy(neutronV20.ShowCommand): """Show information of a given L2 Policy.""" resource = 'l2_policy' log = logging.getLogger(__name__ + '.ShowL2Policy') class CreateL2Policy(neutronV20.CreateCommand): """Create a L2 Policy for a given tenant.""" resource = 'l2_policy' log = logging.getLogger(__name__ + '.CreateL2Policy') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the L2 Policy')) parser.add_argument( '--network', help=_('Network to map the L2 Policy')) parser.add_argument( '--l3-policy', default='', help=_('L3 Policy uuid')) parser.add_argument( 'name', metavar='NAME', help=_('Name of L2 Policy to create')) def args2body(self, parsed_args): body = {self.resource: {}, } neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description']) if parsed_args.l3_policy: body[self.resource]['l3_policy_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'l3_policy', parsed_args.l3_policy) if parsed_args.network: body[self.resource]['network_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'network', parsed_args.network) return body class DeleteL2Policy(neutronV20.DeleteCommand): """Delete a given L2 Policy.""" resource = 'l2_policy' log = logging.getLogger(__name__ + '.DeleteL2Policy') class UpdateL2Policy(neutronV20.UpdateCommand): """Update L2 Policy's information.""" resource = 'l2_policy' log = logging.getLogger(__name__ + '.UpdateL2Policy') class ListL3Policy(neutronV20.ListCommand): """List l3_policies that belong to a given tenant.""" resource = 'l3_policy' log = logging.getLogger(__name__ + '.ListL3Policy') _formatters = {} list_columns = ['id', 'name', 'description', 'ip_pool', 'subnet_prefix_length'] pagination_support = True sorting_support = True class ShowL3Policy(neutronV20.ShowCommand): """Show information of a given L3 Policy.""" resource = 'l3_policy' log = logging.getLogger(__name__ + '.ShowL3Policy') class CreateL3Policy(neutronV20.CreateCommand): """Create a L3 Policy for a given tenant.""" resource = 'l3_policy' log = logging.getLogger(__name__ + '.CreateL3Policy') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the L3 Policy')) parser.add_argument( '--ip-version', type=int, # default=4, choices=[4, 6], help=_('IP version, default is 4')) parser.add_argument( '--ip-pool', help=_('CIDR of IP pool to create, default is 10.0.0.0/8')) parser.add_argument( '--subnet-prefix-length', type=int, # default=24, help=_('Subnet prefix length, default is 24')) parser.add_argument( 'name', metavar='NAME', help=_('Name of L3 policy to create')) def args2body(self, parsed_args): body = {self.resource: {}, } neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'ip_version', 'ip_pool', 'subnet_prefix_length']) return body class DeleteL3Policy(neutronV20.DeleteCommand): """Delete a given L3 Policy.""" resource = 'l3_policy' log = logging.getLogger(__name__ + '.DeleteL3Policy') class UpdateL3Policy(neutronV20.UpdateCommand): """Update L3 Policy's information.""" resource = 'l3_policy' log = logging.getLogger(__name__ + '.UpdateL3Policy') class ListNetworkServicePolicy(neutronV20.ListCommand): """List Network Service Policies that belong to a given tenant.""" resource = 'network_service_policy' log = logging.getLogger(__name__ + '.ListNetworkServicePolicy') _formatters = {'network_servie_params': _format_network_service_params} list_columns = ['id', 'name', 'description', 'network_service_params'] pagination_support = True sorting_support = True class ShowNetworkServicePolicy(neutronV20.ShowCommand): """Show information of a given network_service_policy.""" resource = 'network_service_policy' log = logging.getLogger(__name__ + '.ShowNetworkServicePolicy') class CreateNetworkServicePolicy(neutronV20.CreateCommand): """Create a Network Service Policy for a given tenant.""" resource = 'network_service_policy' log = logging.getLogger(__name__ + '.CreateNetworkServicePolicy') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the network_service_policy')) parser.add_argument( 'name', help=_('Name of network_service_policy to create')) parser.add_argument( '--network-service-params', metavar='type=PARAM_TYPE,name=PARAM_NAME,value=PARAM_VALUE', action='append', dest='network_service_params', type=utils.str2dict, help=_('Network service params for this network service policy' '(This option can be repeated).')) def args2body(self, parsed_args): body = {self.resource: {}, } """ if parsed_args.name: body[self.resource].update({'name': parsed_args.name}) if parsed_args.description: body[self.resource].update({'description': parsed_args.name}) if parsed_args.network_service_params: body[self.resource]['network_service_params'] = ( parsed_args.network_sercice_params) """ neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'network_service_params']) return body class DeleteNetworkServicePolicy(neutronV20.DeleteCommand): """Delete a given network_service_policy.""" resource = 'network_service_policy' log = logging.getLogger(__name__ + '.DeleteNetworkServicePolicy') class UpdateNetworkServicePolicy(neutronV20.UpdateCommand): """Update network_service_policy's information.""" resource = 'network_service_policy' log = logging.getLogger(__name__ + '.UpdateNetworkServicePolicy') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the network_service_policy')) parser.add_argument( '--name', help=_('Name of network_service_policy to create')) parser.add_argument( '--network-service-params', metavar='type=PARAM_TYPE,name=PARAM_NAME,value=PARAM_VALUE', action='append', dest='network_service_params', type=utils.str2dict, help=_('Network service params for this network service policy' '(This option can be repeated).')) def args2body(self, parsed_args): body = {self.resource: {}, } neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'network_service_params']) return body class ListPolicyClassifier(neutronV20.ListCommand): """List classifiers that belong to a given tenant.""" resource = 'policy_classifier' log = logging.getLogger(__name__ + '.ListPolicyClassifier') _formatters = {} list_columns = ['id', 'name', 'protocol', 'port_range', 'direction'] pagination_support = True sorting_support = True class ShowPolicyClassifier(neutronV20.ShowCommand): """Show information of a given classifier.""" resource = 'policy_classifier' log = logging.getLogger(__name__ + '.ShowPolicyClassifier') class CreatePolicyClassifier(neutronV20.CreateCommand): """Create a classifier for a given tenant.""" resource = 'policy_classifier' log = logging.getLogger(__name__ + '.CreatePolicyClassifier') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the policy classifier')) parser.add_argument( '--protocol', choices=['tcp', 'udp', 'icmp'], help=_('Protocol')) parser.add_argument( '--port-range', help=_('Port range')) parser.add_argument( '--direction', choices=['in', 'out', 'bi', ''], help=_('Direction')) parser.add_argument( 'name', metavar='NAME', help=_('Name of classifier to create')) def args2body(self, parsed_args): body = {self.resource: {}, } neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'protocol', 'port_range', 'direction']) return body class DeletePolicyClassifier(neutronV20.DeleteCommand): """Delete a given classifier.""" resource = 'policy_classifier' log = logging.getLogger(__name__ + '.DeletePolicyClassifier') class UpdatePolicyClassifier(neutronV20.UpdateCommand): """Update classifier's information.""" resource = 'policy_classifier' log = logging.getLogger(__name__ + '.UpdatePolicyClassifier') class ListPolicyAction(neutronV20.ListCommand): """List actions that belong to a given tenant.""" resource = 'policy_action' log = logging.getLogger(__name__ + '.ListPolicyAction') _formatters = {} list_columns = ['id', 'name', 'action_type', 'action_value'] pagination_support = True sorting_support = True class ShowPolicyAction(neutronV20.ShowCommand): """Show information of a given action.""" resource = 'policy_action' log = logging.getLogger(__name__ + '.ShowPolicyAction') class CreatePolicyAction(neutronV20.CreateCommand): """Create a action for a given tenant.""" resource = 'policy_action' log = logging.getLogger(__name__ + '.CreatePolicyAction') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the policy action')) parser.add_argument( '--action-type', help=_('Type of action')) parser.add_argument( '--action-value', help=_('uuid of service for redirect action')) parser.add_argument( 'name', metavar='NAME', help=_('Name of action to create')) def args2body(self, parsed_args): body = {self.resource: {}, } neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'action_type', 'action_value']) return body class DeletePolicyAction(neutronV20.DeleteCommand): """Delete a given action.""" resource = 'policy_action' log = logging.getLogger(__name__ + '.DeletePolicyAction') class UpdatePolicyAction(neutronV20.UpdateCommand): """Update action's information.""" resource = 'policy_action' log = logging.getLogger(__name__ + '.UpdatePolicyAction') class ListPolicyRule(neutronV20.ListCommand): """List policy_rules that belong to a given tenant.""" resource = 'policy_rule' log = logging.getLogger(__name__ + '.ListPolicyRule') _formatters = {} list_columns = ['id', 'name', 'enabled', 'classifier_id', 'actions'] pagination_support = True sorting_support = True class ShowPolicyRule(neutronV20.ShowCommand): """Show information of a given policy_rule.""" resource = 'policy_rule' log = logging.getLogger(__name__ + '.ShowPolicyRule') class CreatePolicyRule(neutronV20.CreateCommand): """Create a policy_rule for a given tenant.""" resource = 'policy_rule' log = logging.getLogger(__name__ + '.CreatePolicyRule') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the policy_rule')) parser.add_argument( '--enabled', type=bool, help=_('Enable flag')) parser.add_argument( '--classifier', help=_('uuid of policy classifier')) parser.add_argument( '--actions', type=string.split, help=_('List of policy actions')) parser.add_argument( 'name', metavar='NAME', help=_('Name of policy_rule to create')) def args2body(self, parsed_args): body = {self.resource: {}, } if parsed_args.actions: body[self.resource]['policy_actions'] = [ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_action', elem) for elem in parsed_args.actions] if parsed_args.classifier: body[self.resource]['policy_classifier_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_classifier', parsed_args.classifier) neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description', 'enabled']) return body class DeletePolicyRule(neutronV20.DeleteCommand): """Delete a given policy_rule.""" resource = 'policy_rule' log = logging.getLogger(__name__ + '.DeletePolicyRule') class UpdatePolicyRule(neutronV20.UpdateCommand): """Update policy_rule's information.""" resource = 'policy_rule' log = logging.getLogger(__name__ + '.UpdatePolicyRule') def add_known_arguments(self, parser): parser.add_argument( '--enabled', type=bool, help=_('Enable flag')) parser.add_argument( '--classifier', help=_('uuid of policy classifier')) parser.add_argument( '--actions', type=string.split, help=_('List of policy actions')) def args2body(self, parsed_args): body = {self.resource: {}, } if parsed_args.actions: body[self.resource]['policy_actions'] = [ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_action', elem) for elem in parsed_args.actions] if parsed_args.classifier: body[self.resource]['policy_classifier_id'] = \ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_classifier', parsed_args.classifier) neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'description', 'enabled']) return body class ListPolicyRuleSet(neutronV20.ListCommand): """List policy_rule_sets that belong to a given tenant.""" resource = 'policy_rule_set' log = logging.getLogger(__name__ + '.ListPolicyRuleSet') _formatters = {} list_columns = ['id', 'name', 'ploicy_rules'] pagination_support = True sorting_support = True class ShowPolicyRuleSet(neutronV20.ShowCommand): """Show information of a given policy_rule_set.""" resource = 'policy_rule_set' log = logging.getLogger(__name__ + '.ShowPolicyRuleSet') class CreatePolicyRuleSet(neutronV20.CreateCommand): """Create a policy rule set for a given tenant.""" resource = 'policy_rule_set' log = logging.getLogger(__name__ + '.CreatePolicyRuleSet') def add_known_arguments(self, parser): parser.add_argument( '--description', help=_('Description of the policy rule set')) parser.add_argument( '--policy-rules', type=string.split, help=_('List of policy rules')) parser.add_argument( '--child-policy-rule-sets', type=string.split, help=_('List of child policy rule sets')) parser.add_argument( 'name', metavar='NAME', help=_('Name of policy rule set to create')) def args2body(self, parsed_args): body = {self.resource: {}, } if parsed_args.policy_rules: body[self.resource]['policy_rules'] = [ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule', elem) for elem in parsed_args.policy_rules] if parsed_args.child_policy_rule_sets: body[self.resource]['child_policy_rule_sets'] = [ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule_set', elem) for elem in parsed_args.child_policy_rule_sets] neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'tenant_id', 'description']) return body class DeletePolicyRuleSet(neutronV20.DeleteCommand): """Delete a given policy rule set.""" resource = 'policy_rule_set' log = logging.getLogger(__name__ + '.DeletePolicyRuleSet') class UpdatePolicyRuleSet(neutronV20.UpdateCommand): """Update policy rule set's information.""" resource = 'policy_rule_set' log = logging.getLogger(__name__ + '.UpdatePolicyRuleSet') def add_known_arguments(self, parser): parser.add_argument( '--policy-rules', type=string.split, help=_('List of policy rules')) parser.add_argument( '--child-policy-rule-sets', type=string.split, help=_('List of child policy rule sets')) def args2body(self, parsed_args): body = {self.resource: {}, } if parsed_args.policy_rules: body[self.resource]['policy_rules'] = [ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule', elem) for elem in parsed_args.policy_rules] parsed_args.policy_rules = body[self.resource]['policy_rules'] if parsed_args.child_policy_rule_sets: body[self.resource]['child_policy_rule_sets'] = [ neutronV20.find_resourceid_by_name_or_id( self.get_client(), 'policy_rule_set', elem) for elem in parsed_args.child_policy_rule_sets] parsed_args.child_policy_rule_sets = ( parsed_args.child_policy_rule_sets) neutronV20.update_dict(parsed_args, body[self.resource], ['name', 'description', 'policy_rules', 'child_policy_rule_sets']) return body