Apache Airflow is a platform to programmatically author, schedule, and monitor workflows.
In this blog, we will discuss how to manage a workflow that scrapes yelp.com using Apache Airflow.
Quick setup of Airflow on Ubuntu 20.04 LTS
# make sure your system is up-to-date
sudo apt update
sudo apt upgrade
# install airflow dependencies
sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
sudo apt-get install libkrb5-dev
# create the virtual env and install the airflow using pip
sudo apt install python3-virtualenv
virtualenv airflow_test
cd airflow_test/
source bin/activate
export AIRFLOW_HOME=~/airflow   # set Airflow home
pip3 install apache-airflow
pip3 install typing_extensions
airflow db init                 # initialize the db
The db, unittests, logs, and configuration (cfg) files will be generated inside AIRFLOW_HOME.
# Start a WebServer & Scheduler
airflow webserver -p 8080   # start the webserver
airflow scheduler           # start the scheduler
By default, the webserver is available at localhost on port 8080. If you wish to change the host or port, you can pass them explicitly:
airflow webserver -H xxx.xxx.xxx.xxx -p 9005
Check the quick installation guide here.
If everything goes well, we can see the Apache Airflow web interface at:
http://localhost:8080/admin/ # web-server
Everything in Airflow works as DAGs (Directed Acyclic Graphs). We need to create a DAG with a unique dag_id and attach the tasks to it. Simply put, a DAG is the collection of tasks we want to run. Parameters like schedule_interval, start_date, owner, and others can also be passed to the DAG object.
Create a folder named dags inside AIRFLOW_HOME. The Scheduler checks this folder for new DAGs every 300 seconds (the default dag_dir_list_interval); if any new DAGs are found, you can see them in the web server.
We are going to create a workflow to scrape yelp.com for business listings & save the data to MongoDB.
The code used in this tutorial to scrape yelp.com can be found here.
DAG Creation
from airflow import DAG
from datetime import datetime

# dag creation
default_args = {'owner': 'turbolab',
                'start_date': datetime(2019, 1, 1),
                'depends_on_past': False}

# creating a DAG
_yelp_workflow = DAG('_yelp_workflow',
                     catchup=False,
                     schedule_interval=None,
                     default_args=default_args)
The _yelp_workflow DAG is now created. schedule_interval=None means the DAG only runs when triggered manually. Other options are @daily, @weekly, or a cron expression such as “* * * */2 1”. Read about catchup and depends_on_past in the Airflow documentation here.
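For illustration, here is a minimal sketch (not part of the original workflow, and the dag_id _yelp_workflow_daily is made up) of what the same DAG would look like on a daily schedule:
from airflow import DAG
from datetime import datetime

# hypothetical scheduled variant: runs once a day instead of waiting for a manual trigger
_yelp_workflow_daily = DAG('_yelp_workflow_daily',
                           catchup=False,                # don't backfill runs missed since start_date
                           schedule_interval='@daily',   # or a cron string such as '0 6 * * *'
                           default_args={'owner': 'turbolab',
                                         'start_date': datetime(2019, 1, 1)})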
Task Creation
With Airflow's set of operators, we can define the tasks of the DAG workflow. An operator describes a single task in a workflow. While DAGs describe how to run a workflow, operators determine what actually gets done. To call a Python function there is the PythonOperator, to send an email the EmailOperator, to run a Bash command the BashOperator, to execute a SQL statement the MySqlOperator, and so on.
Generally, operators run independently and do not share information; only the order in which they run is specified. When sharing state between tasks absolutely can't be avoided, Airflow does have a feature for operator cross-communication called XCom.
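To give a flavor of how a couple of these operators are wired into a DAG, here is a small illustrative sketch; the dag_id, task ids, and commands below are made up and are not part of the yelp workflow:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

demo_dag = DAG('operator_demo',
               schedule_interval=None,
               start_date=datetime(2019, 1, 1))

# BashOperator: runs a shell command
print_date = BashOperator(task_id='print_date',
                          bash_command='date',
                          dag=demo_dag)

# PythonOperator: calls a Python function
def say_hello():
    print("hello from a PythonOperator")

hello_task = PythonOperator(task_id='say_hello',
                            python_callable=say_hello,
                            dag=demo_dag)

# run print_date first, then hello_task
print_date >> hello_task
The first real task of the yelp workflow is a PythonOperator that builds the search URL: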
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def url_generator(**kwargs):
    """
    generating the yelp url to find the business listings with place and search_query
    {'place': 'Location | Address | zip code'}
    {'search_query': "Restaurants | Breakfast & Brunch | Coffee & Tea | Delivery | Reservations"}
    """
    place = Variable.get("place")
    search_query = Variable.get("search_query")
    yelp_url = "https://www.yelp.com/search?find_desc={0}&find_loc={1}".format(search_query, place)
    return yelp_url


# defining a task
yelp_url_generator = PythonOperator(task_id='url_generator',
                                    python_callable=url_generator,
                                    provide_context=True,
                                    dag=_yelp_workflow)
Likewise, six tasks were created, and concepts like Variables and XCom are used to share information among the tasks.
Concept of XCom
import requests


def get_response(**kwargs):
    """ validating the url and forwarding the response """
    ti = kwargs['ti']
    url = ti.xcom_pull(task_ids='url_generator')
    print('url generated: ', url)
    headers = {'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chrome/70.0.3538.77 Safari/537.36'}
    success = False
    for retry in range(10):
        response = requests.get(url, verify=False, headers=headers)
        if response.status_code == 200:
            success = True
            break
        else:
            print("Response received: %s. Retrying : %s" % (response.status_code, url))
            success = False
    if success == False:
        print("Failed to process the URL: ", url)
        raise ValueError("Failed to process the URL: ", url)
    return response


response_generator = PythonOperator(task_id='response_generator',
                                    python_callable=get_response,
                                    provide_context=True,
                                    dag=_yelp_workflow)
The yelp_url returned by the url_generator task has to be passed to the response_generator task, where we check the response of the URL. Because the PythonOperator pushes its callable's return value to XCom, the downstream task can fetch it with xcom_pull. If the status_code of the response is 200, we return the response; otherwise we raise a ValueError to stop the pipeline.
XComs can be viewed on the Admin page of the web server after successful task runs.
Concept of Variables
This concept is used when the user has to pass input values (like command-line arguments in Python) to the tasks created.
place = Variable.get("place")
search_query = Variable.get("search_query")
These Variables, place and search_query, are used in the url_generator Python function of the yelp_url_generator task.
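Variables are usually created from the Admin → Variables page of the web UI, but they can also be set programmatically. Here is a minimal sketch, assuming example values (the actual place and search query are up to you):
from airflow.models import Variable

# seed the two Variables the url_generator task expects (example values)
Variable.set("place", "New York, NY")
Variable.set("search_query", "Restaurants")

# reading them back works the same way as inside the task
print(Variable.get("place"), Variable.get("search_query"))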
Tasks Relationship/Arrangement
The DAG will make sure that the operators run in the correct order. Check here.
end_task << validate_db << writing_to_db << validate_data << get_data << response_generator << yelp_url_generator << start_task
The line above is the Airflow upstream (<<) arrangement of tasks. start_task and end_task are dummy tasks (optional); the others, yelp_url_generator → response_generator → get_data → validate_data → writing_to_db → validate_db, are Python tasks.
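For readers more used to the downstream (>>) notation, the same ordering can be expressed equivalently like this:
# equivalent arrangement written in the downstream (>>) direction
start_task >> yelp_url_generator >> response_generator >> get_data \
           >> validate_data >> writing_to_db >> validate_db >> end_task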
Triggering the DAG
Since we kept schedule_interval=None, we have to trigger the DAG manually. Let's see how to do that from the web UI.
Tree View of each DAG run
Handling Cases
You might be wondering why we should use this Airflow setup for simple scraping. The reasons are:
- We can break the whole job down into multiple tasks and have control over each task at any point.
- We get clear logs at every level.
- We can easily connect to other servers with Airflow operators to execute the script.
Here are a few cases handled in the workflow:
- When we try to write the same set of data into the database across multiple DAG runs: the task with task_id=writing_to_db handles this case.
- When the data scraped and the data pushed to the database don't match: the task with task_id=validate_db handles this case. If such an anomaly is detected, we raise a ValueError, as in the sketch below.
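To make the second case concrete, here is a hypothetical sketch of what a validate_db-style check could look like; the actual validation logic in the workflow, the way writing_to_db reports its result, and the helper names here are assumptions for illustration only:
from airflow.operators.python_operator import PythonOperator


def check_db_counts(**kwargs):
    """Hypothetical sketch: compare the number of scraped records with what landed in MongoDB."""
    ti = kwargs['ti']
    scraped_records = ti.xcom_pull(task_ids='get_data') or []
    inserted_count = ti.xcom_pull(task_ids='writing_to_db')  # assumes writing_to_db returns the inserted count
    if inserted_count != len(scraped_records):
        # anomaly detected: stop the pipeline by raising
        raise ValueError("Scraped %s records but found %s in the database"
                         % (len(scraped_records), inserted_count))
    return inserted_count


validate_db = PythonOperator(task_id='validate_db',
                             python_callable=check_db_counts,
                             provide_context=True,
                             dag=_yelp_workflow)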