Create a DAG for scheduling a data pipeline on the Zetaris Lightning UI


  • The pipeline should be created in the environment whose API we are using.
    Image: Data Pipeline on Zetaris UI
  • The IP address of the Ubuntu box should have access to the API VM.
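Before creating the DAG, it can help to confirm the network path from the Ubuntu box to the API VM. A minimal sketch of such a check, where `<api-vm-ip>` and `<port>` are placeholders for your environment:

```shell
# <api-vm-ip> and <port> are placeholders - substitute your API VM's address.
# Any HTTP status code back (even 401/404) means the network path is open;
# a timeout or resolve failure means firewall/whitelisting needs attention.
curl -s -o /dev/null -w "%{http_code}\n" --connect-timeout 5 "http://<api-vm-ip>:<port>/" \
  || echo "API VM not reachable - check firewall / IP whitelisting"
```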

  1. Create a DAG Python file to tell Airflow that we want to schedule the pipeline. The DAG connects to the pipeline through the Zetaris API.
    cd /home/zetaris/airflow/dags

    Create the DAG file in this directory (the file name below is a placeholder; use your own), then make it executable:
    chmod 775 <dag-file>.py
    The content of the file is pasted below:
    • Please ensure the URL is updated to reflect your API environment
    #step1 - Importing Modules
    from airflow import DAG
    from datetime import datetime, timedelta
    from airflow.operators.bash import BashOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.operators.python import PythonOperator
    from airflow.providers.ssh.operators.ssh import SSHOperator
    from airflow.contrib.hooks.ssh_hook import SSHHook
    import requests
    import json
    from getpass import getpass

    def get_encrypted_pass():
        # <password> is a placeholder. The password is used after decrypting
        # the encrypted password and is never sent as plain text.
        password = '<password>'
        obj = {'password': password}
        # '<api-url>' is a placeholder - update it to your API environment
        response = requests.get('<api-url>', params=obj)
        return response.text

    def get_bearer_token():
        # <email> is a placeholder. The email is used after decrypting the
        # encrypted text and is never sent as plain text.
        username = '<email>'
        # Field names in the request body are assumptions based on the calls above
        obj = {'username': username, 'password': get_encrypted_pass()}
        response ='<api-url>', data=obj)
        response = json.loads(response.text)
        return response['bearer_token']

    bearer_token = get_bearer_token()

    def runQuery(query: str, r=False):
        auth = {
            "Authorization": "Bearer " + bearer_token
        }
        obj = {'query': query}   # field name assumed
        response ='<api-url>', data=obj, headers=auth)
        response = json.loads(response.text)
        if response.get('data'):
            # Close the query on the server once the result set has arrived
            close_obj = {
                "close_query": "true"
            }
  '<api-url>', data=close_obj, headers=auth)
            if r:
                data = response['data']['data']
                return data

    def SelectQuery():
        select_query = "select * from crash.test1"   #Select call to the pipeline
        return runQuery(select_query, True)

    #step2 - Define Default Arguments
    # Example values - adjust the owner, retries and retry delay for your environment
    default_args = {
        'owner': 'zetaris',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    #step3 - Instantiate the DAG
    # The DAG id, start date and schedule below are example values
    dag = DAG('zetaris_pipeline',
              default_args=default_args,
              start_date=datetime(2023, 1, 1),
              schedule_interval='@daily')

    #step4 - Define Tasks
    #start = DummyOperator(task_id='start', dag=dag)
    t1 = PythonOperator(task_id="main_etl",
                        python_callable=SelectQuery,
                        dag=dag)

    #step5 - Define Dependencies
    #start >> end
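    To make the response handling in runQuery concrete, here is how the nested ['data']['data'] lookup behaves on a sample payload. The JSON shape is an assumption inferred from the code above, not the documented Zetaris response format:

```python
import json

# Assumed shape of a successful query response, inferred from runQuery()
sample = '{"data": {"data": [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]]}}'

response = json.loads(sample)
if response.get('data'):          # same guard runQuery() uses
    rows = response['data']['data']
    print(rows)                   # the raw result rows as a list of lists
```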
  2. If you wish to keep the webserver and the scheduler running overnight, use the commands below to start them with nohup so they survive the end of the terminal session.
    nohup airflow webserver &
    <Press Enter Key>
    nohup airflow scheduler &
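    A sketch of how the detached processes can be checked afterwards, assuming a standard Airflow install and that you started them from the directory where nohup wrote its output:

```shell
# nohup appends each daemon's output to nohup.out in the launch directory
# unless redirected; pgrep confirms the processes survived the session.
pgrep -f "airflow webserver" || echo "webserver not running"
pgrep -f "airflow scheduler" || echo "scheduler not running"
tail -n 5 nohup.out 2>/dev/null || true   # recent daemon output, if the file exists
```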