Airflow with Twitter Scraper, Google Cloud Storage, Big Query — tweets relating to Covid19

Jayce Jiang
8 min readMay 22, 2020

Background:

In Part I, we learned how to set up Airflow with Google Cloud Platform (GCS) using Docker. We then implemented the standard operators and sensors concept to our google cloud storage, followed by performing a file clean-up procedure.

In Part II of this 4-part blog series, we will go over how to set up a Twitter scraper in Airflow and store the data in GCS, then automatically load it into BigQuery for further analysis. I will apply what is known as the “data engineering framework” to our airflow tweet pipeline, which will dynamically generate different instantiations of Twitter Airflow DAGs. We will also enhance our dataflow by creating additional branches leveraging the BranchOperator and improving our GCS and BigQuery file modification sensors. Given the unprecedented global impact of COVID-19, the target data to be scraped are tweets containing the word “coronavirus.”

Data Engineering Framework Pipeline

With this tutorial we can create new chunks of workflows without doing “new” work, thus, saving a lot of time and effort. It also allow data scientists and engineer to get to the “value” extraction part faster.

Before we begin building, I will cover the fundamentals of the data engineering framework pipeline. You have an input, the pipeline, and an output. A real example would be data flowing from Cloud Function/Cloud Composer (input) to BigQuery (output).

Google Cloud Source

While I was working for ByteDance, I learned the characteristics of a variety of pipeline frameworks. Below I highlighted the different types of frameworks that most data engineers and scientists build in their careers.

  • Aggregation Framework — this is a pipeline that focuses on the aggregation of data and pre-computation of the required facts. Using SQL query to calculate these desired aggregation metrics is ineffective because it requires you to load data in a large batch instead of incrementally.
  • Join Transfer Framework — as the name entails, it is an ETL pipeline that involved joining different fact tables or transferring data tables from one platform to another.
  • History Filling Framework — once a data pipeline is created, the engineer often needs to create a separate pipeline that fills in the historical data. This pipeline usually contains many histories backfilling scripts. It is also one of the most common frameworks engineers work on because it enables the company to visit earlier data and find meaningful insights.
  • Deployment Framework — this is a pipeline that focuses on automated testing cases. It is used before production to check for bugs and integration issues. An Operation or DevOps team usually handles this procedure.

Understanding and creating each of these frameworks have a multipliers effort on your data technical skills.

Without further ado, let’s start coding!

Technical Section:

Before we start, we need to rebuild our docker-airflow image from Part I. The Twint library requires Python version 3.6 and a few other dependencies. Add the following two lines to our Dockerfile. I also provided a link to the full file here.

FROM python:3.6-slim-buster
.....
RUN pip3 install twint.....docker build --rm --build-arg AIRFLOW_DEPS="gcp" -t puckel/docker-airflow .

Next, we need to change the image name in our Docker-compose file to reflect this new image.

puckel/docker-airflow ------> puckel/docker-airflow:latest

First, let’s test out the Twint library and collect the tweet containing the word “coronavirus” on verified account on twitter between May 1 to May 5, 2020.

After you run the DAG, you can see what the scraper did in Logs.

Airflow Logs

The output is converted into a CSV format on GCS. We can easily create a framework between various timeframes to study the tweet historical of the word “coronavirus.”

Dynamically Generate Operators in DAGs:

In this example, I dynamically generate multiple twitter related operators and set unique IDs on them. Also, by adding the metadata into our blog file, we can check the file modified time.

We need to modify the example above: first, by adding a for loop and, second, by creating a new function that returns the Operator class. There are minor modifications I made to the Twint configuration. Since there are many tweets regarding the Coronavirus, and I am running this DAG in my local machine, I limited the number of tweets collected per month to 100. If you want all the tweets, you can edit the setting in the Twint object.

Once you load up the DAG, you should see this graph in your webserver.

By triggering this DAG, you should see new files has been created in your google storage.

gcs_to_bq GCP Operator:

The next step is to load those files from Google Cloud Storage into Big Query using the GCP pre-built Operator. There is a small prerequisite: we need to create a dataset on the Big Query platform. You can either created using code or using the console.

Google Instructions: How to Create Dataset in BigQuery
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set dataset_id to the ID of the dataset to create.
# dataset_id = "{}.your_dataset".format(client.project)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_id)

# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"

# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset) # Make an API request.
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

To transfer data from GCS to BG, we can use the built-in GCP operators. Here is how you utilize the code. For more information, take a look at this.

