How to Monitor the Workflow of a Scraping Project with Apache Airflow

Apache Airflow is a platform to programmatically author, schedule, and monitor workflows.

In this blog, we will discuss handling the workflow of scraping yelp.com with Apache Airflow.

Quick setup of Airflow on Ubuntu 20.04 LTS

# make sure your system is up-to-date

sudo apt update
sudo apt upgrade

# install airflow dependencies 

sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
sudo apt-get install libkrb5-dev

# create the virtual env and install the airflow using pip

sudo apt install python3-virtualenv
virtualenv airflow_test
cd airflow_test/
source bin/activate
export AIRFLOW_HOME=~/airflow # set Airflow home
pip3 install apache-airflow
pip3 install typing_extensions
airflow db init # initialize the db

The metadata database (airflow.db), unittests.cfg, the logs directory, and the configuration file (airflow.cfg) will be generated inside AIRFLOW_HOME.

# Start a WebServer & Scheduler

airflow webserver -p 8080 # start the webserver
airflow scheduler # start the scheduler

By default, the webserver binds to localhost. If you wish to change the host or port, you can run the command like this:

airflow webserver -H xxx.xxx.xxx.xxx -p 9005

Check the quick installation guide here.

If everything goes well, we can see the Apache Airflow web interface:

http://localhost:8080/admin/ # web-server
Airflow WebServer

Everything in Airflow works as DAGs (Directed Acyclic Graphs). We need to create a DAG with a unique dag_id and attach the tasks to that DAG. Simply put, a DAG is the collection of tasks we want to run. Parameters like schedule_interval, start_date, owner, and others can also be passed to the DAG object.

Create a folder named dags inside AIRFLOW_HOME. The scheduler checks it for new DAGs every 300 seconds (the default dag_dir_list_interval); if any new DAGs are found, you can see them in the web server.

Airflow Scheduler
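
For example, assuming the AIRFLOW_HOME set earlier and the Airflow 1.10-style CLI (the DAG file name below is a placeholder):

mkdir -p $AIRFLOW_HOME/dags # the scheduler scans this folder for DAG files
cp yelp_workflow.py $AIRFLOW_HOME/dags/ # yelp_workflow.py is a placeholder name for your DAG file
airflow list_dags # confirm the new DAG was picked up (in Airflow 2.x: airflow dags list)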

We are going to create a workflow to scrape yelp.com for business listings & save the data to MongoDB.

The code used in this tutorial to scrape yelp.com can be found here.

Creation of DAG

from airflow import DAG
from datetime import datetime
# dag creation
default_args = {'owner': 'turbolab', 'start_date': datetime(2019, 1, 1), 'depends_on_past': False}
_yelp_workflow = DAG('_yelp_workflow', catchup=False, schedule_interval=None, default_args=default_args) # creating a DAG
DAG Created

The _yelp_workflow DAG is created. schedule_interval=None means the DAG runs only when triggered manually. Other options are @daily, @weekly, or a cron expression such as “* * * */2 1”. You can read about catchup and depends_on_past in the Airflow documentation here.
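
For reference, a scheduled variant could look like the minimal sketch below (the dag_id and interval are illustrative only, not part of this workflow):

from airflow import DAG
from datetime import datetime

default_args = {'owner': 'turbolab', 'start_date': datetime(2019, 1, 1), 'depends_on_past': False}

# runs once a day at midnight; catchup=False tells the scheduler not to
# back-fill every missed interval between start_date and today
daily_example = DAG('daily_example', schedule_interval='@daily', catchup=False, default_args=default_args)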

Task Creation

With Airflow’s set of operators, we can define the tasks of the DAG workflow. An operator describes a single task in a workflow. While DAGs describe how to run a workflow, operators determine what actually gets done: to call a Python function there is the PythonOperator, for an email the EmailOperator, for a Bash command the BashOperator, for a SQL statement the MySqlOperator, and so on.

Generally, operators run independently, in the order specified, without sharing information. When sharing absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom.

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

def url_generator(**kwargs):
    """ 
    generating the yelp url to find the business listings with place and search_query 
    {'place': 'Location | Address | zip code'}
    {'search_query': "Restaurants | Breakfast & Brunch | Coffee & Tea | Delivery | Reservations"}
    """
    place = Variable.get("place")
    search_query = Variable.get("search_query")
    yelp_url = "https://www.yelp.com/search?find_desc={0}&find_loc={1}".format(search_query,place)
    return yelp_url
"""defining a task"""
yelp_url_generator = PythonOperator(
    task_id='url_generator',
    python_callable=url_generator,
    provide_context=True,
    dag=_yelp_workflow)

Likewise, six tasks were created, and concepts like Variables and XCom are used to pass data between the tasks.

Concept of XCom

import requests

def get_response(**kwargs):
    """
    validating the url and forwarding the response
    """
    ti = kwargs['ti']
    url = ti.xcom_pull(task_ids='url_generator')
    print('url generated: ', url)
    headers = {'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chrome/70.0.3538.77 Safari/537.36'}
    success = False
    
    for retry in range(10):
        response = requests.get(url, verify=False, headers=headers)
        if response.status_code == 200:
            success = True
            break
        else:
            print("Response received: %s. Retrying : %s"%(response.status_code, url))
            success = False
    
    if success == False:
        print("Failed to process the URL: ", url)
        raise ValueError("Failed to process the URL: ", url)
    return response
response_generator = PythonOperator(
    task_id='response_generator',
    python_callable=get_response,
    provide_context=True,
    dag=_yelp_workflow)

The yelp_url returned by the url_generator task has to be passed to the response_generator task, where we check the response of the URL. If the status_code of the response is 200, we return it; otherwise we raise a ValueError to stop the pipeline.

XComs can be viewed on the admin page after the tasks run successfully.

Concept of Variables

This concept is used when the user has to pass values to the tasks (like command-line arguments in Python).

place = Variable.get("place")
search_query = Variable.get("search_query")

These variables, place and search_query, are used in the url_generator Python function of the yelp_url_generator task.

Variables Creation
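
Variables can also be set outside the UI; here is a minimal sketch (the values are placeholders for whatever location and category you want to scrape):

from airflow.models import Variable

# placeholder values; set these to the location and category you want to scrape
Variable.set("place", "New York, NY")
Variable.set("search_query", "Restaurants")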

Tasks Relationship/Arrangement

The DAG will make sure that the operators run in the correct order. Check here.

end_task << validate_db << writing_to_db << validate_data << get_data << response_generator << yelp_url_generator << start_task

This is the upstream (<<) arrangement of the tasks. start_task and end_task are dummy tasks (optional); the others, yelp_url_generator → response_generator → get_data → validate_data → writing_to_db → validate_db, are Python tasks.
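
The same chain can also be written with the downstream operator (>>), and the optional dummy tasks can be created with the DummyOperator; a minimal sketch:

from airflow.operators.dummy_operator import DummyOperator

# optional boundary tasks that do nothing but mark the start and end of the run
start_task = DummyOperator(task_id='start_task', dag=_yelp_workflow)
end_task = DummyOperator(task_id='end_task', dag=_yelp_workflow)

# downstream (>>) form of the same arrangement
start_task >> yelp_url_generator >> response_generator >> get_data >> validate_data >> writing_to_db >> validate_db >> end_task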

Check the complete code here 

Triggering the DAG

Since we kept schedule_interval=None, we have to trigger the DAG manually. Let’s see how to do that.
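
The DAG can be triggered from the trigger button on the web UI’s DAGs page, or from the command line. Assuming the Airflow 1.10-style CLI that matches the provide_context syntax used above:

airflow trigger_dag _yelp_workflow # in Airflow 2.x: airflow dags trigger _yelp_workflow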

MongoDB data
Tasks Successfully Completed

Tree View of each DAG run

Handling Cases

You must be wondering why to use this Airflow setup for simple scraping. The reasons are:

  1. We can break the whole job down into multiple tasks and have control over each task at any point.
  2. We get clear logs at every level.
  3. We can easily connect to other servers with Airflow operators to execute the script (see the sketch after this list).
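
As an illustration of the third point, a task could run the scraper on a remote machine with the SSHOperator from Airflow’s contrib package. A minimal sketch, assuming an SSH connection named scraper_box has already been configured in Admin → Connections (the connection id, path, and task id are placeholders):

from airflow.contrib.operators.ssh_operator import SSHOperator

# run the scraper on a remote box over SSH; 'scraper_box' and the command are placeholders
remote_scrape = SSHOperator(
    task_id='remote_scrape',
    ssh_conn_id='scraper_box',
    command='python3 /opt/scrapers/yelp_scraper.py',
    dag=_yelp_workflow)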

Here are a few of the cases handled in the workflow:

  • When we are trying to write the same set of data into the Database with multiple DAG runs.
Duplicate Key Error

The task with task_id=writing_to_db handles this case.
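
One way this can be handled is to give each listing a natural primary key so a re-run cannot insert it twice. A minimal sketch, assuming a local MongoDB and the listing URL as the _id (the database, collection, and field names are placeholders, not the tutorial’s exact code):

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

client = MongoClient('mongodb://localhost:27017/') # assumed local MongoDB
collection = client['yelp']['listings'] # placeholder database/collection names

def save_listing(listing):
    # using the business URL as _id means a second DAG run simply skips rows that already exist
    try:
        collection.insert_one({'_id': listing['url'], **listing})
    except DuplicateKeyError:
        print('Duplicate listing skipped:', listing['url'])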

  • When the data scraped and the data pushed to the database don’t match.

The task with task_id=validate_db handles this case. If an anomaly is detected, we raise a ValueError.
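
A minimal sketch of what such a check could look like, assuming the upstream task pushes the scraped records through XCom and using the same placeholder MongoDB collection as above (the task id and names are illustrative, not the tutorial’s exact code):

from pymongo import MongoClient

def validate_db(**kwargs):
    # compare how many records were scraped with how many actually landed in MongoDB
    collection = MongoClient('mongodb://localhost:27017/')['yelp']['listings'] # same placeholders as above
    ti = kwargs['ti']
    scraped_records = ti.xcom_pull(task_ids='validate_data') # assumed to return the list of scraped records
    if collection.count_documents({}) < len(scraped_records):
        raise ValueError("Fewer records found in MongoDB than were scraped for this run")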

