Create a DAG for scheduling data pipelines and scripts on a remote server using SSH (with Outlook Email and Microsoft Teams Notifications)

 Prerequisites:

  1. The IP address of the source should be allowed to SSH into the remote server.

  2. The Zetaris API connection should be up and running.

  3. Port 587 should be open on the Airflow Ubuntu box for SMTP.
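
The network prerequisites above can be checked from the Airflow box before building the DAG. The following is a minimal sketch using only the Python standard library; the helper name port_is_open is made up for this example, and port 22 for SSH is an assumption (use whatever port your remote server listens on).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS lookup failures.
        return False
```

For example, port_is_open("<remote_server_ip>", 22) checks SSH reachability and port_is_open("smtp.googlemail.com", 587) checks the SMTP port. A successful TCP connection only shows that the port is reachable; it does not confirm that authentication will succeed.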

Steps:

  1. Create a DAG Python file that tells Airflow which scripts to schedule
    cd /home/zetaris/airflow/dags
    touch dag_1.py
    chmod 775 dag_1.py

    The content inside the dag_1.py file is pasted below:

    The schedule interval used here is ‘@daily’, but you can change it as per your requirements.

    #step1 - Importing Modules
    from airflow import DAG
    from datetime import datetime, timedelta
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from airflow.providers.ssh.operators.ssh import SSHOperator
    from airflow.providers.ssh.hooks.ssh import SSHHook
    from airflow.operators.email import EmailOperator
    from ms_teams_webhook_operator import MSTeamsWebhookOperator
    import logging

    def on_failure(context):

        dag_id = context['dag_run'].dag_id

        task_id = context['task_instance'].task_id
        context['task_instance'].xcom_push(key=dag_id, value=True)

        logs_url = "http://<ubuntu_public_ip>:8080/log?dag_id={}&task_id={}&execution_date={}".format(
             dag_id, task_id, context['ts'])

        teams_notification = MSTeamsWebhookOperator(
            task_id="msteams_notify_failure", trigger_rule="all_done",
            message="`{}` has failed on task: `{}`".format(dag_id, task_id),
            button_text="View log", button_url=logs_url,
            theme_color="FF0000", http_conn_id='msteams_webhook_url')
        teams_notification.execute(context)

    #step2 - Define Default Arguments
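    default_args={
        'owner':'airflow',
        'depends_on_past':False,
        'start_date':datetime(2022,4,11),
        'retries':0,
        'email': [<list_of_emails>],
        'email_on_failure': True,
        # Success emails are sent by the EmailOperator tasks defined below;
        # 'email_on_success' is not an argument that Airflow operators accept.
        'on_failure_callback': on_failure
    }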

    #step3 - Instantiate the DAG
    dag=DAG(dag_id='DAG-1',default_args=default_args,catchup=False,schedule_interval='@daily')

    #step4 - Define Tasks
    #start=DummyOperator(task_id='start',dag = dag)
    #end=DummyOperator(task_id='end',dag=dag)
    #t1 = BashOperator(
    #    task_id='testairflow',
    #    bash_command='python3 /home/zetaris/airflow/dags/testpy.py',
    #    dag=dag)

    # 'ssh_dq' is the ID of the SSH connection added on the Airflow UI
    sshHook = SSHHook(ssh_conn_id='ssh_dq')
    python_command_1 = "python3.9 /home/zetaris/refresh_script/dq_business_rules.py"
    python_command_2 = "python3.9 /home/zetaris/refresh_script/dq_exception_file-bkp.py"

    t2 = SSHOperator(
        ssh_hook=sshHook,
        task_id='dq_business_rules_script',
        command=python_command_1,
        dag=dag)

    t3 = SSHOperator(
        ssh_hook=sshHook,
        task_id='dq_exception_script',
        command=python_command_2,
        dag=dag)

    t2_email = EmailOperator(
            task_id='send_t2_email',
            to=[<list_of_emails>],
            subject='DQ Execution Update',
            html_content=""" <h3>DQ Business Rules Script Email Notification - Success</h3> """,
            dag=dag
    )

    t3_email = EmailOperator(
            task_id='send_t3_email',
            to=[<list_of_emails>],
            subject='DQ Execution Update',
            html_content=""" <h3>DQ Exception Script Email Notification - Success</h3> """,
            dag=dag
    )

    #step5 - Define Dependencies
    t2 >> t2_email >> t3 >> t3_email

     Image: Dependencies
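
The on_failure callback above places context['ts'] directly into the log URL. A timestamp such as 2022-04-11T00:00:00+00:00 contains ":" and "+" characters, and a "+" inside a query string is usually decoded as a space. The sketch below shows one way to percent-encode the query values with the standard library. The function names and the localhost base URL are placeholders for illustration, and the /log path with its three parameters is taken from the callback as written.

```python
from urllib.parse import urlencode


def build_log_url(base_url: str, dag_id: str, task_id: str, execution_ts: str) -> str:
    """Build a link to a task log, percent-encoding every query value."""
    query = urlencode({
        "dag_id": dag_id,
        "task_id": task_id,
        "execution_date": execution_ts,
    })
    return f"{base_url.rstrip('/')}/log?{query}"


def build_failure_message(dag_id: str, task_id: str) -> str:
    """Format the Teams message text the same way the callback does."""
    return "`{}` has failed on task: `{}`".format(dag_id, task_id)


# Hypothetical values standing in for context['dag_run'].dag_id,
# context['task_instance'].task_id and context['ts'].
url = build_log_url("http://localhost:8080", "DAG-1",
                    "dq_business_rules_script", "2022-04-11T00:00:00+00:00")
print(url)
# http://localhost:8080/log?dag_id=DAG-1&task_id=dq_business_rules_script&execution_date=2022-04-11T00%3A00%3A00%2B00%3A00
```

Inside the callback, the result of build_log_url could be passed as button_url in place of the string built with format.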
  2. Add the SSH connection on the Airflow UI (the DAG above refers to it by the connection ID ssh_dq)
    1. On the UI, click on Admin and select Connections from the dropdown list
    2. Click on Add and enter the respective parameters as shown in the image below

  3. Setting up Outlook Email notifications
    1. Create an app password for your Google account by visiting the following link: https://myaccount.google.com/apppasswords
    2. Click Select app and choose the app you are using
    3. Click Select device and choose the device you’re using
    4. Select Generate. An app password (a 16-character code) is generated.
    5. Copy the password first, then select Done.
  4. Change the SMTP configurations in airflow.cfg file:
    cd /home/zetaris/airflow

    vi airflow.cfg (change the below parameters)

    [email]
    email_backend = airflow.utils.email.send_email_smtp

    [smtp]
    smtp_host = smtp.googlemail.com
    smtp_starttls = True
    smtp_ssl = False
    smtp_user = <YOUR_GOOGLE_EMAIL_ADDRESS>
    smtp_password = <16_CHARACTER_APP_PASSWORD>
    smtp_port = 587
    smtp_mail_from = <YOUR_GOOGLE_EMAIL_ADDRESS>
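
A typo in the [smtp] section usually shows up only when the first email fails to send, so it can help to check the settings right after editing them. The sketch below parses the section with Python's configparser. The function name check_smtp_section and the sample values are made up for this example, and the rules it applies (STARTTLS and SSL not both enabled, STARTTLS on port 587) mirror the settings shown above rather than anything Airflow itself enforces.

```python
import configparser

# Sample text standing in for the [smtp] section of airflow.cfg;
# the address and password are placeholders, not real credentials.
SAMPLE_CFG = """
[smtp]
smtp_host = smtp.googlemail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = someone@example.com
smtp_password = abcdefghijklmnop
smtp_port = 587
smtp_mail_from = someone@example.com
"""


def check_smtp_section(cfg_text: str) -> list:
    """Return a list of human-readable problems found in the [smtp] section."""
    # interpolation=None keeps '%' characters in values from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        return ["missing [smtp] section"]
    smtp = parser["smtp"]
    problems = []
    starttls = smtp.getboolean("smtp_starttls", fallback=False)
    ssl = smtp.getboolean("smtp_ssl", fallback=False)
    port = smtp.getint("smtp_port", fallback=0)
    if starttls and ssl:
        problems.append("smtp_starttls and smtp_ssl should not both be True")
    if starttls and port != 587:
        problems.append("STARTTLS is normally used on port 587, found %d" % port)
    for key in ("smtp_host", "smtp_user", "smtp_password", "smtp_mail_from"):
        if not smtp.get(key, fallback="").strip():
            problems.append("%s is empty" % key)
    return problems


print(check_smtp_section(SAMPLE_CFG))  # prints [] when nothing looks wrong
```

To check the real file, read /home/zetaris/airflow/airflow.cfg into a string and pass it to check_smtp_section.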
  5. Add the ‘.pem’ key of the remote server on the source box
    cd /home/zetaris/airflow/
    Paste the '.pem' key here (in our case: azure_rhel_test_box.pem)
    chmod 600 <key_name>.pem
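
SSH clients generally refuse a private key that other users can read, which is why the key is set to mode 600 above. The following sketch checks the mode from Python; the helper name key_permissions_ok is made up for this example, and it assumes a POSIX file system.

```python
import os
import stat


def key_permissions_ok(path: str) -> bool:
    """Return True if the file grants no access to group or others (e.g. mode 600)."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0
```

For example, key_permissions_ok("/home/zetaris/airflow/<key_name>.pem") should return True after the chmod 600 command has been run.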
  6. Setting up Failure Notifications on Microsoft Teams
    1. Configure a webhook in Microsoft Teams as described in the steps below
    2. Open Teams and browse to the channel you would like messages to be sent to
    3. Click the ••• (More options) icon beside the channel name, then select Connectors
    4. Search for Incoming Webhook.
    5. Click Configure, provide a webhook name and select Create. A Webhook URL will be generated.
    Image: Setting up Webhook on MS Teams
  7. Create two external scripts for generating alerts on Microsoft Teams
    cd /home/zetaris/airflow
    touch ms_teams_webhook_hook.py
    chmod 775 ms_teams_webhook_hook.py
    touch ms_teams_webhook_operator.py
    chmod 775 ms_teams_webhook_operator.py

    The content inside the ms_teams_webhook_hook.py file is pasted below:

    # -*- coding: utf-8 -*-
    #
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    #
    from airflow.providers.http.hooks.http import HttpHook
    from airflow.exceptions import AirflowException


    class MSTeamsWebhookHook(HttpHook):
        """
        This hook allows you to post messages to MS Teams using the Incoming Webhook connector.
        Takes both MS Teams webhook token directly and connection that has MS Teams webhook token.
        If both supplied, the webhook token will be appended to the host in the connection.
        :param http_conn_id: connection that has MS Teams webhook URL
        :type http_conn_id: str
        :param webhook_token: MS Teams webhook token
        :type webhook_token: str
        :param message: The message you want to send on MS Teams
        :type message: str
        :param subtitle: The subtitle of the message to send
        :type subtitle: str
        :param button_text: The text of the action button
        :type button_text: str
        :param button_url: The URL for the action button click
        :type button_url : str
        :param theme_color: Hex code of the card theme, without the #
        :type theme_color: str
        :param proxy: Proxy to use when making the webhook request
        :type proxy: str
        """
        def __init__(self,
                     http_conn_id=None,
                     webhook_token=None,
                     message="",
                     subtitle="",
                     button_text="",
                     button_url="",
                     theme_color="00FF00",
                     proxy=None,
                     *args,
                     **kwargs
                     ):
            super(MSTeamsWebhookHook, self).__init__(*args, **kwargs)
            self.http_conn_id = http_conn_id
            self.webhook_token = self.get_token(webhook_token, http_conn_id)
            self.message = message
            self.subtitle = subtitle
            self.button_text = button_text
            self.button_url = button_url
            self.theme_color = theme_color
            self.proxy = proxy

        def get_proxy(self, http_conn_id):
            conn = self.get_connection(http_conn_id)
            extra = conn.extra_dejson
            print(extra)
            return extra.get("proxy", '')

        def get_token(self, token, http_conn_id):
            """
            Given either a manually set token or a conn_id, return the webhook_token to use
            :param token: The manually provided token
            :param http_conn_id: The conn_id provided
            :return: webhook_token (str) to use
            """
            if token:
                return token
            elif http_conn_id:
                conn = self.get_connection(http_conn_id)
                extra = conn.extra_dejson
                return extra.get('webhook_token', '')
            else:
                raise AirflowException('Cannot get URL: No valid MS Teams '
                                       'webhook URL nor conn_id supplied')

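        def build_message(self):
            # MessageCard payload; doubled braces are literal braces for str.format
            cardjson = """
                {{
                "@type": "MessageCard", "@context": "http://schema.org/extensions",
                "themeColor": "{3}", "summary": "{0}",
                "sections": [{{
                    "activityTitle": "{1}", "activitySubtitle": "{2}", "markdown": true,
                    "potentialAction": [
                        {{
                            "@type": "OpenUri", "name": "{4}",
                            "targets": [
                                {{ "os": "default", "uri": "{5}" }}
                            ]
                        }}
                    ]
                }}]
                }}
                    """
            return cardjson.format(self.message, self.message, self.subtitle, self.theme_color,
                                   self.button_text, self.button_url)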

        def execute(self):
            """
            Post the message card to the MS Teams webhook, using the proxy
            from the connection extras when one is configured.
            """
            proxies = {}
            proxy_url = self.get_proxy(self.http_conn_id)
            print("Proxy is : " + proxy_url)
            if len(proxy_url) > 5:
                proxies = {'https': proxy_url}

            self.run(endpoint=self.webhook_token,
                     data=self.build_message(),
                     headers={'Content-type': 'application/json'},
                     extra_options={'proxies': proxies})

    The content inside the ms_teams_webhook_operator.py file is pasted below:

    # -*- coding: utf-8 -*-
    #
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    #
    from airflow.providers.http.operators.http import SimpleHttpOperator
    from airflow.utils.decorators import apply_defaults
    from ms_teams_webhook_hook import MSTeamsWebhookHook
    import logging


    class MSTeamsWebhookOperator(SimpleHttpOperator):
        """
        This operator allows you to post messages to MS Teams using the Incoming Webhooks connector.
        Takes both MS Teams webhook token directly and connection that has MS Teams webhook token.
        If both supplied, the webhook token will be appended to the host in the connection.
        :param http_conn_id: connection that has MS Teams webhook URL
        :type http_conn_id: str
        :param webhook_token: MS Teams webhook token
        :type webhook_token: str
        :param message: The message you want to send on MS Teams
        :type message: str
        :param subtitle: The subtitle of the message to send
        :type subtitle: str
        :param button_text: The text of the action button
        :type button_text: str
        :param button_url: The URL for the action button click
        :type button_url : str
        :param theme_color: Hex code of the card theme, without the #
        :type theme_color: str
        :param proxy: Proxy to use when making the webhook request
        :type proxy: str
        """

        template_fields = ('message', 'subtitle',)

        @apply_defaults
        def __init__(self,
                     http_conn_id=None,
                     webhook_token=None,
                     message="",
                     subtitle="",
                     button_text="",
                     button_url="",
                     theme_color="00FF00",
                     proxy=None,
                     *args,
                     **kwargs):

            super(MSTeamsWebhookOperator, self).__init__(endpoint=webhook_token, *args, **kwargs)
            self.http_conn_id = http_conn_id
            self.webhook_token = webhook_token
            self.message = message
            self.subtitle = subtitle
            self.button_text = button_text
            self.button_url = button_url
            self.theme_color = theme_color
            self.proxy = proxy
            self.hook = None

        def execute(self, context):
            """
            Call the MSTeamsWebhookHook to post the provided message
            """
            self.hook = MSTeamsWebhookHook(
                self.http_conn_id,
                self.webhook_token,
                self.message,
                self.subtitle,
                self.button_text,
                self.button_url,
                self.theme_color,
                self.proxy
            )
            self.hook.execute()
            logging.info("Webhook request sent to MS Teams")
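
The hook's build_message method fills a JSON template with str.format, so a double quote or a newline in the message would produce invalid JSON. One alternative is to build the card as a dictionary and serialize it with json.dumps, which handles the escaping. The sketch below assumes the legacy MessageCard layout accepted by Incoming Webhook connectors; the function name build_message_card and the field layout are illustrative assumptions rather than something specified in this guide.

```python
import json


def build_message_card(message: str, subtitle: str = "", button_text: str = "",
                       button_url: str = "", theme_color: str = "00FF00") -> str:
    """Serialize a MessageCard payload; json.dumps escapes quotes and newlines."""
    card = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": theme_color,
        "summary": message,
        "sections": [{
            "activityTitle": message,
            "activitySubtitle": subtitle,
            "markdown": True,
            "potentialAction": [{
                "@type": "OpenUri",
                "name": button_text,
                "targets": [{"os": "default", "uri": button_url}],
            }],
        }],
    }
    return json.dumps(card)


payload = build_message_card(
    message='`DAG-1` has failed on task: "dq_business_rules_script"',
    button_text="View log",
    button_url="http://localhost:8080/log",  # placeholder URL
    theme_color="FF0000",
)
print(json.loads(payload)["summary"])
```

The returned string could be passed as the data argument of the hook's run call in place of the formatted template.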
  8. Add the Microsoft Teams webhook connection on the Airflow UI (the on_failure callback refers to it by the connection ID msteams_webhook_url)

    1. On the Airflow UI, click on Admin and select Connections from the dropdown list

    2. Click on Add and enter the respective parameters as shown in the image below

      Image: Adding Connection on Airflow UI

  9. Check if the updated DAG is reflected on the Airflow UI:

    1. Refresh the Airflow UI

    2. The updated DAG should appear at the top. Follow the same steps as previously mentioned to run it and check the logs.

  10. Trigger the DAG from the Airflow UI; you should receive an email on failure or success.

    Image: Failure Email
     
    Image: Success Email
    Image: Teams Failure Notification
     
  11. Running Webserver and Scheduler Overnight

    If you wish to run the webserver and scheduler overnight, use the below commands to start the webserver and the scheduler.

    nohup airflow webserver &
    <Press Enter Key>
    nohup airflow scheduler &
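
The same idea can be scripted from Python when the services are started by a deployment script rather than by hand. The sketch below starts a command in its own session so that it keeps running after the terminal closes, which has a similar effect to nohup on POSIX systems. The helper name start_detached and the log file names are made up for this example, and it assumes the airflow command is on the PATH.

```python
import subprocess


def start_detached(command: list, log_path: str) -> subprocess.Popen:
    """Start command in its own session, appending stdout and stderr to log_path."""
    log_file = open(log_path, "ab")
    try:
        return subprocess.Popen(
            command,
            stdin=subprocess.DEVNULL,
            stdout=log_file,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # detach from the controlling terminal (POSIX)
        )
    finally:
        log_file.close()  # the child keeps its own copy of the file descriptor


# Example (hypothetical log file names):
# start_detached(["airflow", "webserver"], "webserver.log")
# start_detached(["airflow", "scheduler"], "scheduler.log")
```

Unlike nohup, which writes to nohup.out by default, this keeps a separate log file per service.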