Create a DAG to schedule a data pipeline built on the Zetaris Lightning UI

 Prerequisites:

  • The pipeline must already be created in the environment whose API you are calling.
    Image: Data Pipeline on Zetaris UI
  • The Ubuntu box running Airflow must be able to reach the API VM, i.e. its IP address must be allowed through to the API endpoint. A quick connectivity check is sketched below.
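    The snippet below is a minimal sketch (not part of the DAG) for confirming from the Ubuntu box that the API endpoint is reachable. The endpoint shown is the trial URL used later in this guide and should be replaced with your own; the file name connectivity_check.py is only an example.

    # connectivity_check.py - quick sanity check that the API endpoint is reachable
    import requests

    API_BASE = 'https://apiservice.trial.enterprise.zetaris.com:8443'  # replace with your API environment

    try:
        # Any HTTP response (even an error status) proves network connectivity;
        # only a connection error / timeout indicates the box cannot reach the API VM.
        response = requests.post(url=API_BASE + '/enc', params={'target': 'connectivity-test'}, timeout=10)
        print('API reachable, HTTP status:', response.status_code)
    except requests.exceptions.RequestException as exc:
        print('Cannot reach the API endpoint:', exc)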

  1. Create a DAG Python file so that Airflow knows about the pipeline you want to schedule. The DAG connects to the pipeline through the Zetaris API.
    cd /home/zetaris/airflow/dags

    touch dag_2.py

    chmod 775 dag_2.py
    The content of the dag_2.py file is shown below:
    • Please ensure the URLs are updated to reflect your API environment; a quick standalone test of the file is sketched after the listing.
    # step1 - Import modules
    from airflow import DAG
    from datetime import datetime, timedelta
    from airflow.operators.bash import BashOperator               # not used in this DAG, kept for optional extensions
    from airflow.operators.email import EmailOperator             # not used in this DAG, kept for optional extensions
    from airflow.operators.python import PythonOperator
    from airflow.providers.ssh.operators.ssh import SSHOperator   # not used in this DAG, kept for optional extensions
    from airflow.providers.ssh.hooks.ssh import SSHHook           # not used in this DAG, kept for optional extensions
    import requests
    import json

    def get_encrypted_pass():
        # Replace <password> with your password. Only the encrypted value returned by the
        # /enc endpoint is used for login; the plain-text password is never sent to /login.
        password = "<password>"
        obj = {
            'target': password
        }
        response = requests.post(url='https://apiservice.trial.enterprise.zetaris.com:8443/enc', params=obj)
        return response.text

    def get_bearer_token():
        # Replace <email> with the email address of your Zetaris account.
        encrypted_password = get_encrypted_pass()
        username = "<email>"
        obj = {
            "password": encrypted_password,
            "username": username
        }
        response = requests.post(url='https://apiservice.trial.enterprise.zetaris.com:8443/login', data=obj)
        response = json.loads(response.text)
        return response['bearer_token']

    # The bearer token is fetched when Airflow parses the DAG file.
    bearer_token = get_bearer_token()

    def runQuery(query: str, r=False):
        # Submit a SQL query to the Zetaris API, close the query afterwards and,
        # if r is True, return the result rows.
        print(query + "\n")
        obj = {
            "sql": query,
        }
        auth = {
            "Authorization": "Bearer " + bearer_token
        }

        response = requests.post(url='https://apiservice.trial.enterprise.zetaris.com/query', data=obj, headers=auth)
        print(response.text + "\n---------------------------------------------------\n")
        response = json.loads(response.text)
        if response.get('data'):
            querytoken = response['data']['query_token']
            close_obj = {
                "query_token": querytoken,
                "close_query": "true"
            }
            requests.post(url='https://apiservice.trial.enterprise.zetaris.com/closequery', data=close_obj, headers=auth)

        if r:
            data = response['data']['data']
            return data

    def SelectQuery():
        # Select call to the pipeline
        select_query = "select * from crash.test1"
        return runQuery(select_query, True)

    # step2 - Define default arguments
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2022, 3, 11),
        'retries': 0
    }

    # step3 - Instantiate the DAG
    dag = DAG(dag_id='DAG-2', default_args=default_args, catchup=False, schedule_interval='@once')

    # step4 - Define tasks
    # start = DummyOperator(task_id='start', dag=dag)
    # end = DummyOperator(task_id='end', dag=dag)
    t1 = PythonOperator(task_id="main_etl",
                        python_callable=SelectQuery,
                        dag=dag)

    # step5 - Define dependencies
    # start >> end
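    Before scheduling the DAG, the API calls can be exercised standalone. The sketch below assumes the file above is saved as dag_2.py in /home/zetaris/airflow/dags, that the password and email placeholders have been filled in, and that it is run from that directory; the file name test_dag_2.py is only an example. Importing dag_2 triggers the login call at module level, so valid credentials and network access are required.

    # test_dag_2.py - standalone sanity check of the API calls used by the DAG
    from dag_2 import SelectQuery

    rows = SelectQuery()   # runs the select against the pipeline via the Zetaris API
    print(rows)            # prints the returned rows (or None if no data came back)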
  2. If you wish to keep the webserver and the scheduler running unattended (for example overnight), start them in the background with the commands below.
    nohup airflow webserver &
    <Press Enter Key>
    nohup airflow scheduler &