
There are some flake8 issues in each component that are not based on coding style rules . And There are some unittest issues in some component that need to be fixed Change-Id: Ic9a3f2c3b779ced225a42f69a495a606cb62517e Closes-Bug: #1350160
1334 lines
58 KiB
Python
1334 lines
58 KiB
Python
# Copyright 2013 IBM Corp.
|
|
|
|
"""
|
|
Handles all of the Neutron logic necessary for PowerVC driver.
|
|
|
|
The :py:class:`PowerVCNeutronAgent` class is a Neutron agent.
|
|
"""
|
|
|
|
'''
|
|
Created on Jul 30, 2013
|
|
|
|
@author: John Kasperski
|
|
'''
|
|
|
|
import Queue
|
|
import threading
|
|
import os
|
|
import time
|
|
from exceptions import KeyboardInterrupt
|
|
|
|
from neutron.common import rpc
|
|
from neutron.openstack.common import log as logging
|
|
|
|
from oslo.config import cfg
|
|
|
|
from powervc.common.constants import LOCAL_OS
|
|
from powervc.common.constants import POWERVC_OS
|
|
from powervc.common.constants import PVC_TOPIC
|
|
from powervc.common.constants import SERVICE_TYPES
|
|
from powervc.common.client import factory
|
|
from powervc.common.gettextutils import _
|
|
from powervc.neutron.api import powervc_rpc
|
|
from powervc.neutron.client import local_os_bindings
|
|
from powervc.neutron.client import powervc_bindings
|
|
from powervc.neutron.common import constants
|
|
from powervc.neutron.common import utils
|
|
from powervc.neutron.db import powervc_db_v2
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
agent_opts = [
|
|
cfg.ListOpt('map_powervc_networks',
|
|
default=['*'],
|
|
help=_('List of <PowerVC network names> '
|
|
'to be mapped up to the local OS')),
|
|
cfg.IntOpt('polling_interval',
|
|
default=60,
|
|
help=_("The number of seconds the agent will wait between "
|
|
"polling for network changes.")),
|
|
]
|
|
|
|
CONF = cfg.CONF
|
|
CONF.register_opts(agent_opts, "AGENT")
|
|
|
|
|
|
class PowerVCNeutronAgent(object):
|
|
"""
|
|
This is the main PowerVC Neutron agent class
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.end_thread = False
|
|
self.polling_interval = CONF.AGENT.polling_interval
|
|
self.retry_sync = time.time() + self.polling_interval
|
|
self.db = powervc_db_v2.PowerVCAgentDB()
|
|
self.event_q = Queue.Queue()
|
|
self.handlers = {}
|
|
self._register_handler(LOCAL_OS, constants.EVENT_NETWORK_CREATE,
|
|
self._handle_local_network_create)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_NETWORK_UPDATE,
|
|
self._handle_local_network_update)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_NETWORK_DELETE,
|
|
self._handle_local_network_delete)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_SUBNET_CREATE,
|
|
self._handle_local_subnet_create)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_SUBNET_UPDATE,
|
|
self._handle_local_subnet_update)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_SUBNET_DELETE,
|
|
self._handle_local_subnet_delete)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_PORT_CREATE,
|
|
self._handle_local_port_create)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_PORT_UPDATE,
|
|
self._handle_local_port_update)
|
|
self._register_handler(LOCAL_OS, constants.EVENT_PORT_DELETE,
|
|
self._handle_local_port_delete)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_NETWORK_CREATE,
|
|
self._handle_pvc_network_create)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_NETWORK_UPDATE,
|
|
self._handle_pvc_network_update)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_NETWORK_DELETE,
|
|
self._handle_pvc_network_delete)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_SUBNET_CREATE,
|
|
self._handle_pvc_subnet_create)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_SUBNET_UPDATE,
|
|
self._handle_pvc_subnet_update)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_SUBNET_DELETE,
|
|
self._handle_pvc_subnet_delete)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_PORT_CREATE,
|
|
self._handle_pvc_port_create)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_PORT_UPDATE,
|
|
self._handle_pvc_port_update)
|
|
self._register_handler(POWERVC_OS, constants.EVENT_PORT_DELETE,
|
|
self._handle_pvc_port_delete)
|
|
self.pvc = powervc_bindings.Client(None, self)
|
|
self.pvc = factory.POWERVC.new_client(str(SERVICE_TYPES.network),
|
|
powervc_bindings.Client,
|
|
self)
|
|
self.local = local_os_bindings.Client(None, self)
|
|
self.local = factory.LOCAL.new_client(str(SERVICE_TYPES.network),
|
|
local_os_bindings.Client,
|
|
self)
|
|
self._setup_rpc()
|
|
|
|
def _generate_db_stats(self):
|
|
net_creating, net_active, net_deleting = self.db.get_network_stats()
|
|
sub_creating, sub_active, sub_deleting = self.db.get_subnet_stats()
|
|
port_creating, port_active, port_deleting = self.db.get_port_stats()
|
|
stat_n = '{0:d}/{1:d}/{2:d}'.format(net_creating,
|
|
net_active,
|
|
net_deleting)
|
|
stat_s = '{0:d}/{1:d}/{2:d}'.format(sub_creating,
|
|
sub_active,
|
|
sub_deleting)
|
|
stat_p = '{0:d}/{1:d}/{2:d}'.format(port_creating,
|
|
port_active,
|
|
port_deleting)
|
|
return '(n:{0}, s:{1}, p:{2})'.format(stat_n, stat_s, stat_p)
|
|
|
|
def _handle_local_network_create(self, network):
|
|
net_id = network.get('id')
|
|
db_net = self.db.get_network(local_id=net_id)
|
|
if db_net:
|
|
LOG.info(_("DB entry for local network %s already exists"), net_id)
|
|
return
|
|
# verify that if local network has no subnet, not handle it.
|
|
if not utils.network_has_subnet(network):
|
|
# No subnet, but maybe one was created when this event was queued
|
|
# up waiting to be processed. Refresh with current network
|
|
# that is actually on Local
|
|
local_net = self.local.get_network(net_id)
|
|
if not local_net:
|
|
LOG.info(_("Local network %s might have been deleted"),
|
|
local_net.get('name'))
|
|
return
|
|
if not utils.network_has_subnet(local_net):
|
|
LOG.info(_("Local network % has no subnet"),
|
|
local_net.get('name'))
|
|
return
|
|
if not utils.network_has_mappable_subnet(self.local, local_net):
|
|
LOG.info(_("Local network % has no mappable subnet"),
|
|
local_net.get('name'))
|
|
return
|
|
|
|
sync_key = utils.gen_network_sync_key(network)
|
|
db_net = self.db.get_network(sync_key=sync_key)
|
|
if db_net:
|
|
self.db.set_network_local_id(db_net, net_id)
|
|
else:
|
|
db_net = self.db.create_network(network, sync_key, local_id=net_id)
|
|
new_net = self.pvc.create_network(network)
|
|
if new_net:
|
|
self.db.set_network_pvc_id(db_net, new_net.get('id'))
|
|
|
|
def _handle_pvc_network_create(self, network):
|
|
net_id = network.get('id')
|
|
db_net = self.db.get_network(pvc_id=net_id)
|
|
if db_net:
|
|
LOG.info(_("DB entry for PowerVC network %s already exists"),
|
|
net_id)
|
|
return
|
|
# Verify that the PVC network has a subnet (most likely it won't)
|
|
if not utils.network_has_subnet(network):
|
|
# No subnet, but maybe one was created when this event was queued
|
|
# up waiting to be processed. Refresh with current network
|
|
# that is actually on PowerVC
|
|
network = self.pvc.get_network(net_id)
|
|
if not network:
|
|
LOG.warning(_("Unable to retrieve PowerVC network %s. "
|
|
"Network may have been deleted."), net_id)
|
|
return
|
|
# Check to see if the network has a subnet now (it might)
|
|
if not utils.network_has_subnet(network):
|
|
LOG.info(_("PowerVC network has no subnets: %s"),
|
|
network.get('name'))
|
|
return
|
|
sync_key = utils.gen_network_sync_key(network)
|
|
db_net = self.db.get_network(sync_key=sync_key)
|
|
if db_net:
|
|
self.db.set_network_pvc_id(db_net, net_id)
|
|
else:
|
|
# Create at local only if the name is in the white list.
|
|
if utils.is_network_in_white_list(network):
|
|
db_net = self.db.create_network(network, sync_key,
|
|
pvc_id=net_id)
|
|
new_net = self.local.create_network(network)
|
|
if new_net:
|
|
self.db.set_network_local_id(db_net, new_net.get('id'))
|
|
else:
|
|
LOG.info(_("PowerVC network is not allowed: %s"),
|
|
network.get('name'))
|
|
|
|
def _handle_local_network_update(self, network):
|
|
net_id = network.get('id')
|
|
db_net = self.db.get_network(local_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("DB entry for local network %s does not exist"), net_id)
|
|
return
|
|
pvc_id = db_net.get('pvc_id')
|
|
if not pvc_id:
|
|
LOG.info(_("No PowerVC network for local network %s"), net_id)
|
|
return
|
|
pvc_net = self.pvc.get_network(pvc_id)
|
|
if not pvc_net:
|
|
LOG.warning(_("Unable to retrieve PowerVC network %s. "
|
|
"Network may have been deleted."), pvc_id)
|
|
return
|
|
if not utils.equal_networks(pvc_net, network):
|
|
self.pvc.update_network(pvc_net, network)
|
|
update_data = utils.gen_network_update_data(network)
|
|
self.db.set_network_update_data(db_net, update_data)
|
|
else:
|
|
LOG.info(_("Network changes do not need to be updated"))
|
|
|
|
def _handle_pvc_network_update(self, network):
|
|
net_id = network.get('id')
|
|
db_net = self.db.get_network(pvc_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("DB entry for PowerVC network %s does not exist"),
|
|
net_id)
|
|
return
|
|
local_id = db_net.get('local_id')
|
|
if not local_id:
|
|
LOG.info(_("No local network for PowerVC network %s"), net_id)
|
|
return
|
|
local_net = self.local.get_network(local_id)
|
|
if not local_net:
|
|
LOG.warning(_("Unable to retrieve local network %s. "
|
|
"Network may have been deleted."), local_id)
|
|
return
|
|
if not utils.equal_networks(local_net, network):
|
|
self.local.update_network(local_net, network)
|
|
update_data = utils.gen_network_update_data(network)
|
|
self.db.set_network_update_data(db_net, update_data)
|
|
else:
|
|
LOG.info(_("Network changes do not need to be updated"))
|
|
|
|
def _handle_local_network_delete(self, net_id):
|
|
db_net = self.db.get_network(local_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("DB entry for local network %s does not exist"), net_id)
|
|
return
|
|
pvc_id = db_net.get('pvc_id')
|
|
self.db.set_network_local_id(db_net, None)
|
|
if pvc_id:
|
|
port_list = self.pvc.get_ports_on_network(pvc_id)
|
|
if len(port_list) > 0:
|
|
LOG.info(_("Ports still defined on PowerVC network %s"),
|
|
pvc_id)
|
|
return
|
|
self.pvc.delete_network(pvc_id)
|
|
network = self.pvc.get_network(pvc_id)
|
|
if network:
|
|
return
|
|
self.db.delete_network(db_net)
|
|
|
|
def _handle_pvc_network_delete(self, net_id):
|
|
db_net = self.db.get_network(pvc_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("DB entry for PowerVC network %s does not exist"),
|
|
net_id)
|
|
return
|
|
local_id = db_net.get('local_id')
|
|
self.db.set_network_pvc_id(db_net, None)
|
|
if local_id:
|
|
port_list = self.local.get_ports_on_network(local_id)
|
|
if len(port_list) > 0:
|
|
LOG.info(_("Ports still defined on local network %s"),
|
|
local_id)
|
|
return
|
|
self.local.delete_network(local_id)
|
|
network = self.local.get_network(local_id)
|
|
if network:
|
|
return
|
|
self.db.delete_network(db_net)
|
|
|
|
def _handle_local_subnet_create(self, subnet):
|
|
local_id = subnet.get('id')
|
|
db_sub = self.db.get_subnet(local_id=local_id)
|
|
if db_sub:
|
|
LOG.info(_("DB entry for local subnet %s already exists"),
|
|
local_id)
|
|
return
|
|
net_id = subnet.get('network_id')
|
|
db_net = self.db.get_network(local_id=net_id)
|
|
if not db_net:
|
|
# No database entry for the network. This may be the first subnet
|
|
# created on the network -or- the network may be not "mappable".
|
|
# Retrieve the network and pass it into the handler routine if
|
|
# it is valid.
|
|
network = self.local.get_network(net_id)
|
|
if network and utils.is_network_mappable(network):
|
|
self._handle_local_network_create(network)
|
|
db_net = self.db.get_network(local_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("Unable to find DB entry for local network %s"),
|
|
net_id)
|
|
return
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
LOG.info(_("Network %s is currently being deleted"), net_id)
|
|
return
|
|
sync_key = utils.gen_subnet_sync_key(subnet, db_net)
|
|
db_sub = self.db.get_subnet(sync_key=sync_key)
|
|
if db_sub:
|
|
self.db.set_subnet_local_id(db_sub, local_id)
|
|
else:
|
|
db_sub = self.db.create_subnet(subnet, sync_key, local_id=local_id)
|
|
new_sub = self.pvc.create_subnet(subnet)
|
|
if new_sub:
|
|
self.db.set_subnet_pvc_id(db_sub, new_sub.get('id'))
|
|
|
|
def _handle_pvc_subnet_create(self, subnet):
|
|
pvc_id = subnet.get('id')
|
|
db_sub = self.db.get_subnet(pvc_id=pvc_id)
|
|
if db_sub:
|
|
LOG.info(_("DB entry for PowerVC subnet %s already exists"),
|
|
pvc_id)
|
|
return
|
|
net_id = subnet.get('network_id')
|
|
db_net = self.db.get_network(pvc_id=net_id)
|
|
if not db_net:
|
|
# No database entry for the network. This may be the first subnet
|
|
# created on the network -or- the network may be not "mappable".
|
|
# Retrieve the network and pass it into the handler routine if
|
|
# it is valid.
|
|
pvc_net = self.pvc.get_network(net_id)
|
|
if pvc_net and utils.is_network_mappable(pvc_net):
|
|
self._handle_pvc_network_create(pvc_net)
|
|
# Database entry for the network should exist now
|
|
db_net = self.db.get_network(pvc_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("Unable to find DB entry for PowerVC network %s"),
|
|
net_id)
|
|
return
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
LOG.info(_("Network %s is currently being deleted"), net_id)
|
|
return
|
|
sync_key = utils.gen_subnet_sync_key(subnet, db_net)
|
|
db_sub = self.db.get_subnet(sync_key=sync_key)
|
|
if db_sub:
|
|
self.db.set_subnet_pvc_id(db_sub, pvc_id)
|
|
else:
|
|
db_sub = self.db.create_subnet(subnet, sync_key, pvc_id=pvc_id)
|
|
new_sub = self.local.create_subnet(subnet)
|
|
if new_sub:
|
|
self.db.set_subnet_local_id(db_sub, new_sub.get('id'))
|
|
|
|
def _handle_local_subnet_update(self, subnet):
|
|
local_id = subnet.get('id')
|
|
db_sub = self.db.get_subnet(local_id=local_id)
|
|
if not db_sub:
|
|
LOG.info(_("DB entry for local subnet %s does not exist"),
|
|
local_id)
|
|
return
|
|
pvc_id = db_sub.get('pvc_id')
|
|
if not pvc_id:
|
|
LOG.info(_("No PowerVC subnet for local subnet %s"), local_id)
|
|
return
|
|
pvc_sub = self.pvc.get_subnet(pvc_id)
|
|
if not pvc_sub:
|
|
LOG.warning(_("Unable to retrieve PowerVC subnet %s. "
|
|
"Subnet may have been deleted."), pvc_id)
|
|
return
|
|
if not utils.equal_subnets(pvc_sub, subnet):
|
|
self.pvc.update_subnet(pvc_sub, subnet)
|
|
update_data = utils.gen_subnet_update_data(subnet)
|
|
self.db.set_subnet_update_data(db_sub, update_data)
|
|
else:
|
|
LOG.info(_("Subnet changes do not need to be updated"))
|
|
|
|
def _handle_pvc_subnet_update(self, subnet):
|
|
pvc_id = subnet.get('id')
|
|
db_sub = self.db.get_subnet(pvc_id=pvc_id)
|
|
if not db_sub:
|
|
LOG.info(_("DB entry for PowerVC subnet %s does not exist"),
|
|
pvc_id)
|
|
return
|
|
local_id = db_sub.get('local_id')
|
|
if not local_id:
|
|
LOG.info(_("No local subnet for PowerVC subnet %s"), pvc_id)
|
|
return
|
|
local_sub = self.local.get_subnet(local_id)
|
|
if not local_sub:
|
|
LOG.warning(_("Unable to retrieve local subnet %s. "
|
|
"Subnet may have been deleted."), local_id)
|
|
return
|
|
if not utils.equal_subnets(local_sub, subnet):
|
|
self.local.update_subnet(local_sub, subnet)
|
|
update_data = utils.gen_subnet_update_data(subnet)
|
|
self.db.set_subnet_update_data(db_sub, update_data)
|
|
else:
|
|
LOG.info(_("Subnet changes do not need to be updated"))
|
|
|
|
def _handle_local_subnet_delete(self, sub_id):
|
|
db_sub = self.db.get_subnet(local_id=sub_id)
|
|
if not db_sub:
|
|
LOG.info(_("DB entry for local subnet %s does not exist"), sub_id)
|
|
return
|
|
pvc_id = db_sub.get('pvc_id')
|
|
self.db.set_subnet_local_id(db_sub, None)
|
|
if not pvc_id:
|
|
# Other half of database object has already been cleaned up
|
|
return
|
|
subnet = self.pvc.get_subnet(pvc_id)
|
|
if not subnet:
|
|
LOG.warning(_("Unable to retrieve PowerVC subnet %s. "
|
|
"Subnet may have been deleted."), pvc_id)
|
|
self.db.delete_subnet(db_sub)
|
|
return
|
|
net_id = subnet.get('network_id')
|
|
port_list = self.pvc.get_ports_on_subnet(net_id, pvc_id)
|
|
if len(port_list) > 0:
|
|
LOG.info(_("Ports still defined on PowerVC subnet %s"), pvc_id)
|
|
return
|
|
self.pvc.delete_subnet(pvc_id)
|
|
subnet = self.pvc.get_subnet(pvc_id)
|
|
if subnet:
|
|
return
|
|
self.db.delete_subnet(db_sub)
|
|
|
|
def _handle_pvc_subnet_delete(self, sub_id):
|
|
db_sub = self.db.get_subnet(pvc_id=sub_id)
|
|
if not db_sub:
|
|
LOG.info(_("DB entry for PowerVC subnet %s does not exist"),
|
|
sub_id)
|
|
return
|
|
local_id = db_sub.get('local_id')
|
|
self.db.set_subnet_pvc_id(db_sub, None)
|
|
if not local_id:
|
|
# Other half of database object has already been cleaned up
|
|
return
|
|
subnet = self.local.get_subnet(local_id)
|
|
if not subnet:
|
|
LOG.warning(_("Unable to retrieve local subnet %s. "
|
|
"Subnet may have been deleted."), local_id)
|
|
self.db.delete_subnet(db_sub)
|
|
return
|
|
net_id = subnet.get('network_id')
|
|
port_list = self.local.get_ports_on_subnet(net_id, local_id)
|
|
|
|
if len(port_list) > 0:
|
|
if (self._ports_valid(port_list)):
|
|
LOG.info(_("Ports still defined on local subnet %s"), local_id)
|
|
return
|
|
# no local ports left, delete the subnet
|
|
self.local.delete_subnet(local_id)
|
|
subnet = self.local.get_subnet(local_id)
|
|
if subnet:
|
|
return
|
|
self.db.delete_subnet(db_sub)
|
|
|
|
def _ports_valid(self, port_list):
|
|
"""
|
|
Check if these ports are still valid
|
|
:returns: True, if any of the ports is still valid;
|
|
False if none of them is valid.
|
|
"""
|
|
# handle case:
|
|
# local port is created and pvc port is not created;
|
|
# local port status will be 'Creating', delete such port.
|
|
deleted = 0
|
|
for local_port in port_list:
|
|
local_port_id = local_port.get('id')
|
|
db_port = self.db.get_port(local_id=local_port_id)
|
|
if db_port and db_port.get('status') == constants.STATUS_CREATING:
|
|
# delete this local port
|
|
self.local.delete_port(local_port_id)
|
|
# if it is really deleted
|
|
local_port = self.local.get_port(local_port_id)
|
|
if not local_port:
|
|
self.db.delete_port(db_port)
|
|
deleted += 1
|
|
# still some ports left there
|
|
if (deleted != len(port_list)):
|
|
return True
|
|
# No port left
|
|
return False
|
|
|
|
def _handle_local_port_create(self, port):
|
|
local_id = port.get('id')
|
|
db_port = self.db.get_port(local_id=local_id)
|
|
if db_port:
|
|
LOG.info(_("DB entry for local port %s already exists"), local_id)
|
|
return
|
|
net_id = port.get('network_id')
|
|
db_net = self.db.get_network(local_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("Unable to find DB entry for local network %s"), net_id)
|
|
return
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
LOG.info(_("Network %s is currently being deleted"), net_id)
|
|
return
|
|
valid_subnet = False
|
|
subnet_ids = utils.extract_subnets_from_port(port)
|
|
for local_sub_id in subnet_ids:
|
|
db_sub = self.db.get_subnet(local_id=local_sub_id)
|
|
if db_sub:
|
|
valid_subnet = True
|
|
break
|
|
if not valid_subnet:
|
|
LOG.info(_("Unable to map local port %s. The subnet %s "
|
|
"is not mapped."), local_id, subnet_ids)
|
|
return
|
|
sync_key = utils.gen_port_sync_key(port, db_net)
|
|
db_port = self.db.get_port(sync_key=sync_key)
|
|
if db_port:
|
|
self.db.set_port_local_id(db_port, local_id)
|
|
return
|
|
# Create the database entry for this new port
|
|
db_port = self.db.create_port(port, sync_key, local_id=local_id)
|
|
# Determine which instance owns this port
|
|
device_id = port.get('device_id')
|
|
# Determine if the instance is (HyperV / KVM) or PowerVC
|
|
# if PowerVC, return.
|
|
# If HyperV/KVM, reserve IP address in PowerVC
|
|
if not self.local.is_instance_on_power(device_id):
|
|
new_port = self.pvc.create_port(port)
|
|
if new_port:
|
|
self.db.set_port_pvc_id(db_port, new_port.get('id'))
|
|
|
|
def _handle_pvc_port_create(self, port):
|
|
pvc_id = port.get('id')
|
|
db_port = self.db.get_port(pvc_id=pvc_id)
|
|
if db_port:
|
|
LOG.info(_("DB entry for PowerVC port %s already exists"), pvc_id)
|
|
return
|
|
net_id = port.get('network_id')
|
|
db_net = self.db.get_network(pvc_id=net_id)
|
|
if not db_net:
|
|
LOG.info(_("Unable to find DB entry for PowerVC network %s"),
|
|
net_id)
|
|
return
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
LOG.info(_("Network %s is currently being deleted"), net_id)
|
|
return
|
|
valid_subnet = False
|
|
subnet_ids = utils.extract_subnets_from_port(port)
|
|
for pvc_sub_id in subnet_ids:
|
|
db_sub = self.db.get_subnet(pvc_id=pvc_sub_id)
|
|
if db_sub:
|
|
valid_subnet = True
|
|
break
|
|
if not valid_subnet:
|
|
LOG.info(_("Unable to map PowerVC port %s. The subnet %s "
|
|
"is not mapped."), pvc_id, subnet_ids)
|
|
return
|
|
sync_key = utils.gen_port_sync_key(port, db_net)
|
|
db_port = self.db.get_port(sync_key=sync_key)
|
|
if db_port:
|
|
self.db.set_port_pvc_id(db_port, pvc_id)
|
|
return
|
|
db_port = self.db.create_port(port, sync_key, pvc_id=pvc_id)
|
|
new_port = self.local.create_port(port)
|
|
if new_port:
|
|
self.db.set_port_local_id(db_port, new_port.get('id'))
|
|
|
|
def _handle_local_port_update(self, port):
|
|
local_id = port.get('id')
|
|
db_port = self.db.get_port(local_id=local_id)
|
|
if not db_port:
|
|
LOG.info(_("DB entry for local port %s does not exist"), local_id)
|
|
return
|
|
pvc_id = db_port.get('pvc_id')
|
|
if not pvc_id:
|
|
LOG.info(_("No PowerVC port for local port %s"), local_id)
|
|
return
|
|
pvc_port = self.pvc.get_port(pvc_id)
|
|
if not pvc_port:
|
|
LOG.warning(_("Unable to retrieve PowerVC port %s. "
|
|
"Port may have been deleted."), pvc_id)
|
|
return
|
|
if not utils.equal_ports(pvc_port, port):
|
|
self.pvc.update_port(pvc_port, port)
|
|
update_data = utils.gen_port_update_data(port)
|
|
self.db.set_port_update_data(db_port, update_data)
|
|
else:
|
|
LOG.info(_("Port changes do not need to be updated"))
|
|
|
|
def _handle_pvc_port_update(self, port):
|
|
pvc_id = port.get('id')
|
|
db_port = self.db.get_port(pvc_id=pvc_id)
|
|
if not db_port:
|
|
LOG.info(_("DB entry for PowerVC port %s does not exist"), pvc_id)
|
|
return
|
|
local_id = db_port.get('local_id')
|
|
if not local_id:
|
|
LOG.info(_("No local port for PowerVC port %s"), pvc_id)
|
|
return
|
|
local_port = self.local.get_port(local_id)
|
|
if not local_port:
|
|
LOG.warning(_("Unable to retrieve local port %s. "
|
|
"Port may have been deleted."), local_id)
|
|
return
|
|
if not utils.equal_ports(local_port, port):
|
|
self.local.update_port(local_port, port)
|
|
update_data = utils.gen_port_update_data(port)
|
|
self.db.set_port_update_data(db_port, update_data)
|
|
else:
|
|
LOG.info(_("Port changes do not need to be updated"))
|
|
|
|
def _handle_local_port_delete(self, port_id):
|
|
db_port = self.db.get_port(local_id=port_id)
|
|
if not db_port:
|
|
LOG.info(_("DB entry for local port %s does not exist"), port_id)
|
|
return
|
|
pvc_id = db_port.get('pvc_id')
|
|
self.db.set_port_local_id(db_port, None)
|
|
if not pvc_id:
|
|
# Other half of database object has already been cleaned up
|
|
return
|
|
pvc_port = self.pvc.get_port(pvc_id)
|
|
if not pvc_port:
|
|
LOG.warning(_("Unable to retrieve PowerVC port %s. "
|
|
"Port may have been deleted."), pvc_id)
|
|
self.db.delete_port(db_port)
|
|
return
|
|
device_id = pvc_port.get('device_id')
|
|
if device_id and len(device_id) > 0:
|
|
LOG.info(_("PowerVC port %s can not be deleted. Port is in-use "
|
|
"by VM %s."), pvc_id, device_id)
|
|
LOG.info(_("Recreate the local port to prevent this IP "
|
|
"address from being used by another instance."))
|
|
new_port = self.local.create_port(pvc_port)
|
|
if new_port:
|
|
# Update the database entry with new port uuid
|
|
self.db.set_port_local_id(db_port, new_port.get('id'))
|
|
return
|
|
self.pvc.delete_port(pvc_id)
|
|
pvc_port = self.pvc.get_port(pvc_id)
|
|
if pvc_port:
|
|
return
|
|
self.db.delete_port(db_port)
|
|
|
|
def _handle_pvc_port_delete(self, port_id):
|
|
db_port = self.db.get_port(pvc_id=port_id)
|
|
if not db_port:
|
|
LOG.info(_("DB entry for PowerVC port %s does not exist"), port_id)
|
|
return
|
|
local_id = db_port.get('local_id')
|
|
self.db.set_port_pvc_id(db_port, None)
|
|
if not local_id:
|
|
# Other half of database object has already been cleaned up
|
|
return
|
|
local_port = self.local.get_port(local_id)
|
|
if not local_port:
|
|
LOG.warning(_("Unable to retrieve local port %s. "
|
|
"Port may have been deleted."), local_id)
|
|
self.db.delete_port(db_port)
|
|
return
|
|
self._delete_local_port(local_port, db_port)
|
|
|
|
def _delete_local_port(self, local_port, db_port):
|
|
# complex logic here on how to handle it
|
|
# some possible cases for this local port:
|
|
# 1) device_id = None occurs when lock IP address done using SCE UI.
|
|
# Delete the local port
|
|
# 2) device_owner = network:router_interface (see issue 173350).
|
|
# re-create the PVC port
|
|
# 3) device_id = instance that no longer exists.
|
|
# Delete the local port
|
|
# 4) device_id = HyperV/KVM instance.
|
|
# Re-create the PVC port
|
|
# 5) device_id = PowerVC deployed instance.
|
|
# Delete the local port
|
|
#
|
|
local_id = db_port.get('local_id')
|
|
# case 2
|
|
device_owner = local_port.get('device_owner')
|
|
if device_owner and (device_owner == "network:router_interface"):
|
|
LOG.info(_("Local port %s can not be deleted. Port is in-use "
|
|
"by device_owner %s."), local_id, device_owner)
|
|
new_port = self.pvc.create_port(local_port)
|
|
if new_port:
|
|
self.db.set_port_pvc_id(db_port, new_port.get('id'))
|
|
return
|
|
|
|
device_id = local_port.get('device_id')
|
|
if device_id and self.local.is_instance_valid(device_id):
|
|
if not self.local.is_instance_on_power(device_id):
|
|
# case 4)
|
|
LOG.info(_("Local port %s can not be deleted. Port is in-use "
|
|
"by VM %s."), local_id, device_id)
|
|
new_port = self.pvc.create_port(local_port)
|
|
if new_port:
|
|
self.db.set_port_pvc_id(db_port, new_port.get('id'))
|
|
return
|
|
# for case 1) 3) 5)
|
|
self.local.delete_port(local_id)
|
|
local_port = self.local.get_port(local_id)
|
|
if local_port:
|
|
return
|
|
self.db.delete_port(db_port)
|
|
|
|
def _register_handler(self, event_os, event_type, handler):
|
|
key = event_type
|
|
if event_os:
|
|
key = event_os + ':' + event_type
|
|
self.handlers[key] = handler
|
|
|
|
def _handle_event(self, event):
|
|
event_os = event.get(constants.EVENT_OS)
|
|
event_type = event.get(constants.EVENT_TYPE)
|
|
event_obj = event.get(constants.EVENT_OBJECT)
|
|
if event_type == constants.EVENT_END_THREAD:
|
|
return
|
|
elif event_type == constants.EVENT_FULL_SYNC:
|
|
self._synchronize(event_os)
|
|
return
|
|
key = event_type
|
|
if event_os:
|
|
key = event_os + ':' + event_type
|
|
handler = self.handlers.get(key)
|
|
if not handler:
|
|
LOG.error(_("No handler found for: %s"), key)
|
|
return
|
|
return handler(event_obj)
|
|
|
|
def queue_event(self, event_os, event_type, event_obj):
|
|
event = {}
|
|
event[constants.EVENT_OS] = event_os
|
|
event[constants.EVENT_TYPE] = event_type
|
|
event[constants.EVENT_OBJECT] = event_obj
|
|
self.event_q.put(event)
|
|
|
|
def _setup_rpc(self):
|
|
"""
|
|
set up RPC support
|
|
"""
|
|
rpc.init(CONF)
|
|
self.topic = PVC_TOPIC
|
|
self.conn = rpc.create_connection(new=True)
|
|
self.endpoints = [powervc_rpc.PVCRpcCallbacks(self)]
|
|
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
|
|
self.conn.consume_in_threads()
|
|
LOG.info(_("RPC listener created"))
|
|
|
|
def _synchronize(self, default_target=LOCAL_OS):
|
|
"""
|
|
Main synchronize routine
|
|
"""
|
|
start = time.time()
|
|
LOG.info(_("Synchronizing all networks/subnets/ports..."))
|
|
self._synchronize_networks(default_target)
|
|
self._synchronize_subnets(default_target)
|
|
self._synchronize_ports(default_target)
|
|
db_stats = self._generate_db_stats()
|
|
end = time.time()
|
|
elapsed = '{0:.4} seconds'.format(end - start)
|
|
LOG.info(_("Full sync elapsed time: %s %s"), elapsed, db_stats)
|
|
self.retry_sync = time.time() + self.polling_interval
|
|
|
|
def _synchronize_networks(self, target=LOCAL_OS):
|
|
pvc_nets = self.pvc.get_networks()
|
|
local_nets = self.local.get_networks()
|
|
self._sync_deleted_nets(pvc_nets, local_nets)
|
|
self._sync_new_pvc_nets(pvc_nets)
|
|
self._sync_new_local_nets(local_nets)
|
|
self._sync_updated_nets(pvc_nets, local_nets, target)
|
|
self._sync_deleting_nets()
|
|
self._sync_creating_nets(pvc_nets, local_nets)
|
|
|
|
def _sync_deleted_nets(self, pvc_nets, local_nets):
|
|
db_networks = self.db.get_networks()
|
|
for db_net in db_networks:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_net)
|
|
if pvc_id and pvc_id not in pvc_nets.keys():
|
|
self.db.set_network_pvc_id(db_net, None)
|
|
if local_id and local_id not in local_nets.keys():
|
|
self.db.set_network_local_id(db_net, None)
|
|
|
|
def _sync_new_pvc_nets(self, pvc_nets):
|
|
for pvc_net in pvc_nets.values():
|
|
pvc_id = pvc_net.get('id')
|
|
db_net = self.db.get_network(pvc_id=pvc_id)
|
|
if db_net:
|
|
# DB entry for this PVC network already exists
|
|
continue
|
|
# Verify that the PVC network has a subnet. A network without
|
|
# a subnet is considered a DHCP network by PowerVC. We do not
|
|
# support DHCP networks
|
|
if not utils.network_has_subnet(pvc_net):
|
|
LOG.info(_("PowerVC network has no subnets: %s"),
|
|
pvc_net.get('name'))
|
|
continue
|
|
sync_key = utils.gen_network_sync_key(pvc_net)
|
|
db_net = self.db.get_network(sync_key=sync_key)
|
|
if db_net:
|
|
self.db.set_network_pvc_id(db_net, pvc_id)
|
|
else:
|
|
# Check if the pvc network is allowed to sync.
|
|
if utils.is_network_in_white_list(pvc_net):
|
|
self.db.create_network(pvc_net, sync_key, pvc_id=pvc_id)
|
|
else:
|
|
LOG.info(_("PowerVC network is not allowed: %s"),
|
|
pvc_net.get('name'))
|
|
|
|
def _sync_new_local_nets(self, local_nets):
|
|
for local_net in local_nets.values():
|
|
local_id = local_net.get('id')
|
|
db_net = self.db.get_network(local_id=local_id)
|
|
if db_net:
|
|
# DB entry for this local network already exists
|
|
continue
|
|
# if local network has no subnet, not handle it.
|
|
if not utils.network_has_subnet(local_net):
|
|
LOG.info(_("Local network %s has no subnet"),
|
|
local_net.get('name'))
|
|
continue
|
|
# if local network has subnet, verify if the subnet is mappable
|
|
if not utils.network_has_mappable_subnet(self.local,
|
|
local_net):
|
|
LOG.info(_("Local network %s has no mappable subnet"),
|
|
local_net.get('name'))
|
|
continue
|
|
sync_key = utils.gen_network_sync_key(local_net)
|
|
db_net = self.db.get_network(sync_key=sync_key)
|
|
if db_net:
|
|
self.db.set_network_local_id(db_net, local_id)
|
|
else:
|
|
self.db.create_network(local_net, sync_key, local_id=local_id)
|
|
|
|
def _sync_updated_nets(self, pvc_nets, local_nets, target):
|
|
db_active_list = self.db.get_networks(constants.STATUS_ACTIVE)
|
|
for db_net in db_active_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_net)
|
|
pvc_net = pvc_nets.get(pvc_id)
|
|
local_net = local_nets.get(local_id)
|
|
result = utils.compare_networks(local_net, pvc_net, db_net, target)
|
|
if result:
|
|
if result == LOCAL_OS:
|
|
self.local.update_network(local_net, pvc_net)
|
|
update_data = utils.gen_network_update_data(pvc_net)
|
|
else:
|
|
self.pvc.update_network(pvc_net, local_net)
|
|
update_data = utils.gen_network_update_data(local_net)
|
|
self.db.set_network_update_data(db_net, update_data)
|
|
|
|
def _sync_deleting_nets(self):
|
|
db_delete_list = self.db.get_networks(constants.STATUS_DELETING)
|
|
for db_net in db_delete_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_net)
|
|
if pvc_id and local_id:
|
|
self.db.fix_incorrect_state(db_net)
|
|
continue
|
|
if pvc_id:
|
|
pvc_ports = self.pvc.get_ports_on_network(pvc_id)
|
|
if len(pvc_ports) > 0:
|
|
LOG.info(_("Ports are still defined on PowerVC network "
|
|
"%s. Network can not be deleted."), pvc_id)
|
|
continue
|
|
self.pvc.delete_network(pvc_id)
|
|
pvc_net = self.pvc.get_network(pvc_id)
|
|
if pvc_net:
|
|
continue
|
|
if local_id:
|
|
local_ports = self.local.get_ports_on_network(local_id)
|
|
if len(local_ports) > 0:
|
|
LOG.info(_("Ports are still defined on local network "
|
|
"%s. Network can not be deleted."), local_id)
|
|
continue
|
|
self.local.delete_network(local_id)
|
|
local_net = self.local.get_network(local_id)
|
|
if local_net:
|
|
continue
|
|
self.db.delete_network(db_net)
|
|
|
|
def _sync_creating_nets(self, pvc_nets, local_nets):
|
|
db_create_list = self.db.get_networks(constants.STATUS_CREATING)
|
|
for db_net in db_create_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_net)
|
|
if pvc_id:
|
|
pvc_net = pvc_nets.get(pvc_id)
|
|
local_net = self.local.create_network(pvc_net)
|
|
if local_net:
|
|
local_id = local_net.get('id')
|
|
self.db.set_network_local_id(db_net, local_id)
|
|
continue
|
|
if local_id:
|
|
local_net = local_nets.get(local_id)
|
|
pvc_net = self.pvc.create_network(local_net)
|
|
if pvc_net:
|
|
pvc_id = pvc_net.get('id')
|
|
self.db.set_network_pvc_id(db_net, pvc_id)
|
|
continue
|
|
|
|
def _synchronize_subnets(self, target=LOCAL_OS):
|
|
pvc_subnets = self.pvc.get_subnets()
|
|
local_subnets = self.local.get_subnets()
|
|
self._sync_deleted_subnets(pvc_subnets, local_subnets)
|
|
self._sync_new_pvc_subnets(pvc_subnets)
|
|
self._sync_new_local_subnets(local_subnets)
|
|
self._sync_updated_subnets(pvc_subnets, local_subnets, target)
|
|
self._sync_deleting_subnets(pvc_subnets, local_subnets)
|
|
self._sync_creating_subnets(pvc_subnets, local_subnets)
|
|
|
|
def _sync_deleted_subnets(self, pvc_subnets, local_subnets):
|
|
db_subnets = self.db.get_subnets()
|
|
for db_sub in db_subnets:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_sub)
|
|
if pvc_id and pvc_id not in pvc_subnets.keys():
|
|
self.db.set_subnet_pvc_id(db_sub, None)
|
|
if local_id and local_id not in local_subnets.keys():
|
|
self.db.set_subnet_local_id(db_sub, None)
|
|
|
|
def _sync_new_pvc_subnets(self, pvc_subnets):
|
|
for pvc_sub in pvc_subnets.values():
|
|
pvc_id = pvc_sub.get('id')
|
|
db_sub = self.db.get_subnet(pvc_id=pvc_id)
|
|
if db_sub:
|
|
# DB entry for this PVC subnet already exists
|
|
continue
|
|
pvc_net_id = pvc_sub.get('network_id')
|
|
db_net = self.db.get_network(pvc_id=pvc_net_id)
|
|
if not db_net:
|
|
# Subnet is associated with a network that is not mapped
|
|
continue
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
# Do not create new subnet if network is being deleted
|
|
continue
|
|
sync_key = utils.gen_subnet_sync_key(pvc_sub, db_net)
|
|
db_sub = self.db.get_subnet(sync_key=sync_key)
|
|
if db_sub:
|
|
self.db.set_subnet_pvc_id(db_sub, pvc_id)
|
|
else:
|
|
self.db.create_subnet(pvc_sub, sync_key, pvc_id=pvc_id)
|
|
|
|
def _sync_new_local_subnets(self, local_subnets):
|
|
for local_sub in local_subnets.values():
|
|
local_id = local_sub.get('id')
|
|
db_sub = self.db.get_subnet(local_id=local_id)
|
|
if db_sub:
|
|
# DB entry for this local subnet already exists
|
|
continue
|
|
local_net_id = local_sub.get('network_id')
|
|
db_net = self.db.get_network(local_id=local_net_id)
|
|
if not db_net:
|
|
# Subnet is associated with a network that is not mapped
|
|
continue
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
# Do not create new subnet if network is being deleted
|
|
continue
|
|
sync_key = utils.gen_subnet_sync_key(local_sub, db_net)
|
|
db_sub = self.db.get_subnet(sync_key=sync_key)
|
|
if db_sub:
|
|
self.db.set_subnet_local_id(db_sub, local_id)
|
|
else:
|
|
self.db.create_subnet(local_sub, sync_key, local_id=local_id)
|
|
|
|
def _sync_updated_subnets(self, pvc_subnets, local_subnets, target):
|
|
db_active_list = self.db.get_subnets(constants.STATUS_ACTIVE)
|
|
for db_sub in db_active_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_sub)
|
|
pvc_sub = pvc_subnets.get(pvc_id)
|
|
local_sub = local_subnets.get(local_id)
|
|
result = utils.compare_subnets(local_sub, pvc_sub, db_sub, target)
|
|
if result:
|
|
if result == LOCAL_OS:
|
|
self.local.update_subnet(local_sub, pvc_sub)
|
|
update_data = utils.gen_subnet_update_data(pvc_sub)
|
|
else:
|
|
self.pvc.update_subnet(pvc_sub, local_sub)
|
|
update_data = utils.gen_subnet_update_data(local_sub)
|
|
self.db.set_subnet_update_data(db_sub, update_data)
|
|
|
|
def _sync_deleting_subnets(self, pvc_subnets, local_subnets):
|
|
db_delete_list = self.db.get_subnets(constants.STATUS_DELETING)
|
|
for db_sub in db_delete_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_sub)
|
|
if pvc_id and local_id:
|
|
self.db.fix_incorrect_state(db_sub)
|
|
continue
|
|
if pvc_id:
|
|
pvc_sub = pvc_subnets.get(pvc_id)
|
|
pvc_net_id = pvc_sub.get('network_id')
|
|
pvc_ports = self.pvc.get_ports_on_subnet(pvc_net_id,
|
|
pvc_id)
|
|
if len(pvc_ports) > 0:
|
|
LOG.info(_("Ports are still defined on PowerVC subnet "
|
|
"%s. Subnet can not be deleted."), pvc_id)
|
|
continue
|
|
self.pvc.delete_subnet(pvc_id)
|
|
pvc_sub = self.pvc.get_subnet(pvc_id)
|
|
if pvc_sub:
|
|
continue
|
|
if local_id:
|
|
local_sub = local_subnets.get(local_id)
|
|
local_net_id = local_sub.get('network_id')
|
|
local_ports = self.local.get_ports_on_subnet(local_net_id,
|
|
local_id)
|
|
if len(local_ports) > 0:
|
|
if (self._ports_valid(local_ports)):
|
|
LOG.info(_("Ports are still defined on local OS"
|
|
" subnet %s. Subnet can not be deleted."),
|
|
local_id)
|
|
continue
|
|
self.local.delete_subnet(local_id)
|
|
local_sub = self.local.get_subnet(local_id)
|
|
if local_sub:
|
|
continue
|
|
self.db.delete_subnet(db_sub)
|
|
|
|
def _sync_creating_subnets(self, pvc_subnets, local_subnets):
|
|
db_create_list = self.db.get_subnets(constants.STATUS_CREATING)
|
|
for db_sub in db_create_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_sub)
|
|
if pvc_id:
|
|
pvc_sub = pvc_subnets.get(pvc_id)
|
|
local_sub = self.local.create_subnet(pvc_sub)
|
|
if local_sub:
|
|
local_id = local_sub.get('id')
|
|
self.db.set_subnet_local_id(db_sub, local_id)
|
|
continue
|
|
if local_id:
|
|
local_sub = local_subnets.get(local_id)
|
|
pvc_sub = self.pvc.create_subnet(local_sub)
|
|
if pvc_sub:
|
|
pvc_id = pvc_sub.get('id')
|
|
self.db.set_subnet_pvc_id(db_sub, pvc_id)
|
|
continue
|
|
|
|
def _synchronize_ports(self, target=LOCAL_OS):
|
|
pvc_ports = self.pvc.get_ports()
|
|
local_ports = self.local.get_ports()
|
|
self._sync_deleted_ports(pvc_ports, local_ports)
|
|
self._sync_new_pvc_ports(pvc_ports)
|
|
self._sync_new_local_ports(local_ports)
|
|
self._sync_updated_ports(pvc_ports, local_ports, target)
|
|
self._sync_deleting_ports()
|
|
self._sync_creating_ports(pvc_ports, local_ports)
|
|
|
|
def _sync_deleted_ports(self, pvc_ports, local_ports):
|
|
db_ports = self.db.get_ports()
|
|
for db_port in db_ports:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_port)
|
|
if pvc_id and pvc_id not in pvc_ports.keys():
|
|
self.db.set_port_pvc_id(db_port, None)
|
|
if local_id and local_id not in local_ports.keys():
|
|
self.db.set_port_local_id(db_port, None)
|
|
|
|
def _sync_new_pvc_ports(self, pvc_ports):
|
|
for pvc_port in pvc_ports.values():
|
|
pvc_id = pvc_port.get('id')
|
|
db_port = self.db.get_port(pvc_id=pvc_id)
|
|
if db_port:
|
|
# DB entry for this PVC port already exists
|
|
continue
|
|
pvc_net_id = pvc_port.get('network_id')
|
|
db_net = self.db.get_network(pvc_id=pvc_net_id)
|
|
if not db_net:
|
|
# Port is associated with a network that is not mapped
|
|
continue
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
# Do not create new port if network is being deleted
|
|
continue
|
|
valid_subnet = False
|
|
subnet_ids = utils.extract_subnets_from_port(pvc_port)
|
|
for pvc_sub_id in subnet_ids:
|
|
db_sub = self.db.get_subnet(pvc_id=pvc_sub_id)
|
|
if db_sub:
|
|
valid_subnet = True
|
|
break
|
|
if not valid_subnet:
|
|
LOG.info(_("Unable to map PowerVC port %s. The subnet %s "
|
|
"is not mapped."), pvc_id, subnet_ids)
|
|
continue
|
|
sync_key = utils.gen_port_sync_key(pvc_port, db_net)
|
|
db_port = self.db.get_port(sync_key=sync_key)
|
|
if db_port:
|
|
self.db.set_port_pvc_id(db_port, pvc_id)
|
|
else:
|
|
self.db.create_port(pvc_port, sync_key, pvc_id=pvc_id)
|
|
|
|
def _sync_new_local_ports(self, local_ports):
|
|
for local_port in local_ports.values():
|
|
local_id = local_port.get('id')
|
|
db_port = self.db.get_port(local_id=local_id)
|
|
if db_port:
|
|
# DB entry for this local port already exists
|
|
continue
|
|
local_net_id = local_port.get('network_id')
|
|
db_net = self.db.get_network(local_id=local_net_id)
|
|
if not db_net:
|
|
# Port is associated with a network that is not mapped
|
|
continue
|
|
if db_net.get('status') == constants.STATUS_DELETING:
|
|
# Do not create new port if network is being deleted
|
|
continue
|
|
if not db_net.get('pvc_id'):
|
|
# The PowerVC network no longer exists
|
|
continue
|
|
valid_subnet = False
|
|
subnet_ids = utils.extract_subnets_from_port(local_port)
|
|
for local_sub_id in subnet_ids:
|
|
db_sub = self.db.get_subnet(local_id=local_sub_id)
|
|
if db_sub:
|
|
valid_subnet = True
|
|
break
|
|
if not valid_subnet:
|
|
LOG.info(_("Unable to map local port %s. The subnet %s "
|
|
"is not mapped."), local_id, subnet_ids)
|
|
continue
|
|
sync_key = utils.gen_port_sync_key(local_port, db_net)
|
|
db_port = self.db.get_port(sync_key=sync_key)
|
|
if db_port:
|
|
self.db.set_port_local_id(db_port, local_id)
|
|
else:
|
|
self.db.create_port(local_port, sync_key, local_id=local_id)
|
|
|
|
def _sync_updated_ports(self, pvc_ports, local_ports, target):
|
|
db_active_list = self.db.get_ports(constants.STATUS_ACTIVE)
|
|
vm_map = None
|
|
for db_port in db_active_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_port)
|
|
pvc_port = pvc_ports.get(pvc_id)
|
|
local_port = local_ports.get(local_id)
|
|
if not pvc_port or not local_port:
|
|
continue
|
|
# Fix up device id in local port (if necessary)
|
|
pvc_device = pvc_port.get('device_id')
|
|
local_device = local_port.get('device_id')
|
|
if (not local_device or len(local_device) == 0 or
|
|
local_device.startswith(constants.RSVD_PORT_PREFIX)):
|
|
if pvc_device and len(pvc_device) > 0:
|
|
if vm_map is None:
|
|
LOG.info(_("Retrieving PowerVC to local VM mappings"))
|
|
vm_map = self.local.get_power_vm_mapping()
|
|
if pvc_device in vm_map:
|
|
local_device_id = vm_map[pvc_device]
|
|
LOG.info(_("Update local port %s with device id %s"),
|
|
local_id, local_device_id)
|
|
self.local.set_port_device_id(local_port,
|
|
local_device_id)
|
|
else:
|
|
LOG.info(_("Unable to update local port %s. Local "
|
|
"instance for PowerVC %s can not be found"),
|
|
local_id, pvc_device)
|
|
# Do any of the other fields in the ports need to be updated
|
|
result = utils.compare_ports(local_port, pvc_port, db_port, target)
|
|
if result:
|
|
if result == LOCAL_OS:
|
|
self.local.update_port(local_port, pvc_port)
|
|
update_data = utils.gen_port_update_data(pvc_port)
|
|
else:
|
|
self.pvc.update_port(pvc_port, local_port)
|
|
update_data = utils.gen_port_update_data(local_port)
|
|
self.db.set_port_update_data(db_port, update_data)
|
|
|
|
def _sync_deleting_ports(self):
|
|
db_delete_list = self.db.get_ports(constants.STATUS_DELETING)
|
|
for db_port in db_delete_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_port)
|
|
if pvc_id and local_id:
|
|
self.db.fix_incorrect_state(db_port)
|
|
continue
|
|
if pvc_id:
|
|
pvc_port = self.pvc.get_port(pvc_id)
|
|
if not pvc_port:
|
|
self.db.delete_port(db_port)
|
|
continue
|
|
device_id = pvc_port.get('device_id')
|
|
if device_id and len(device_id) > 0:
|
|
LOG.info(_("PVC port %s can not be deleted. Port is "
|
|
"in-use by VM %s."), pvc_id, device_id)
|
|
LOG.info(_("Recreate the local port to prevent this IP "
|
|
"address from being used by another instance."))
|
|
new_port = self.local.create_port(pvc_port)
|
|
if new_port:
|
|
# Update the database entry with new port uuid
|
|
self.db.set_port_local_id(db_port, new_port.get('id'))
|
|
continue
|
|
self.pvc.delete_port(pvc_id)
|
|
pvc_port = self.pvc.get_port(pvc_id)
|
|
if pvc_port:
|
|
continue
|
|
self.db.delete_port(db_port)
|
|
if local_id:
|
|
local_port = self.local.get_port(local_id)
|
|
if not local_port:
|
|
self.db.delete_port(db_port)
|
|
continue
|
|
self._delete_local_port(local_port, db_port)
|
|
continue
|
|
|
|
def _sync_creating_ports(self, pvc_ports, local_ports):
|
|
db_create_list = self.db.get_ports(constants.STATUS_CREATING)
|
|
for db_port in db_create_list:
|
|
pvc_id, local_id = utils.extract_ids_from_entry(db_port)
|
|
if pvc_id:
|
|
pvc_port = pvc_ports.get(pvc_id)
|
|
local_port = self.local.create_port(pvc_port)
|
|
if local_port:
|
|
local_id = local_port.get('id')
|
|
self.db.set_port_local_id(db_port, local_id)
|
|
continue
|
|
if local_id:
|
|
local_port = local_ports.get(local_id)
|
|
# Determine which instance owns this port
|
|
device_id = local_port.get('device_id')
|
|
if not self.local.is_instance_on_power(device_id):
|
|
# Create a port on PVC if this is a local instance,
|
|
# so PVC won't use its IP address.
|
|
pvc_port = self.pvc.create_port(local_port)
|
|
if pvc_port:
|
|
pvc_id = pvc_port.get('id')
|
|
self.db.set_port_pvc_id(db_port, pvc_id)
|
|
continue
|
|
|
|
def set_device_id_on_port_by_pvc_instance_uuid(self,
|
|
db_api,
|
|
device_id,
|
|
pvc_ins_uuid):
|
|
"""
|
|
Query the ports by pvc instance uuid, and set its
|
|
local instance id(device_id).
|
|
"""
|
|
local_ids = []
|
|
pvc_ports = self.pvc.get_ports_by_instance_uuid(pvc_ins_uuid)
|
|
if pvc_ports and len(pvc_ports) > 0:
|
|
for pvc_port in pvc_ports:
|
|
pvc_id = pvc_port.get('id')
|
|
# Can't use self.db because of thread sync. issue,
|
|
# so passed in one from the caller.
|
|
db_port = db_api.get_port(pvc_id=pvc_id)
|
|
if not db_port:
|
|
LOG.debug(_("No db_port found: %s"), pvc_id)
|
|
continue
|
|
local_id = db_port.get('local_id')
|
|
if not local_id:
|
|
LOG.debug(_("No local_port_id found: %s"), pvc_id)
|
|
continue
|
|
local_port = self.local.get_port(local_id)
|
|
if not local_port:
|
|
LOG.debug(_("No local_port found: %s"), pvc_id)
|
|
continue
|
|
self.local.set_port_device_id(local_port, device_id)
|
|
local_ids.append(local_id)
|
|
LOG.debug(_("Set device_id for %s with %s"), pvc_id, device_id)
|
|
return local_ids
|
|
|
|
def _process_event_queue(self):
|
|
"""
|
|
Main loop for the agent
|
|
"""
|
|
while not self.end_thread:
|
|
try:
|
|
# Perform a full synchronization of all neutron objects
|
|
self._synchronize()
|
|
except Exception as e:
|
|
LOG.exception(_("Error during synchronize: %s"), e)
|
|
# We don't want to kill the agent on a sync-error. Continue
|
|
# running and retry the operation when the polling interval
|
|
# wait_time time has elapsed.
|
|
self.retry_sync = time.time() + self.polling_interval
|
|
pass
|
|
|
|
# Process events while waiting the polling interval
|
|
while (time.time() < self.retry_sync or not self.event_q.empty()):
|
|
event = None
|
|
try:
|
|
wait = self.retry_sync - time.time()
|
|
if wait <= 0:
|
|
wait = 1
|
|
event = self.event_q.get(True, wait)
|
|
except Queue.Empty:
|
|
LOG.info(_("No events posted"))
|
|
pass
|
|
except Exception as e:
|
|
LOG.exception(_("Error while waiting for event: %s"), e)
|
|
return
|
|
if self.end_thread:
|
|
LOG.info(_("Event thread signaled to end"))
|
|
return
|
|
if event:
|
|
try:
|
|
self.event_q.task_done()
|
|
LOG.info(_("Event received: %s"), event)
|
|
self._handle_event(event)
|
|
except Exception as e:
|
|
LOG.exception(_("Error handling event: %s"), e)
|
|
# We don't want to kill the agent if an error occurs
|
|
# handling an event
|
|
pass
|
|
|
|
def daemon_loop(self):
|
|
# Start a thread here to process the event queue. If the event queue
|
|
# is called from the main thread, incoming RPC requests are delayed
|
|
# until the full sync is done. We could have dropped the event queue
|
|
# wait time and added a small sleep() to the Queue.Empty exception,
|
|
# but this would cause RPC events to be delayed until this occurs.
|
|
t = threading.Thread(target=self._process_event_queue)
|
|
t.setDaemon(True)
|
|
t.start()
|
|
|
|
# While the worker thread is alive, sleep
|
|
while t.isAlive():
|
|
try:
|
|
time.sleep(self.polling_interval)
|
|
except KeyboardInterrupt:
|
|
LOG.info(_("Waiting for worker thread to end"))
|
|
self.end_thread = True
|
|
event = {}
|
|
event[constants.EVENT_TYPE] = constants.EVENT_END_THREAD
|
|
self.event_q.put(event)
|
|
t.join(self.polling_interval)
|
|
LOG.info(_("Worker thread is dead. Exiting"))
|
|
|
|
|
|
def main():
|
|
try:
|
|
LOG.info(_("-" * 80))
|
|
LOG.info(_("Agent initializing... "))
|
|
agent = PowerVCNeutronAgent()
|
|
|
|
# Start everything.
|
|
LOG.info(_("Agent running... "))
|
|
agent.daemon_loop()
|
|
|
|
except Exception as e:
|
|
LOG.exception(_("Exception occurred in agent: %s"), e)
|
|
|
|
finally:
|
|
# Use hard exit here so that QPID threads will be killed
|
|
LOG.info(_("Agent exiting..."))
|
|
os._exit(os.EX_OK)
|