from airflow.contrib.operators import gcs_to_bqwith dag:
GCS_to_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='airflowexample',
source_objects=['airflowTweet/*.csv'],
destination_project_dataset_table='tweetScraper.tweet',
skip_leading_rows=1,
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='google_cloud_default',
allow_quoted_newlines=True, #allows for newlines
allow_jagged_rows=True, #allows for missing values
autodetect=True
)

Next, we need to create a python function that checks for modified files in the cloud storage to determine whether or not to trigger the transfer task.
One of the essential skills in software development is the concept of modularization. So I decided this would be the perfect timing to showcase how you can use modularization in Airflow by creating a custom sensor — automatic file change detection GCS_BigQuery sensor.

Custom Sensor:

There are several ways to import custom operators and sensors. Below, I went over two commons ways, Directly or through the Airflow’s plugins_manager, and briefly listed their benefit and drawback.

But before we continue, let’s include these lines in our Docker-Compose File for our worker, scheduler, and webserver.

volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins- ./plugins:/usr/local/airflow/plugins

Directly Approach:

The simplest way to use a custom operator, sensor, and hook is a direct import. Two steps: create the custom sensor file in the plugin folder then use the following line to load it into the DAG.

from filename import classnameOfTheCustomPlugin

Even though this approach is simple and can work, it can cause some severe readability issues when your code-base increases. This is why I prefer to use the airflow plugin manager.

Using Airflow Plugin_Manager:

Before we can use the built-in plugin library, we need to create the “sensors” folder, two __init__.py files, and the custom_sensor.py file like the pictures shown below. If you want to create custom operators, then just create a “operators” folder instead.

The outer __init__.py needs to initialize all of our custom files.

from airflow.plugins_manager import AirflowPlugin
from sensors.gcs_bq_custom_sensor import GoogleCloudStorageBigQueryUpdateSensor
class AirflowCustomPlugin(AirflowPlugin):
name = "airflow_custom_plugin"
sensors = [GoogleCloudStorageBigQueryUpdateSensor]
operators = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []

You do not need to put anything __init__.py inside the “sensors” folder. But this file is useful when we want to combine multiple custom sensors/operators file into one import file.

Next, just create the gcs_bq_custom_sensor.py and import the BaseSensorOperator to your class GoogleCloudStorageBigQuery.

This sensor checks the modified date on your big query table and google cloud storage’s files, and depending on which is older, returns true or false.

To import this library in your DAGs, simple use the following code:

from airflow.sensors import GoogleCloudStorageBigQueryUpdateSensor

Let’s Us Combine Everything:

Practice makes perfect, so let’s combine everything we learn in this blog and make a simple daily scraping and aggregation pipeline — scrapes tweets every day, store it in google cloud storage, and updates it to the big query once our sensor detects any change in the cloud file system.

We set up our DAGs to scrape yesterday’s twitter data, only if that data file does not exist in our cloud storage. The best way to solve this problem is to use something called BranchingOperator.

BranchPythonOperator it takes a python function as an input and it returns a task id or a list of task ids to decide which part of the graph to go down.

We can use this operator to iterate down the variety of paths in our DAG based off the result of the function below.

The above code scans our google bucket to determine whether yesterday’s data was gathered. If the data CSV file was saved, it triggers the All_jobs_end task; else it set off the tweeter-today-scraper. Here is the completed file.

The above operator is the basic version that has a simple checking algorithm. In theory, there should be no duplications in BigQuery using the above step.

However, there can be plenty of reasons why there may be duplications in the dataset. You can easily remove duplication using the following custom Operator.

The following code is found in Nicholas Leong’s article in Gmail Data Pipeline on Apache Airflow. If you want to implement this feature, you can reverse engineer his lesson as well.

Our second tutorial ends here!

Part III of this series focuses on creating a data engineering pipeline that will manipulate our Twitter dataset. There will also be lessons on how we can load the data from BigQuery into Google Colab for some simple text analysis.

Thanks for reading! If you found this blog helpful, please click “claps” and share the article.

Follow me if you want more content like this. I also appreciate all feedback in the comment section. Finally, I would like to thank my friend, Danny, for helping me with this article.

If you are interested in my newest data science project, check out my SBTN’s Player Comparison Tool and SBTN’s NBA K-Mean Cluster Analysis

--

--

Jayce Jiang

Data Engineer at Disney, who previous work at Bytedance.