diff --git a/shipyard_airflow/dags/samples/openstack_api_call.py b/shipyard_airflow/dags/samples/openstack_api_call.py index 8a936771..e9a4ab2b 100644 --- a/shipyard_airflow/dags/samples/openstack_api_call.py +++ b/shipyard_airflow/dags/samples/openstack_api_call.py @@ -17,13 +17,13 @@ import airflow from airflow import DAG from airflow.operators import OpenStackOperator -from airflow.operators.bash_operator import BashOperator from datetime import timedelta + default_args = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': airflow.utils.dates.days_ago(2), + 'start_date': airflow.utils.dates.days_ago(1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, @@ -33,41 +33,33 @@ default_args = { dag = DAG('openstack_cli', default_args=default_args, schedule_interval=None) -# print_date -t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag) +# Location of shiyard.conf +config_path = '/usr/local/airflow/plugins/shipyard.conf' -# Note that the openrc.sh file needs to be placed on a volume that can be -# accessed by the containers +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers # openstack endpoint list -t2 = OpenStackOperator( +t1 = OpenStackOperator( task_id='endpoint_list_task', - openrc_file='/usr/local/airflow/dags/openrc.sh', + shipyard_conf=config_path, openstack_command=['openstack', 'endpoint', 'list'], dag=dag) # openstack service list -t3 = OpenStackOperator( +t2 = OpenStackOperator( task_id='service_list_task', - openrc_file='/usr/local/airflow/dags/openrc.sh', + shipyard_conf=config_path, openstack_command=['openstack', 'service', 'list'], dag=dag) # openstack server list -t4 = OpenStackOperator( +t3 = OpenStackOperator( task_id='server_list_task', - openrc_file='/usr/local/airflow/dags/openrc.sh', + shipyard_conf=config_path, openstack_command=['openstack', 'server', 'list'], dag=dag) -# openstack network list -t5 = OpenStackOperator( - task_id='network_list_task', - openrc_file='/usr/local/airflow/dags/openrc.sh', - openstack_command=['openstack', 'network', 'list'], - dag=dag) t2.set_upstream(t1) -t3.set_upstream(t1) -t4.set_upstream(t1) -t5.set_upstream(t1) +t3.set_upstream(t2) diff --git a/shipyard_airflow/plugins/openstack_operators.py b/shipyard_airflow/plugins/openstack_operators.py index caea688c..93df3bc4 100644 --- a/shipyard_airflow/plugins/openstack_operators.py +++ b/shipyard_airflow/plugins/openstack_operators.py @@ -14,6 +14,8 @@ import logging import subprocess +import os +import configparser from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -24,36 +26,37 @@ from airflow.utils.decorators import apply_defaults class OpenStackOperator(BaseOperator): """ Performs OpenStack CLI calls - :openrc_file: Path of the openrc file + :shipyard_conf: Location of shipyard.conf :openstack_command: The OpenStack command to be executed """ - @apply_defaults def __init__(self, - openrc_file, + shipyard_conf, openstack_command=None, xcom_push=False, - *args, - **kwargs): + *args, **kwargs): super(OpenStackOperator, self).__init__(*args, **kwargs) - self.openrc_file = openrc_file + self.shipyard_conf = shipyard_conf self.openstack_command = openstack_command self.xcom_push_flag = xcom_push def execute(self, context): logging.info("Running OpenStack Command: %s", self.openstack_command) - # Emulate "source" in bash. Sets up environment variables. - pipe = subprocess.Popen( - ". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True) - data = pipe.communicate()[0] - os_env = dict((line.split("=", 1) for line in data.splitlines())) + # Read and parse shiyard.conf + config = configparser.ConfigParser() + config.read(self.shipyard_conf) + + # Construct Envrionment variables + for attr in ('OS_AUTH_URL', 'OS_PROJECT_ID', 'OS_PROJECT_NAME', + 'OS_USER_DOMAIN_NAME', 'OS_USERNAME', 'OS_PASSWORD', + 'OS_REGION_NAME', 'OS_IDENTITY_API_VERSION'): + os.environ[attr] = config.get('keystone', attr) # Execute the OpenStack CLI Command openstack_cli = subprocess.Popen( self.openstack_command, - env=os_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -65,7 +68,7 @@ class OpenStackOperator(BaseOperator): line = line.strip() logging.info(line) - # Wait for child process to terminate. + # Wait for child process to terminate # Set and return returncode attribute. openstack_cli.wait() logging.info("Command exited with " @@ -74,11 +77,6 @@ class OpenStackOperator(BaseOperator): # Raise Execptions if OpenStack Command Fails if openstack_cli.returncode: raise AirflowException("OpenStack Command Failed") - """ - Push response to an XCom if xcom_push is True - """ - if self.xcom_push_flag: - return line class OpenStackCliPlugin(AirflowPlugin):