Using Apache Airflow ETL to fetch and analyze BTC data

This post is part of the Data Engineering and ETL Series.

I am taking a short break from Blockchain-related posts.

I have covered Apache Airflow in earlier posts here. In this post, I am discussing how to use the CCXT library to grab BTC/USD data from exchanges and create an ETL for data analysis and visualization. I am using the dockerized version of Airflow. It is easy to set up, and it uses separate images to run the different components instead of a single-machine setup. The docker-compose.yml file is available here, but I have made a few changes to it to install custom libraries, so it is advised that you use the file I have provided in the GitHub repo. Before I continue, the demo of the work is given here:


The DAG

The entire flow looks like the one below:

Airflow ETL in progress

I am fetching BTC/USD data from three different exchanges: Binance, ByBit, and FTX. Once the data is available, I send all three sources into the next stage, where I pull only the close and timestamp fields because those are the ones I need for the later stages. From here, I branch out into two tasks: one creates text files and stores the data for later use, and the other stores it in the DB. The three text files are then sent to the server via FTP. The data from the load_data task is then used to generate reports, and the resulting file is FTPed to the server. Notice that some task borders are dark green and some are light green: dark green means the task has executed successfully, while light green means it is still running. The task ftp_pdf_file is yet to run. In Airflow terms, the DAG flow has been set up like below:
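Roughly, the wiring can be sketched like this (the DAG name, schedule, and stub callables are my assumptions; only the task_ids are taken from the graph view):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


# Placeholder callables -- the real ones fetch OHLCV data, transform it,
# write text files, load the DB, generate the PDF, and FTP the results.
def _noop(**context):
    pass


with DAG("crypto_etl", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    fetch_binance_ohlcv = PythonOperator(task_id="fetch_binance_ohlcv", python_callable=_noop)
    fetch_bybit_ohlcv = PythonOperator(task_id="fetch_bybit_ohlcv", python_callable=_noop)
    fetch_ftx_ohlcv = PythonOperator(task_id="fetch_ftx_ohlcv", python_callable=_noop)
    transform_data = PythonOperator(task_id="transform_data", python_callable=_noop)
    create_text_file = PythonOperator(task_id="create_text_file", python_callable=_noop)
    load_data = PythonOperator(task_id="load_data", python_callable=_noop)
    ftp_file = PythonOperator(task_id="ftp_file", python_callable=_noop)
    generate_pdf_reports = PythonOperator(task_id="generate_pdf_reports", python_callable=_noop)
    ftp_pdf_file = PythonOperator(task_id="ftp_pdf_file", python_callable=_noop)

    # The three fetch tasks run in parallel and feed transform_data.
    [fetch_binance_ohlcv, fetch_bybit_ohlcv, fetch_ftx_ohlcv] >> transform_data
    # transform_data branches into the file-creation and DB-loading tasks.
    transform_data >> [create_text_file, load_data]
    # Downstream steps, with << marking the upstream task explicitly.
    ftp_file << create_text_file
    generate_pdf_reports << load_data
    ftp_pdf_file << generate_pdf_reports
```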

In the first stage, all three tasks run in parallel and feed their results into transform_data, which is why I put all three tasks in a Python list. The >> operator says that the task on its left is upstream of the task on its right. The transformed data is passed to the create_text_file and load_data tasks, which are themselves upstream of the ftp_file and generate_pdf_reports tasks. Notice that I am using << here to define the upstream tasks for ftp_pdf_file and generate_pdf_reports. Again, these tasks were first added to a Python list and then branched out to the next stage.

In order to pass data from one task to another, Airflow provides the xcom_pull and xcom_push methods. XCom is short for cross-communication. When you use xcom_pull, you are fetching data from the tasks whose IDs you pass to it. For instance:

xcom_pull
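In case the details are hard to make out, here is a sketch of what that pull could look like inside the transform_data callable; the variable names are my own, only the task_ids come from the DAG:

```python
def transform(**context):
    ti = context["ti"]  # TaskInstance for the current run
    # Pull the return values of the three fetch tasks by their task_ids.
    binance_data = ti.xcom_pull(task_ids="fetch_binance_ohlcv")
    bybit_data = ti.xcom_pull(task_ids="fetch_bybit_ohlcv")
    ftx_data = ti.xcom_pull(task_ids="fetch_ftx_ohlcv")
    # ...keep only the close and timestamp fields for the downstream tasks.
```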

Here I passed the task_ids of the three fetch tasks and assigned their return values to three different variables. Wondering where those task_ids are defined? They are defined in each task's PythonOperator.

Check the log of the task fetch_binance_ohlcv:

I highlighted the returned value of the task. This value, along with those of the other two fetch tasks, is pulled via xcom_pull in the transform_data task.
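Whatever a PythonOperator's callable returns is pushed to XCom automatically, which is why the return value shows up in the log and can be pulled downstream. A minimal sketch of such a fetch callable using CCXT (the symbol, timeframe, and limit here are assumptions, not necessarily what the original DAG uses):

```python
import ccxt


def fetch_binance(**context):
    exchange = ccxt.binance()
    # Each row is [timestamp, open, high, low, close, volume].
    ohlcv = exchange.fetch_ohlcv("BTC/USDT", timeframe="1h", limit=100)
    # The return value is pushed to XCom and pulled later in transform_data.
    return ohlcv
```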

I am also using two external components here: FTP and a database. You could write traditional Python code by importing the required libraries, but Airflow also provides hooks and connections. For instance, a PostgreSQL connection looks like the below:

Once the connection is defined, you may use PG Hook to perform Postgres-based operations.
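A hedged sketch of how that might look inside a task; the connection id and table names below are illustrative, not the repo's actual names:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook


def load_data(**context):
    # Host, credentials, and database come from the Airflow connection,
    # so nothing sensitive has to live in the DAG code.
    hook = PostgresHook(postgres_conn_id="crypto_db")
    hook.run(
        "INSERT INTO btc_prices (exchange, ts, close) VALUES (%s, %s, %s)",
        parameters=("binance", "2021-09-01 00:00:00", 47000.0),
    )
```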

 

Cool, isn’t it? No need to save credentials in the code. The same thing has been done with FTP.
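For FTP, the equivalent is an FTP connection plus the FTP hook. A rough sketch, with an assumed connection id and hypothetical file paths:

```python
from airflow.providers.ftp.hooks.ftp import FTPHook


def ftp_text_file(**context):
    # The host and credentials come from the 'ftp_server' connection.
    hook = FTPHook(ftp_conn_id="ftp_server")
    hook.store_file("/remote/reports/btc_binance.txt", "/tmp/btc_binance.txt")
```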

In the end, the data is analyzed and a two-page PDF is generated for executives.

The graph is generated with matplotlib and saved as an embedded image in the PDF file.
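A minimal sketch of that pattern: render the chart with matplotlib, save it to a PNG, and embed it with fpdf. The file names, font, and layout are assumptions rather than the report's actual code:

```python
import matplotlib
matplotlib.use("Agg")  # render without a display, as in a container
import matplotlib.pyplot as plt
from fpdf import FPDF


def generate_pdf_report(timestamps, closes, pdf_path="/tmp/btc_report.pdf"):
    # Plot the close prices and save the chart as a temporary image.
    plt.plot(timestamps, closes)
    plt.title("BTC/USD close price")
    plt.savefig("/tmp/btc_close.png")
    plt.close()

    # Build the PDF and embed the saved chart.
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=14)
    pdf.cell(0, 10, "BTC/USD Report")
    pdf.ln(15)
    pdf.image("/tmp/btc_close.png", w=180)
    pdf.output(pdf_path)
```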

Sweet, no? I was supposed to add data and charts for FTX and ByBit too, but I will leave that to you ;). Also, this is not a heavy analysis of the data; I have provided all the data you need. You may play with ta-lib or other libraries and find your own insights. It is all up to you 😉

I am using the fpdf Python library to generate PDF files. This is how the files are sent via FTP.

Conclusion

In this post, you learned how to build complex ETL flows and how to use connections and hooks to connect to third-party tools like FTP, databases, AWS, etc. Airflow can be a pretty powerful tool if used correctly. As always, the code is available on GitHub.

If you like this post then you should subscribe to my blog for future updates.
