Lab 2: Data Ingestion

caution

You are viewing this lab from the handbook. This lab is meant to be loaded as a Cloud Shell tutorial. Please see the labs section on how to do so.

Welcome back!

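During this lab, you ingest fraudulent and non-fraudulent transactions into BigQuery using three methods:

  • Method 1: External table using BigLake
  • Method 2: Real-time data ingestion using Pub/Sub
  • Method 3: Ingestion using Cloud Dataproc (Apache Spark)

All three methods ingest data from the bucket you created in the previous lab.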


Method 1: External table using BigLake

BigLake tables allow querying structured data in external data stores with access delegation. For an overview, refer to the BigLake documentation. Access delegation decouples access to the BigLake table from access to the underlying data store. An external connection associated with a service account is used to connect to the data store.

Because the service account handles retrieving data from the data store, you only have to grant users access to the BigLake table. This lets you enforce fine-grained security at the table level, including row-level and column-level security.

First, we create the connection resource in BigQuery:

bq mk --connection --location=us --project_id=<PROJECT_ID> \
    --connection_type=CLOUD_RESOURCE fraud-transactions-conn

When you create a connection resource, BigQuery creates a unique system service account and associates it with the connection. Show the connection details to see it:

bq show --connection <PROJECT_ID>.us.fraud-transactions-conn

Note the serviceAccountId.

To connect to Cloud Storage, you must give the new connection read-only access to Cloud Storage so that BigQuery can access files on behalf of users. Let’s assign the service account to a variable:

CONN_SERVICE_ACCOUNT=$(bq --format=prettyjson show --connection ${PROJECT_ID}.us.fraud-transactions-conn | jq -r ".cloudResource.serviceAccountId")
echo $CONN_SERVICE_ACCOUNT
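For readers less familiar with jq, the same lookup can be sketched in Python. The JSON below is a made-up sample, not real output; only the cloudResource.serviceAccountId path comes from the command above, and the function name is hypothetical.

```python
import json

# Made-up sample of the JSON printed by `bq --format=prettyjson show --connection`.
# Only the cloudResource.serviceAccountId path is taken from the lab;
# every value here is a placeholder.
sample_output = """
{
  "cloudResource": {
    "serviceAccountId": "example-connection-sa@example.iam.gserviceaccount.com"
  },
  "name": "projects/123456789/locations/us/connections/fraud-transactions-conn"
}
"""


def connection_service_account(raw_json: str) -> str:
    """Return the value stored under cloudResource.serviceAccountId."""
    connection = json.loads(raw_json)
    return connection["cloudResource"]["serviceAccountId"]


service_account = connection_service_account(sample_output)
print(service_account)
```

This mirrors the jq filter `.cloudResource.serviceAccountId`: parse the document, then walk the two keys.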

Let’s double check the service account.

  1. Go to the BigQuery Console.
  2. Expand <PROJECT_ID>
  3. Expand External connections
  4. Click us.fraud-transactions-conn.

Does the service account match the one you got from the command line?

If so, let’s grant the service account access to Cloud Storage:

gcloud storage buckets add-iam-policy-binding gs://<PROJECT_ID>-bucket \
--role=roles/storage.objectViewer \
--member=serviceAccount:$CONN_SERVICE_ACCOUNT

Let’s create a data set that contains the table and the external connection to Cloud Storage.

  1. Go to the BigQuery Console
  2. Click the three vertical dots ⋮ next to <PROJECT_ID> in the navigation menu
  3. Click Create dataset
  4. Enter ml_datasets (plural) in the ID field. Region should be multi-region US.
  5. Click Create dataset

Alternatively, you can create the data set on the command line:

bq --location=us mk -d ml_datasets

Next, we connect the data in Cloud Storage to BigQuery:

  1. Click + Add data
  2. Click Google Cloud Storage
  3. Select Load to BigQuery
  4. Enter the following details:
  • Create table from: Google Cloud Storage
  • Select file: <PROJECT_ID>-bucket/data/parquet/ulb_fraud_detection/*
  • File format: Parquet
  • Project: <PROJECT_ID>
  • Dataset: ml_datasets
  • Table: ulb_fraud_detection_biglake
  • Table type: External table
  • Check Create a BigLake table using a Cloud Resource connection
  • Connection ID: Select us.fraud-transactions-conn
  • Schema: Auto detect
  5. Click on Create table

Alternatively, you can also use the command line to create the table:

bq mk --table \
  --external_table_definition=@PARQUET="gs://${PROJECT_ID}-bucket/data/parquet/ulb_fraud_detection/*"@projects/${PROJECT_ID}/locations/us/connections/fraud-transactions-conn \
  ml_datasets.ulb_fraud_detection_biglake
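The value passed to --external_table_definition packs three things into one string: the file format, the Cloud Storage URI, and the full connection path. As a small sketch of how the pieces fit together (the function name and the project id my-project are hypothetical), it could be assembled like this:

```python
def biglake_table_definition(project_id: str, location: str, connection_id: str,
                             source_uri: str, file_format: str = "PARQUET") -> str:
    """Build the '@FORMAT=URI@connection' string used in the bq command above."""
    connection = (
        f"projects/{project_id}/locations/{location}/connections/{connection_id}"
    )
    return f"@{file_format}={source_uri}@{connection}"


# 'my-project' is a placeholder project id for illustration only.
definition = biglake_table_definition(
    project_id="my-project",
    location="us",
    connection_id="fraud-transactions-conn",
    source_uri="gs://my-project-bucket/data/parquet/ulb_fraud_detection/*",
)
print(definition)
```

On the shell command line the URI is quoted so that the `*` wildcard is passed to bq unchanged instead of being expanded by the shell.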

Let’s have a look at the data set:

  1. Go to the BigQuery Console
  2. Expand <PROJECT_ID>
  3. Expand ml_datasets
  4. Click ulb_fraud_detection_biglake
  5. Click DETAILS

Have a look at the external data configuration. You can see the Cloud Storage bucket (gs://...) your data lives in.

Let’s query it:

  1. Click QUERY
  2. Insert the following SQL query.
SELECT * FROM `<PROJECT_ID>.ml_datasets.ulb_fraud_detection_biglake` LIMIT 1000;

Note that you can also execute a query using the bq tool:

bq --location=us query --nouse_legacy_sql "SELECT Time, V1, Amount, Class FROM <PROJECT_ID>.ml_datasets.ulb_fraud_detection_biglake LIMIT 10;"

The data you are querying still resides in Cloud Storage; no copy is stored in BigQuery. When using BigLake, BigQuery acts as the query engine, not as the storage layer.


Method 2: Real-time data ingestion into BigQuery using Pub/Sub

Pub/Sub enables real-time streaming into BigQuery. Learn more about Pub/Sub integrations with BigQuery.

We create an empty table and then stream data into it. For this to work, we need to specify a schema. Have a look at fraud_detection_bigquery_schema.json. This is the schema we are going to use.

Create an empty table using this schema. This is the table we will stream data into:

bq --location=us mk --table \
<PROJECT_ID>:ml_datasets.ulb_fraud_detection_pubsub src/data_ingestion/fraud_detection_bigquery_schema.json

We also need to create a Pub/Sub schema. We use Apache Avro, as it is better suited for appending row-wise:

gcloud pubsub schemas create fraud-detection-schema \
    --project=$PROJECT_ID  \
    --type=AVRO \
    --definition-file=src/data_ingestion/fraud_detection_pubsub_schema.json
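To give a feel for what an Avro record schema for this data might look like, here is a sketch that generates one in Python. It is not the content of fraud_detection_pubsub_schema.json: the column layout (Time, V1 to V28, Amount, Class) is that of the public ULB credit card fraud dataset, and the record name and field types are assumptions made for illustration.

```python
import json

# Assumed column layout of the public ULB dataset; the lab's schema file may differ.
feature_columns = [f"V{i}" for i in range(1, 29)]

fields = [{"name": "Time", "type": "double"}]            # assumed type
fields += [{"name": name, "type": "double"} for name in feature_columns]
fields += [
    {"name": "Amount", "type": "double"},                # assumed type
    {"name": "Class", "type": "int"},                    # assumed type (0 or 1)
]

# 'FraudTransaction' is a hypothetical record name.
avro_schema = {"type": "record", "name": "FraudTransaction", "fields": fields}

schema_json = json.dumps(avro_schema, indent=2)
print(schema_json[:120])
```

An Avro schema is plain JSON, so generating it from a column list avoids typing 31 nearly identical field entries by hand.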

And then create a Pub/Sub topic using this schema:

gcloud pubsub topics create fraud-detection-topic \
    --project=$PROJECT_ID  \
    --schema=fraud-detection-schema \
    --message-encoding=BINARY

We also need to give Pub/Sub permission to write data to BigQuery. The Pub/Sub service account is created automatically, and its name is composed of the project number (not the project ID) and an identifier. In your case, it is service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com.
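As a small illustration of the naming pattern, the address can be built from the project number. The function name and the project number 123456789012 are placeholders.

```python
def pubsub_service_account(project_number: int) -> str:
    """Build the Pub/Sub service account address from a project number."""
    return f"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"


# 123456789012 is a placeholder, not a real project number.
member = f"serviceAccount:{pubsub_service_account(123456789012)}"
print(member)
```

If you do not know your project number, `gcloud projects describe $PROJECT_ID --format="value(projectNumber)"` prints it.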

Grant the service account access to BigQuery:

gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member=serviceAccount:service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com --role=roles/bigquery.dataEditor

gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member=serviceAccount:service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com --role=roles/bigquery.jobUser

Next, we create the Pub/Sub subscription:

gcloud pubsub subscriptions create fraud-detection-subscription \
    --project=$PROJECT_ID  \
    --topic=fraud-detection-topic \
    --bigquery-table=$PROJECT_ID.ml_datasets.ulb_fraud_detection_pubsub \
    --use-topic-schema  

Examine it in the console:

  1. Go to the Pub/Sub Console
  2. Click fraud-detection-subscription. Here you can see messages as they arrive.
  3. Click fraud-detection-topic. This is the topic you will be publishing messages to.

Please have a look at import_csv_to_bigquery_1.py. This script loads CSV files from Cloud Storage, parses them in Python, and sends the data to Pub/Sub row by row.

Let’s execute it.

./src/data_ingestion/import_csv_to_bigquery_1.py

Each line you see on the screen corresponds to one transaction being sent to Pub/Sub and written to BigQuery. A full run would take approximately 40 to 60 minutes, so please cancel the command using CTRL + C.

But why is it so slow?

Let’s ask Gemini:

  1. Open Gemini Code Assist

  2. Insert Why is import_csv_to_bigquery_1.py so slow? into the Gemini prompt.
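One common cause of slow row-by-row publishing, which may or may not be what this script does, is waiting for each acknowledgement before sending the next message. The sketch below simulates that effect without touching Pub/Sub: fake_publish is a stand-in that only sleeps, and the latency value is invented for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

ROUND_TRIP_SECONDS = 0.01  # hypothetical per-message latency, not a measurement


def fake_publish(row: str) -> str:
    """Stand-in for a network publish call; sleeps to mimic one round trip."""
    time.sleep(ROUND_TRIP_SECONDS)
    return f"ack:{row}"


def publish_sequentially(rows):
    """Wait for each acknowledgement before sending the next row."""
    return [fake_publish(row) for row in rows]


def publish_concurrently(rows, max_in_flight=20):
    """Keep several publishes in flight at once; acks are returned in input order."""
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        return list(pool.map(fake_publish, rows))


def timed(func, rows):
    """Run func(rows) and return its result together with the elapsed seconds."""
    start = time.perf_counter()
    result = func(rows)
    return result, time.perf_counter() - start


rows = [f"row-{i}" for i in range(40)]
seq_acks, seq_seconds = timed(publish_sequentially, rows)
con_acks, con_seconds = timed(publish_concurrently, rows)
print(f"sequential: {seq_seconds:.2f}s, concurrent: {con_seconds:.2f}s")
```

With one message in flight at a time, the total time grows as the number of rows times the round-trip latency. Overlapping the round trips removes most of that waiting.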


Method 3: Ingestion using Cloud Dataproc (Apache Spark)

Dataproc is a fully managed and scalable service for running Apache Hadoop, Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks. Dataproc allows data to be loaded and also transformed or pre-processed as it is brought in.

Create an empty BigQuery table:

bq --location=us mk --table \
<PROJECT_ID>:ml_datasets.ulb_fraud_detection_dataproc src/data_ingestion/fraud_detection_bigquery_schema.json

Download the Spark connector for BigQuery and copy it to our bucket:

wget -qN https://github.com/GoogleCloudDataproc/spark-bigquery-connector/releases/download/0.37.0/spark-3.3-bigquery-0.37.0.jar
gsutil cp spark-3.3-bigquery-0.37.0.jar gs://${PROJECT_ID}-bucket/jar/spark-3.3-bigquery-0.37.0.jar

Open import_parquet_to_bigquery.py in the Cloud Shell editor and replace the project id with your project id <PROJECT_ID>. Don’t forget to save.

Execute it:

gcloud dataproc batches submit pyspark src/data_ingestion/import_parquet_to_bigquery.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --deps-bucket=gs://${PROJECT_ID}-bucket

While the command is still running, open the Dataproc console and monitor the job.

After the Dataproc job completes, confirm that data has been loaded into the BigQuery table. You should see over 200,000 records, but the exact count isn’t critical:

bq --location=us query --nouse_legacy_sql "SELECT count(*) as count FROM <PROJECT_ID>.ml_datasets.ulb_fraud_detection_dataproc;"

ā— Please do not skip the above validation step. Data in the above table is needed for the following labs.


Success

🎉 Congratulations! 🚀

You’ve officially leveled up in data wizardry! By conquering the BigQuery Code Lab, you’ve shown your skills in not just one, but three epic methods: BigLake (riding the waves of data), Pub/Sub (broadcasting brilliance), and Dataproc (processing like a boss).

Your pipelines are now flawless, your tables well-fed, and your data destiny secured. Welcome to the realm of BigQuery heroes, Master of Ingestion! 🦾💻
