Schedule web scrapers with Apache Airflow

This post is part of the Data Engineering Series.

In the previous post, I discussed Apache Airflow and its basic concepts, configuration, and usage. In this post, I am going to discuss how you can schedule your web scrapers with the help of Apache Airflow.

I will be using the same example I used in the Apache Kafka and Elasticsearch posts, that is, scraping https://allrecipes.com, because the purpose here is to use Airflow. In case you want to learn about scraping, you may check the entire series here.

So, we will work on a workflow that consists of the following tasks:

  • parse_recipes: It will parse individual recipes.
  • download_image: It downloads the recipe image.
  • store_data: Finally, it stores the image and parsed data in MySQL.

I will not be covering how to schedule DAGs and other basics, as I already covered them in Part 1.

I set load_examples=False in airflow.cfg to declutter the interface and keep only the relevant entries. The basic code structure looks like below:
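Here is a minimal sketch of such a skeleton, not the exact code from the repository. It assumes Airflow 1.x import paths; the DAG id recipes_scraper, the start date, the schedule, and the placeholder return values are my own assumptions. The Airflow imports sit inside build_dag only so that the placeholder functions can be tried without Airflow installed.

```python
from datetime import datetime, timedelta


def parse_recipes(**kwargs):
    """Placeholder: will parse individual recipes."""
    return 'parse_recipes'


def download_image(**kwargs):
    """Placeholder: will download each recipe image."""
    return 'download_image'


def store_data(**kwargs):
    """Placeholder: will store the image and parsed data in MySQL."""
    return 'store_data'


# Task ids paired with their callables, in execution order.
TASKS = [
    ('parse_recipes', parse_recipes),
    ('download_image', download_image),
    ('store_data', store_data),
]


def build_dag():
    """Wire the three tasks into a linear DAG (Airflow 1.x import paths)."""
    # Imported here so the callables above can be exercised without Airflow.
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2019, 1, 1),  # assumed start date
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    # 'recipes_scraper' and '@daily' are assumptions, not values from the post.
    dag = DAG('recipes_scraper', default_args=default_args,
              schedule_interval='@daily')

    previous = None
    for task_id, func in TASKS:
        operator = PythonOperator(task_id=task_id, python_callable=func, dag=dag)
        if previous is not None:
            previous >> operator  # run the tasks one after another
        previous = operator
    return dag
```

In an actual DAG file you would add a module-level dag = build_dag() line, since the scheduler only picks up DAG objects it finds at module level.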

If you register this DAG by running airflow scheduler, something similar should appear on your screen.

You can see the power of workflows here. You just come up with a skeleton and can rush to your higher-ups and show what their enterprise data pipeline will look like without getting into details first. Cool, right?

Now, let’s implement each task routine one by one. Before I get into the actual logic of parse_recipes, allow me to discuss how Airflow tasks can communicate with each other.

What is XCom?

XCom provides a way to let tasks communicate with each other: data pushed by one task can be pulled by another. The value returned by a task's Python function is pushed into XCom, and setting provide_context=True passes the task context to the function so that it can pull values pushed by other tasks. XCom itself is nothing but a database table. If you check airflow.db, you will find a table named xcom that holds entries for the task instances that have run.

The first task needs no changes other than accepting **kwargs, which lets tasks share key/value pairs. The other change is setting provide_context=True in each operator to make it XCom compatible. For instance:
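As a rough sketch of those two changes, assuming the first task simply returns its own name for now: the operator arguments are kept in a plain dict so the snippet runs without Airflow, and the variable name opr_parse_recipes in the comment is an assumption modelled on opr_store_data.

```python
def parse_recipes(**kwargs):
    # **kwargs receives the task context when provide_context=True.
    # Whatever is returned here is stored in XCom for this task
    # (under the default key 'return_value').
    return 'parse_recipes'


# Keyword arguments for the operator; provide_context=True is the new part.
PARSE_RECIPES_ARGS = {
    'task_id': 'parse_recipes',
    'python_callable': parse_recipes,
    'provide_context': True,
}

# In the DAG file (Airflow 1.x) this would be used as:
# opr_parse_recipes = PythonOperator(dag=dag, **PARSE_RECIPES_ARGS)
```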

 

The download_image task will have the following changes:
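A minimal sketch of the pulling side could look like this. The print and the return statement are assumptions on my part; the two lines that matter are the ti lookup and the xcom_pull call.

```python
def download_image(**kwargs):
    # The task instance is available in the context under the 'ti' key.
    ti = kwargs['ti']
    # Pull whatever the parse_recipes task returned.
    v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')
    print('RETURNS', v1)
    return v1
```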

The first line, ti = kwargs['ti'], gets the task instance details by accessing the ti key. In case you wonder why this is done: if you print kwargs, it prints something like below, in which you can find keys like ti, task_instance, etc. that give you access to a task’s pushed value:

Next, I called xcom_pull to pull a certain task’s returned value. In my case the task ID is parse_recipes:

v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

As you can see in the screenshot above, it prints the returned value as {logging_mixin.py:95} INFO - RETURNS parse_recipes

Alright, you have learned the concept of XCom; now let's get back to our original code.

So the idea is this: the first task will read URLs from a text file. (You can create another task that is only responsible for fetching links and storing them in a file or DB. That’s your task!) That list of entries is then passed to the next task, which downloads the images and adds the info of each newly downloaded file to the dict. Finally, the last task stores everything in MySQL.
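To make the first step concrete, here is a small sketch of a parse_recipes that reads links from a text file and returns a list of dicts for the next task. The file name recipe_links.txt, the helper build_records, and the url key are hypothetical; the actual parsing of each recipe page is left out.

```python
LINKS_FILE = 'recipe_links.txt'  # hypothetical file with one URL per line


def build_records(lines):
    """Turn an iterable of text lines into a list of {'url': ...} dicts."""
    records = []
    for line in lines:
        url = line.strip()
        if url:  # skip blank lines
            records.append({'url': url})
    return records


def parse_recipes(**kwargs):
    # Real parsing of each recipe page would happen here; this sketch
    # only collects the links so the next task has something to pull.
    with open(LINKS_FILE, encoding='utf-8') as f:
        return build_records(f)
```

Because the function returns the list, Airflow stores it in XCom and download_image can pull it by task ID.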

download_image will now look like below:

I am not discussing the dl_img method, as it is beyond the scope of this post. You can check the code on GitHub for it. Once the file is downloaded, you add its key to the original record.
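A sketch of that flow, under a few assumptions: dl_img below is only a stand-in that derives a file name from the URL (the real one in the repository does the download), and the record keys image_url and image_file are hypothetical names.

```python
def dl_img(image_url):
    """Stand-in for the real downloader: returns a local file name only."""
    return image_url.rsplit('/', 1)[-1]


def download_image(**kwargs):
    ti = kwargs['ti']
    # List of dicts returned by the parse_recipes task.
    records = ti.xcom_pull(key=None, task_ids='parse_recipes')
    for record in records:
        # Add the downloaded file's name to the original record.
        record['image_file'] = dl_img(record['image_url'])
    return records  # pushed to XCom for store_data
```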

The store_data now looks like below:

Make sure you set provide_context=True in the opr_store_data operator; otherwise you will get the following error:

Subtask store_data KeyError: 'ti'

Now the data is available. All that is left is to store it in MySQL.

To use MySQL with Airflow, we will be using Hooks provided by Airflow.

Airflow Hooks let you interact with external systems: Email, S3, Databases, and various others.

Before we get into coding, we need to set up a MySQL connection.

On the Airflow Web UI, go to Admin > Connections, or open http://0.0.0.0:8080/admin/connection/ directly. Here you will see a list of existing connections.

I edited the connection and set the database name and password.

I will be importing MySqlHook to work with MySQL:

from airflow.hooks.mysql_hook import MySqlHook
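A rough sketch of how store_data might use the hook is shown here. The table name recipes, the column names, and the connection ID mysql_default are assumptions; insert_rows is the generic row-insert method that Airflow's DB hooks share. The hook import sits inside the function only so that save_recipes can be tried without Airflow installed.

```python
def save_recipes(hook, records, table='recipes'):
    """Insert one row per record through an Airflow DB hook."""
    fields = ['url', 'image_file']  # hypothetical column names
    rows = [tuple(record.get(field) for field in fields) for record in records]
    hook.insert_rows(table, rows, target_fields=fields)
    return len(rows)


def store_data(**kwargs):
    # Same import as above, kept local so save_recipes works without Airflow.
    from airflow.hooks.mysql_hook import MySqlHook

    ti = kwargs['ti']
    records = ti.xcom_pull(key=None, task_ids='download_image')
    # 'mysql_default' is assumed to be the connection edited in the Web UI.
    hook = MySqlHook(mysql_conn_id='mysql_default')
    return save_recipes(hook, records)
```

Passing the hook in as an argument keeps the insert logic separate from the Airflow context, which makes it easier to check on its own.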

The SQL of the table is given below

That’s it. Oh, wait… what if the admin could be notified when the workflow completes successfully? Airflow lets you use EmailOperator for that purpose.
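A minimal sketch of such a notification task, assuming Airflow 1.x import paths. The task ID, recipient address, subject, and body are placeholders, and add_email_task is a hypothetical helper, not something from the repository.

```python
# Placeholder values; replace them with your own.
EMAIL_ARGS = {
    'task_id': 'send_email',
    'to': 'admin@example.com',
    'subject': 'Recipes workflow finished',
    'html_content': '<p>The recipes DAG completed successfully.</p>',
}


def add_email_task(dag, upstream_task):
    """Attach an EmailOperator that runs after upstream_task succeeds."""
    # Imported here so EMAIL_ARGS can be inspected without Airflow installed.
    from airflow.operators.email_operator import EmailOperator

    opr_email = EmailOperator(dag=dag, **EMAIL_ARGS)
    upstream_task >> opr_email
    return opr_email
```

You would call it with the DAG and the store_data operator, so the mail goes out only after the data has been stored.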

Before using it, you need to change the mail-related settings in airflow.cfg. There is an [smtp] section for this.

I used my own Gmail account to send mail.
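For reference, a Gmail setup in that section might look roughly like this. The address and password are placeholders, and the host and port are Gmail's standard STARTTLS settings rather than values taken from my own config.

```ini
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_ADDRESS@gmail.com
smtp_password = YOUR_APP_PASSWORD
smtp_port = 587
smtp_mail_from = YOUR_ADDRESS@gmail.com
```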

Conclusion

In this post, you learned how to introduce Airflow into your existing scraping architecture and how to use MySQL with Airflow. There are various possibilities to improve or extend it. Try it and let me know.

The code is available on GitHub.

 
