Prerequisites:
- The IP address of the source box must be allowed to SSH into the remote server.
- The Zetaris API connection must be up and running.
- Port 587 must be open on the Airflow Ubuntu box for SMTP.
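The two network prerequisites can be checked from the Airflow box before any Airflow configuration. The following is a minimal sketch using only Python's standard library. It simply attempts a TCP connection to each host and port. It does not verify SSH keys or SMTP credentials. The remote server address in the usage note is a hypothetical placeholder.

```python
import socket
from typing import Dict, Iterable, Tuple


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and tries each address it returns
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts and DNS failures are all OSError subclasses
        return False


def check_ports(targets: Iterable[Tuple[str, int]], timeout: float = 3.0) -> Dict[str, bool]:
    """Check several (host, port) pairs and map "host:port" to reachability."""
    return {f"{host}:{port}": port_is_open(host, port, timeout) for host, port in targets}
```

For example, running `check_ports([("smtp.googlemail.com", 587), ("<remote_server_ip>", 22)])` on the Airflow box should report True for both entries once the prerequisites are met.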
Steps:
- Create a DAG Python file that tells Airflow what to schedule (here, two DQ scripts that run on the remote server over SSH):
cd /home/zetaris/airflow/dags
touch dag_1.py
chmod 775 dag_1.py
The contents of the dag_1.py file are shown below. The schedule interval used here is '@daily'; change it to suit your requirements.
# step 1 - Import modules
import logging
from datetime import datetime, timedelta
from urllib.parse import urlencode

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
# Custom operator defined in ms_teams_webhook_operator.py (created in the MS Teams section)
from ms_teams_webhook_operator import MSTeamsWebhookOperator


def on_failure(context):
    """Failure callback: post a card linking to the failed task's log to MS Teams."""
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id
    context['task_instance'].xcom_push(key=dag_id, value=True)
    # urlencode escapes the '+' and ':' characters in the timestamp (context['ts'])
    logs_url = "http://<ubuntu_public_ip>:8080/log?" + urlencode(
        {'dag_id': dag_id, 'task_id': task_id, 'execution_date': context['ts']})
    teams_notification = MSTeamsWebhookOperator(
        task_id="msteams_notify_failure", trigger_rule="all_done",
        message="`{}` has failed on task: `{}`".format(dag_id, task_id),
        button_text="View log", button_url=logs_url,
        theme_color="FF0000", http_conn_id='msteams_webhook_url')
    teams_notification.execute(context)
# step 2 - Define default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 4, 11),
    'retries': 0,
    'email': [<list_of_emails>],
    'email_on_failure': True,
    # Operators have no 'email_on_success' argument; success emails are sent
    # by the EmailOperator tasks defined below
    'on_failure_callback': on_failure,
}

# step 3 - Instantiate the DAG
dag = DAG(dag_id='DAG-1', default_args=default_args, catchup=False, schedule_interval='@daily')
# step 4 - Define tasks
# start = DummyOperator(task_id='start', dag=dag)
# end = DummyOperator(task_id='end', dag=dag)
# t1 = BashOperator(
#     task_id='testairflow',
#     bash_command='python3 /home/zetaris/airflow/dags/testpy.py',
#     dag=dag)

# 'ssh_dq' is the SSH connection defined on the Airflow UI (Admin > Connections)
sshHook = SSHHook(ssh_conn_id='ssh_dq')
python_command_1 = "python3.9 /home/zetaris/refresh_script/dq_business_rules.py"
python_command_2 = "python3.9 /home/zetaris/refresh_script/dq_exception_file-bkp.py"

# Run each DQ script on the remote server over SSH
t2 = SSHOperator(
    ssh_hook=sshHook,
    task_id='dq_business_rules_script',
    command=python_command_1,
    dag=dag)
t3 = SSHOperator(
    ssh_hook=sshHook,
    task_id='dq_exception_script',
    command=python_command_2,
    dag=dag)

# Success notifications sent after each script completes
t2_email = EmailOperator(
    task_id='send_t2_email',
    to=[<list_of_emails>],
    subject='DQ Execution Update',
    html_content="""<h3>DQ Business Rules Script Email Notification - Success</h3>""",
    dag=dag,
)
t3_email = EmailOperator(
    task_id='send_t3_email',
    to=[<list_of_emails>],
    subject='DQ Execution Update',
    html_content="""<h3>DQ Exception Script Email Notification - Success</h3>""",
    dag=dag,
)
# step 5 - Define dependencies
t2 >> t2_email >> t3 >> t3_email
Image: Dependencies
- Add a connection on the Airflow UI (the SSH connection ssh_dq used by the SSH tasks)
- On the Airflow UI, click Admin and choose Connections from the dropdown list.
- Click Add and enter the parameters shown in the image below.
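As an alternative to the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (upper-cased) holding a connection URI. The sketch below builds such a URI for the ssh_dq connection. The login and host are hypothetical placeholders, and it assumes the SSH provider's key_file extra is used to point at the '.pem' key added on the source box.

```python
from urllib.parse import quote, urlencode


def conn_env_var(conn_id: str) -> str:
    """Name of the environment variable Airflow checks for this connection id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def ssh_conn_uri(login: str, host: str, port: int = 22, key_file: str = "") -> str:
    """Build an Airflow connection URI of type 'ssh'."""
    # Percent-encode login and host so special characters survive URI parsing
    uri = f"ssh://{quote(login, safe='')}@{quote(host, safe='')}:{port}"
    extras = {}
    if key_file:
        # Extras go in the query string; key_file is read by the SSH provider's SSHHook
        extras["key_file"] = key_file
    if extras:
        uri += "?" + urlencode(extras)
    return uri
```

For instance, `ssh_conn_uri("azureuser", "10.0.0.4", key_file="/home/zetaris/airflow/azure_rhel_test_box.pem")` (hypothetical user and IP) produces a value to export as AIRFLOW_CONN_SSH_DQ in the environment of both the webserver and the scheduler.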
- Setting up email notifications (Gmail SMTP)
- Create an app password for your Google account at https://myaccount.google.com/apppasswords (2-Step Verification must be enabled on the account).
- Click Select app and choose the app you are using.
- Click Select device and choose the device you are using.
- Select Generate. A 16-character app password is generated.
- Copy the password, then select Done.
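Before editing airflow.cfg, you can check that the app password works with the same settings the [smtp] section uses (smtp.googlemail.com, port 587, STARTTLS, no SSL). This is a minimal sketch using Python's standard smtplib; the addresses you pass in are placeholders for your own.

```python
import smtplib
from email.message import EmailMessage


def build_test_message(sender: str, recipient: str) -> EmailMessage:
    """Create a small plain-text message for testing the SMTP settings."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = "Airflow SMTP test"
    msg.set_content("If you can read this, the SMTP settings work.")
    return msg


def send_test_email(user: str, app_password: str, recipient: str,
                    host: str = "smtp.googlemail.com", port: int = 587) -> None:
    """Connect in plain text, upgrade with STARTTLS, log in and send one message."""
    msg = build_test_message(user, recipient)
    with smtplib.SMTP(host, port, timeout=30) as server:
        server.starttls()                   # mirrors smtp_starttls = True, smtp_ssl = False
        server.login(user, app_password)    # raises SMTPAuthenticationError on a bad password
        server.send_message(msg)
```

If `send_test_email("<YOUR_GOOGLE_EMAIL_ADDRESS>", "<app password>", "<recipient>")` succeeds, the same values should work in airflow.cfg.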
- Update the SMTP settings in the airflow.cfg file:
cd /home/zetaris/airflow
vi airflow.cfg (change the parameters below)
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = smtp.googlemail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = <YOUR_GOOGLE_EMAIL_ADDRESS>
smtp_password = <16_CHARACTER_APP_PASSWORD>
smtp_port = 587
smtp_mail_from = <YOUR_GOOGLE_EMAIL_ADDRESS>
- Add the '.pem' key of the remote server on the source box
cd /home/zetaris/airflow/
Paste the '.pem' key here (in our case: azure_rhel_test_box.pem), then restrict its permissions, because SSH refuses private keys that other users can read:
chmod 600 <key_name>.pem
- Setting up failure notifications on Microsoft Teams
- Configure an Incoming Webhook in Microsoft Teams
- Open Teams and browse to the channel that should receive the messages.
- Click More options (the three-dot icon) beside the channel name, then select Connectors.
- Search for Incoming Webhook.
- Click Configure, provide a webhook name and select Create. Teams generates a webhook URL; copy it.
Image: Setting up Webhook on MS Teams
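Before wiring the webhook into Airflow, it can help to post a test card to it directly. The sketch below uses only the standard library and builds a legacy Office 365 connector "MessageCard" with the same fields the Teams hook uses (message, subtitle, button text/URL, theme color). The card layout is an assumption based on the MessageCard format; the webhook URL is whatever Teams generated for you.

```python
import json
import urllib.request


def build_message_card(message: str, subtitle: str = "", button_text: str = "",
                       button_url: str = "", theme_color: str = "00FF00") -> dict:
    """Build a MessageCard payload; the button is added only if text and URL are given."""
    section = {"activityTitle": message, "activitySubtitle": subtitle, "markdown": True}
    if button_text and button_url:
        # An OpenUri action renders as a button that opens button_url
        section["potentialAction"] = [{
            "@type": "OpenUri",
            "name": button_text,
            "targets": [{"os": "default", "uri": button_url}],
        }]
    return {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": theme_color,
        "summary": message,
        "sections": [section],
    }


def post_to_teams(webhook_url: str, card: dict, timeout: float = 10.0) -> int:
    """POST the card as JSON to the Incoming Webhook URL and return the HTTP status code."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(card).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

Calling `post_to_teams("<webhook_url>", build_message_card("Test from Airflow box", theme_color="FF0000"))` should return 200 and show a card in the channel if the webhook is set up correctly.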
- Create two helper scripts (a hook and an operator) for sending alerts to Microsoft Teams:
cd /home/zetaris/airflow
touch ms_teams_webhook_hook.py
chmod 775 ms_teams_webhook_hook.py
touch ms_teams_webhook_operator.py
chmod 775 ms_teams_webhook_operator.py
The content inside the ms_teams_webhook_hook.py file is pasted below:
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook


class MSTeamsWebhookHook(HttpHook):
    """
    This hook allows you to post messages to MS Teams using the Incoming Webhook connector.
    Takes both MS Teams webhook token directly and connection that has MS Teams webhook token.
    If both supplied, the webhook token will be appended to the host in the connection.
    :param http_conn_id: connection that has MS Teams webhook URL
    :type http_conn_id: str
    :param webhook_token: MS Teams webhook token
    :type webhook_token: str
    :param message: The message you want to send on MS Teams
    :type message: str
    :param subtitle: The subtitle of the message to send
    :type subtitle: str
    :param button_text: The text of the action button
    :type button_text: str
    :param button_url: The URL for the action button click
    :type button_url: str
    :param theme_color: Hex code of the card theme, without the #
    :type theme_color: str
    :param proxy: Proxy to use when making the webhook request
    :type proxy: str
    """

    def __init__(self,
                 http_conn_id=None,
                 webhook_token=None,
                 message="",
                 subtitle="",
                 button_text="",
                 button_url="",
                 theme_color="00FF00",
                 proxy=None,
                 *args,
                 **kwargs
                 ):
        super(MSTeamsWebhookHook, self).__init__(*args, **kwargs)
        self.http_conn_id = http_conn_id
        self.webhook_token = self.get_token(webhook_token, http_conn_id)
        self.message = message
        self.subtitle = subtitle
        self.button_text = button_text
        self.button_url = button_url
        self.theme_color = theme_color
        self.proxy = proxy
    def get_proxy(self, http_conn_id):
        """Return the proxy URL stored in the connection's extras, or '' if none is set."""
        conn = self.get_connection(http_conn_id)
        extra = conn.extra_dejson
        # Do not print the extras: they can contain the webhook token
        return extra.get("proxy", '')

    def get_token(self, token, http_conn_id):
        """
        Given either a manually set token or a conn_id, return the webhook_token to use
        :param token: The manually provided token
        :param http_conn_id: The conn_id provided
        :return: webhook_token (str) to use
        """
        if token:
            return token
        elif http_conn_id:
            conn = self.get_connection(http_conn_id)
            extra = conn.extra_dejson
            return extra.get('webhook_token', '')
        else:
            raise AirflowException('Cannot get URL: No valid MS Teams '
                                   'webhook URL nor conn_id supplied')
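    def build_message(self):
        # MessageCard JSON; literal braces are doubled so str.format leaves them alone
        cardjson = """
        {{
            "@type": "MessageCard",
            "@context": "http://schema.org/extensions",
            "themeColor": "{3}",
            "summary": "{0}",
            "sections": [{{
                "activityTitle": "{1}",
                "activitySubtitle": "{2}",
                "markdown": true,
                "potentialAction": [
                    {{
                        "@type": "OpenUri",
                        "name": "{4}",
                        "targets": [
                            {{ "os": "default", "uri": "{5}" }}
                        ]
                    }}
                ]
            }}]
        }}
        """
        return cardjson.format(self.message, self.message, self.subtitle, self.theme_color,
                               self.button_text, self.button_url)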
    def execute(self):
        """
        Send the MessageCard to the webhook (HTTP POST through HttpHook.run)
        """
        proxies = {}
        # A proxy passed to the hook takes precedence over the connection's "proxy" extra
        proxy_url = self.proxy or self.get_proxy(self.http_conn_id)
        if proxy_url:
            self.log.info("Using proxy: %s", proxy_url)
            proxies = {'https': proxy_url}
        self.run(endpoint=self.webhook_token,
                 data=self.build_message(),
                 headers={'Content-type': 'application/json'},
                 extra_options={'proxies': proxies})
The content inside the ms_teams_webhook_operator.py file is pasted below:
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging

from airflow.providers.http.operators.http import SimpleHttpOperator
from ms_teams_webhook_hook import MSTeamsWebhookHook


class MSTeamsWebhookOperator(SimpleHttpOperator):
    """
    This operator allows you to post messages to MS Teams using the Incoming Webhooks connector.
    Takes both MS Teams webhook token directly and connection that has MS Teams webhook token.
    If both supplied, the webhook token will be appended to the host in the connection.
    :param http_conn_id: connection that has MS Teams webhook URL
    :type http_conn_id: str
    :param webhook_token: MS Teams webhook token
    :type webhook_token: str
    :param message: The message you want to send on MS Teams
    :type message: str
    :param subtitle: The subtitle of the message to send
    :type subtitle: str
    :param button_text: The text of the action button
    :type button_text: str
    :param button_url: The URL for the action button click
    :type button_url: str
    :param theme_color: Hex code of the card theme, without the #
    :type theme_color: str
    :param proxy: Proxy to use when making the webhook request
    :type proxy: str
    """
    template_fields = ('message', 'subtitle',)

    # Airflow 2 applies default_args automatically, so @apply_defaults is not needed
    def __init__(self,
                 http_conn_id=None,
                 webhook_token=None,
                 message="",
                 subtitle="",
                 button_text="",
                 button_url="",
                 theme_color="00FF00",
                 proxy=None,
                 *args,
                 **kwargs):
        super(MSTeamsWebhookOperator, self).__init__(*args, endpoint=webhook_token, **kwargs)
        self.http_conn_id = http_conn_id
        self.webhook_token = webhook_token
        self.message = message
        self.subtitle = subtitle
        self.button_text = button_text
        self.button_url = button_url
        self.theme_color = theme_color
        self.proxy = proxy
        self.hook = None
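    def execute(self, context):
        """
        Build an MSTeamsWebhookHook from the operator's arguments and send the message
        """
        self.hook = MSTeamsWebhookHook(
            http_conn_id=self.http_conn_id,
            webhook_token=self.webhook_token,
            message=self.message,
            subtitle=self.subtitle,
            button_text=self.button_text,
            button_url=self.button_url,
            theme_color=self.theme_color,
            proxy=self.proxy,
        )
        self.hook.execute()
        logging.info("Webhook request sent to MS Teams")
- Add a connection on the Airflow UI (the HTTP connection msteams_webhook_url used by the failure callback)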
- On the Airflow UI, click Admin and choose Connections from the dropdown list.
- Click Add and enter the parameters shown in the image below.
Image: Adding Connection on Airflow UI
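The Teams connection can also be supplied through an environment variable instead of the UI. Because the webhook URL contains a long path, the JSON connection format is simpler than a URI here; this sketch assumes Airflow 2.3 or later, which accepts JSON in AIRFLOW_CONN_<CONN_ID> variables. The host holds the full webhook URL (HttpHook uses a host containing "://" as the base URL as-is), and the optional proxy goes into the extras that the hook's get_proxy() reads.

```python
import json
from typing import Optional


def teams_conn_json(webhook_url: str, proxy: Optional[str] = None) -> str:
    """Serialize an HTTP connection for the Teams webhook in Airflow's JSON connection format."""
    extra = {}
    if proxy:
        # The hook's get_proxy() looks up this key in the connection extras
        extra["proxy"] = proxy
    conn = {"conn_type": "http", "host": webhook_url, "extra": extra}
    return json.dumps(conn)
```

The resulting string would be exported as AIRFLOW_CONN_MSTEAMS_WEBHOOK_URL, matching http_conn_id='msteams_webhook_url' in the DAG.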
- Check that the updated DAG appears on the Airflow UI:
- Refresh the Airflow UI.
- The updated DAG should appear at the top; follow the same steps as before to run it and check the logs.
- Trigger the DAG from the Airflow UI. You should receive an email on failure or success, and a Teams notification on failure.
Image: Failure Email
Image: Success Email
Image: Teams Failure Notification
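Besides the UI, Airflow 2 has a stable REST API that can create a DAG run (POST /api/v1/dags/{dag_id}/dagRuns). The sketch below only builds the request. It assumes the webserver is reachable at http://localhost:8080, that the basic-auth API backend (airflow.api.auth.backend.basic_auth) is enabled in the [api] section of airflow.cfg, and that "admin"/"secret" are hypothetical credentials. The DAG must also be unpaused for the run to start.

```python
import base64
import json
import urllib.request
from typing import Optional
from urllib.parse import quote


def build_trigger_request(base_url: str, dag_id: str, username: str,
                          password: str, conf: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request that creates a new run of `dag_id`."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns"
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )
```

Sending it with `urllib.request.urlopen(build_trigger_request("http://localhost:8080", "DAG-1", "admin", "secret"))` should return the new DAG run as JSON if the assumptions above hold.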
- Running the webserver and scheduler overnight
To keep the webserver and scheduler running after you close your terminal session, start them in the background with nohup:
nohup airflow webserver &
<Press Enter Key>
nohup airflow scheduler &
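To confirm the next morning that both processes are still healthy, you can query the webserver's /health endpoint, which in Airflow 2 returns JSON with a status for the metadata database and the scheduler. A minimal sketch, assuming the webserver listens on http://localhost:8080:

```python
import json
import urllib.request


def parse_health(payload: str) -> dict:
    """Map each component in the /health JSON to its status value."""
    data = json.loads(payload)
    return {name: info.get("status") for name, info in data.items() if isinstance(info, dict)}


def check_airflow(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> dict:
    """Fetch /health from the webserver; raises URLError if the webserver is not running."""
    with urllib.request.urlopen(f"{base_url.rstrip('/')}/health", timeout=timeout) as resp:
        return parse_health(resp.read().decode("utf-8"))
```

A result such as {"metadatabase": "healthy", "scheduler": "healthy"} means both are up; an "unhealthy" scheduler usually means its heartbeat has stopped and it needs restarting.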