I am taking a short break from Blockchain-related posts.
In past, I have covered Apache Airflow posts here. In this post, I am discussing how to use the CCXT library to grab BTC/USD data from exchanges and create an ETL for data analysis and visualization. I am using the dockerized version of Airflow. It is easy to set up and using proper different images to run different components instead of a one-machine setup. The
docker-compose.yml file is available here but I have made a few changes to install custom libraries. Therefore, it is advised you use the file I have provided in the Github repo. Before I continue, the demo of the work is given here:
The entire flow looks like the below:
I am fetching BTC/USD data from three different exchanges: Binance, ByBit, and FTX. Once the data is available, I am sending all three sources into the next stage where I am only pulling close and timestamp data because these are fields that I need for the next stages. From here, I am branching out two tasks: one to create text files and storing data for later use and the other is storing into the DB. The 3 different files are then sent to the server via FTP. The data from
load_data section is then used to generate reports and then FTPing the file to the server. Notice that some borders are dark green and some are light green in color. Dark green means these tasks are executed successfully while light green means they are still running. The task, ftp_pdf_file is yet to run. In Airflow terms, the DAG flow has been set like below:
In first stage all 3 tasks run in parallel and output results to
transform_data therefore I put all 3 tasks in a Python
list. The >> sign is telling that it is an upstream. The transformed data is being to
load_data tasks who themselves are upstream for ftp_file and
generate_pdf_reports tasks. Notice I am using << here to define upstream for
generate_pdf_reports tasks. Again these tasks were first added in a Python
list and then branched out for the next stage.
In order to pass data from one task to another task, Airflow provides
xcom_push methods. XCom means Cross Communication. When you use
xcom_pull, it means you are fetching data from the task whose Ids have been passed. For instance:
Here I passed the
task_id of three tasks and assigned in three different variables. Wondering where those task_ids are defined? they are defined as
Check the log of the task
I highlighted the returned value of the task. This and the other two tasks values are xcom_pulled in the
I am also using two external components here: FTP and Database. You may use the traditional python code by importing the required libraries but Airflow also provides the options of hooks and connections. For instance, pgsql connection looks like below:
Once the connection is defined, you may use PG Hook to perform Postgres-based operations.
Cool, isn’t it? No need to saving credentials in the code. The same thing has been done with FTP.
In the end, data is analyzed and generates a two-page PDF for executives.
the graph is being generated in matplotlib and saved as an embedded image in the PDF file.
Sweet, No? I was supposed to add data and charts of both FTX and ByBit too but I leave it for you ;). Also, this is not a heavy analysis of the data. I have provided you all data need. You may play with
ta-lib or other libraries and find insights. All up to you 😉
I am using fpdf python library to generate PDF files. This is how files are being sent via FTP.
In this post, you learned how you can make complex flows for ETLs and use connections and hooks to connect 3rd party tools like FTP, DB, AWS, etc. Airflow could be a pretty powerful tool if used correctly. Like always, the code is available on Github.