Data engineering is the foundation of every data scientist’s toolbox. After all, before we can produce any meaningful analysis that adds business value, data must be obtained, cleaned, and shaped. Thus, a good data scientist should know enough about data engineering to understand their role and evaluate their contribution to the needs of the company.
Despite the critical nature of data engineering, only a fraction of educational programs appropriately emphasize this topic at an enterprise level. The lack of emphasis on data engineering in online education leaves students at a disadvantage when they begin their quest for a data science role. More often than not, the skills that future data scientists require are acquired on the job. Yet how can these skills be earned when most of these burgeoning data scientists have not had the opportunity to learn on the job? This conundrum leads to longer job searches and wasted resources for both the candidates and the hiring companies.
I am writing this guide to help close this gap and help our fellow data scientists improve their foundational skillset by providing a real-world example using enterprise tools.
This post is the first of four, illustrating a real-world example of enterprise-level data flow. At the beginning of each post, there is a brief high-level introduction to the aspects of the data flows we are developing and the different technologies being used.
- In Part I, we run Docker and Airflow. We briefly go over the different features of Airflow and walk through a practical example using some of them. We then demonstrate how to use Docker, run a simple Airflow task, and store the output in Google Cloud Storage.
- Part II focuses on Airflow, specifically setting up a simple Twitter scraper and storing the data in the Google Cloud Platform. In Part II, we will define a custom operator and sensor, then highlight some acceptable practices when writing ETL code.
- Part III will feature more functionality with Airflow and Google Cloud Platform, and show how it all connects to BigQuery.
- In Part IV, I will set up PySpark, then provide some simple data analysis and machine learning on Twitter data.
Before I begin the example, I would like to share the Data Science Hierarchy of Needs. As with Maslow’s hierarchy of human needs, a company should satisfy the lowest level of needs before advancing to the next level. I am not saying that every data scientist needs to be an expert in data engineering, but everyone should know the basics to reduce friction within the team.
A brief introduction to Airflow:
Airflow is an open-source workflow management platform that defines tasks as code, executes those tasks on a regular schedule, and distributes task execution across worker processes.
Airflow represents a workflow as a DAG, or directed acyclic graph, which is a collection of tasks connected by directional dependencies. Each node in the graph is a task, and the edges define the dependencies between the tasks.
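To make the DAG idea concrete, here is a minimal sketch in plain Python rather than Airflow itself. The task names are hypothetical, and `graphlib` comes from the standard library (Python 3.9 or later). It only illustrates how edges constrain the order in which tasks can run.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "clean": {"extract"},
    "upload": {"clean"},
    "report": {"clean"},
}


def execution_order(deps):
    """Return one valid order in which every task runs after its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because the graph is acyclic, at least one valid order always exists. If two tasks depended on each other, `static_order()` would raise a `CycleError`, which is the situation the "acyclic" in DAG rules out.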
There are different types of operator tasks:
- Common operators, such as the PythonOperator, MySqlOperator, and BashOperator: used to trigger specific actions.
- Sensor operators: pause the execution of dependent tasks until some criterion is met.
- Transfer operators: move data from one location to another.
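The sensor idea is the least obvious of the three, so here is a rough sketch of the polling loop behind it, written in plain Python. This is not Airflow's implementation. The function and the criterion are hypothetical, although `poke_interval` and `timeout` mirror the names of real Airflow sensor parameters.

```python
import time


def wait_for(criterion, poke_interval=1.0, timeout=10.0):
    """Call criterion() every poke_interval seconds until it returns True.

    Returns True if the criterion was met, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if criterion():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


# Hypothetical criterion: it becomes true on the third check.
checks = {"count": 0}


def file_has_arrived():
    checks["count"] += 1
    return checks["count"] >= 3


met = wait_for(file_has_arrived, poke_interval=0.01, timeout=1.0)
print(met, checks["count"])  # prints: True 3
```

One simplification to keep in mind: this sketch returns False on timeout, whereas an Airflow sensor fails the task instead, so downstream tasks do not run.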
This guide assumes you already have Docker installed. If you have not installed Docker and Docker Compose for your operating system, please do so. I first create a local development version of the Airflow Docker image.
Before we start, please take a look at the GitHub documentation of docker-airflow.
Let’s create a new folder and clone the above GitHub Repository.
Use the command below to create the docker image with GCP dependencies.
docker build --rm --build-arg AIRFLOW_DEPS="gcp" -t puckel/docker-airflow .
Once the image is built, we can create the docker-compose file.
You can either use LocalExecutor or CeleryExecutor:
- LocalExecutor: a single-node architecture that runs tasks in parallel on a single machine. It is ideal for testing.
- CeleryExecutor: focuses on horizontal scaling and runs Python processes in a distributed fashion across a pool of independent workers. It is the best choice for production and for massive numbers of jobs, but it takes longer to set up.
In this case, we will use the LocalExecutor:
docker-compose -f docker-compose-LocalExecutor.yml up -d
You may need to change the port mapping if the default port is already allocated on your machine.
We need to set up the Google credentials and register a connection. We can add the GCP connection via the Airflow web UI like this:
However, we can do better by adding the connection with a PythonOperator. We can build our first PythonOperator task to add the GCP connection. The code is self-explanatory.
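As a rough sketch of what such a callable could look like, assuming the Airflow 1.10 line that the docker-airflow image ships: the connection ID, project name, and key path are placeholders, not values from this post. The `extra__google_cloud_platform__*` keys are the field names that Airflow 1.10 stores for a GCP connection.

```python
import json


def build_gcp_extra(project_id, key_path,
                    scopes=("https://www.googleapis.com/auth/cloud-platform",)):
    """Serialize the extra fields that an Airflow 1.10 GCP connection stores."""
    return json.dumps({
        "extra__google_cloud_platform__project": project_id,
        "extra__google_cloud_platform__key_path": key_path,
        "extra__google_cloud_platform__scope": ",".join(scopes),
    })


def add_gcp_connection(conn_id, project_id, key_path):
    """Callable for a PythonOperator: register the connection if it is missing."""
    # Imported lazily so the helper above can be used without Airflow installed.
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    existing = session.query(Connection).filter(
        Connection.conn_id == conn_id).first()
    if existing is None:
        session.add(Connection(
            conn_id=conn_id,
            conn_type="google_cloud_platform",
            extra=build_gcp_extra(project_id, key_path),
        ))
        session.commit()
    session.close()


# Placeholder values, not taken from the original post.
print(build_gcp_extra("my-project", "/usr/local/airflow/dags/keyfile.json"))
```

In a DAG file, `add_gcp_connection` would be passed as `python_callable` to a PythonOperator, with the three arguments supplied through `op_kwargs`. Checking for an existing row first keeps the task safe to rerun.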
Once you activate this DAG, check under Admin -> Connections to see whether the connection has been updated.
Once we see the change, we can continue with this guide. The next thing I will do is upload a NumPy-generated DataFrame to Google Cloud Storage.
I am not covering how to create a Google Cloud bucket in this post, but there is some help documentation available on the Google Cloud Platform official page.
Once you have the bucket created, let’s start coding.
First, we need to install the correct library. Create the requirements.txt file and list the required libraries. Then add the following lines to the docker-compose file.
# requirements.txt
google-cloud-storage
# any libraries you want

# docker-compose file, under the scheduler and worker
volumes:
  - ./dags:/usr/local/airflow/dags
  - ./requirements.txt:/requirements.txt
Once you complete this, we can start writing the DAG. We first need to create the hook by instantiating GoogleCloudStorageHook. For more information, you can refer to this.
Quick note: Google Cloud Storage does not allow a file to be modified in place. If you want to modify a file, you first need to read its content, then upload the new version to replace it.
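Here is a hedged sketch of the upload step, assuming the Airflow 1.10 `GoogleCloudStorageHook` and its `upload(bucket, object, filename, mime_type)` method. The bucket, object, and connection names are placeholders, and the helper functions are my own illustration rather than part of Airflow.

```python
import os
import tempfile


def make_random_csv(rows=5, cols=3, seed=0):
    """Build a NumPy-generated DataFrame and return it as CSV text."""
    # Imported lazily; both libraries must be available in the image.
    import numpy as np
    import pandas as pd

    rng = np.random.RandomState(seed)
    columns = ["col_%d" % i for i in range(cols)]
    frame = pd.DataFrame(rng.rand(rows, cols), columns=columns)
    return frame.to_csv(index=False)


def upload_csv(hook, bucket, object_name, csv_text):
    """Write csv_text to a temporary file and upload it through the hook.

    Uploading to an object name that already exists replaces that object.
    """
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write(csv_text)
        path = tmp.name
    try:
        hook.upload(bucket=bucket, object=object_name,
                    filename=path, mime_type="text/csv")
    finally:
        os.remove(path)  # Clean up the local copy whether or not the upload worked.


def upload_task():
    """Callable for a PythonOperator; the names below are placeholders."""
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id="my_gcp_connection")
    upload_csv(hook, "my-bucket", "test_folder/random.csv", make_random_csv())
```

Passing the hook in as an argument keeps `upload_csv` easy to try out with a stand-in object. Since objects cannot be edited in place, "modifying" a file amounts to calling `upload_csv` again with the same object name and the new content.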
If your configuration is correct, you should find the new folder in Google Cloud Storage.
Now let’s read the CSV file in another DAG and apply a transformation to it.
This time I will not use Airflow’s built-in google_cloud_hook; instead, I will use the official google.cloud library from Google. Make sure you input your credentials and bucket information.
We do not need to download the file to disk; instead, we can read the object into a BytesIO buffer and load it into pandas.
byte_object = BytesIO()
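A minimal sketch of the buffer pattern, assuming the `google-cloud-storage` client library and a service account key file. The function names and parameters are hypothetical. The one step that is easy to miss is rewinding the buffer before handing it to pandas.

```python
from io import BytesIO


def blob_to_buffer(blob):
    """Download a blob's bytes into memory and rewind the buffer for reading."""
    byte_object = BytesIO()
    blob.download_to_file(byte_object)
    byte_object.seek(0)  # Without this, the reader would start at the end.
    return byte_object


def read_csv_from_gcs(bucket_name, blob_name, key_path):
    """Load a CSV object from Google Cloud Storage straight into pandas."""
    # Imported lazily; requires google-cloud-storage and pandas.
    import pandas as pd
    from google.cloud import storage

    client = storage.Client.from_service_account_json(key_path)
    blob = client.bucket(bucket_name).blob(blob_name)
    return pd.read_csv(blob_to_buffer(blob))
```

From there, any transformation is ordinary pandas code on the returned DataFrame, and nothing is written to the container's disk.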
Once you run this DAG, you can check the results in the Airflow logs and your Google Cloud bucket.
Now let’s link everything we did together in a finale. Let’s create a sensor operator to detect one of the files, then have it trigger a function that deletes that file.
By using the gcs_sensor functionality from Airflow, we can detect whether a file is in our bucket. Once we detect the file, we can trigger the delete functionality. To see if the code works properly, you can check the Google Cloud bucket.
Our first tutorial ends here! In the next post, I will go over more advanced Airflow topics, such as custom sensors and operators, and how we can use them to scrape Twitter data daily. Stay tuned!
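One possible shape for this DAG is sketched below, assuming the Airflow 1.10 module paths (`airflow.contrib.sensors.gcs_sensor` and `airflow.contrib.hooks.gcs_hook`). The DAG ID, bucket, object, and connection names are placeholders, and wrapping everything in `build_dag` is my own choice for illustration.

```python
def delete_file(hook, bucket, object_name):
    """Delete the detected object through the hook."""
    hook.delete(bucket=bucket, object=object_name)


def build_dag(bucket="my-bucket", object_name="test_folder/random.csv",
              conn_id="my_gcp_connection"):
    """Assemble a sensor -> delete pipeline; all default names are placeholders."""
    # Imported lazily; these module paths are the Airflow 1.10 ones.
    from datetime import datetime
    from airflow import DAG
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
    from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
    from airflow.operators.python_operator import PythonOperator

    dag = DAG("gcs_sensor_delete", start_date=datetime(2020, 1, 1),
              schedule_interval=None)
    wait = GoogleCloudStorageObjectSensor(
        task_id="wait_for_file", bucket=bucket, object=object_name,
        google_cloud_conn_id=conn_id, poke_interval=30, timeout=600, dag=dag)
    delete = PythonOperator(
        task_id="delete_file",
        python_callable=lambda: delete_file(
            GoogleCloudStorageHook(google_cloud_storage_conn_id=conn_id),
            bucket, object_name),
        dag=dag)
    wait >> delete  # The delete task runs only after the sensor succeeds.
    return dag
```

Airflow discovers DAG objects at module level, so the DAG file would also need a line such as `dag = build_dag()`. The sensor checks the bucket every 30 seconds here and gives up after 10 minutes; both values are arbitrary.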
If you like this article, please like, share, and subscribe for more content. Thanks again.