Lab 2: Data Ingestion
caution
You are viewing this lab from the handbook. This lab is meant to be loaded as a Cloud Shell tutorial. Please see the labs section for instructions on how to do so.
Welcome back 👋!
During this lab, you ingest fraudulent and non-fraudulent transactions into BigQuery using three methods:
- Method 1: Using BigLake with data stored in Google Cloud Storage (GCS)
- Method 2: Near real-time ingestion into BigQuery using Cloud Pub/Sub
- Method 3: Batch ingestion into BigQuery using Dataproc Serverless
For all methods, we ingest data from the bucket you created in the previous lab.
Method 1: External table using BigLake
BigLake tables allow querying structured data in external data stores with access delegation. For an overview, refer to the BigLake documentation. Access delegation decouples access to the BigLake table from access to the underlying data store. An external connection associated with a service account is used to connect to the data store.
Because the service account handles retrieving data from the data store, you only have to grant users access to the BigLake table. This lets you enforce fine-grained security at the table level, including row-level and column-level security.
First, we create the connection resource in BigQuery:
bq mk --connection --location=us --project_id=<PROJECT_ID> \
--connection_type=CLOUD_RESOURCE fraud-transactions-conn
When you create a connection resource, BigQuery creates a unique system service account and associates it with the connection.
bq show --connection <PROJECT_ID>.us.fraud-transactions-conn
Note the serviceAccountId.
To connect to Cloud Storage, you must give the new connection read-only access to Cloud Storage so that BigQuery can access files on behalf of users. Let's assign the service account to a variable:
CONN_SERVICE_ACCOUNT=$(bq --format=prettyjson show --connection ${PROJECT_ID}.us.fraud-transactions-conn | jq -r ".cloudResource.serviceAccountId")
echo $CONN_SERVICE_ACCOUNT
Let's double-check the service account.
- Go to the BigQuery Console.
- Expand <PROJECT_ID>.
- Expand External connections.
- Click us.fraud-transactions-conn.
Is the service account the same as the one you got from the command line?
If so, let's grant the service account access to Cloud Storage:
gcloud storage buckets add-iam-policy-binding gs://<PROJECT_ID>-bucket \
--role=roles/storage.objectViewer \
--member=serviceAccount:$CONN_SERVICE_ACCOUNT
Let's create a dataset to contain the table that uses the external connection to Cloud Storage.
- Go to the BigQuery Console.
- Click the three vertical dots ⋮ next to <PROJECT_ID> in the navigation menu.
- Click Create dataset.
- Enter ml_datasets (plural) in the ID field. Region should be multi-region US.
- Click Create dataset.
Alternatively, you can create the dataset on the command line:
bq --location=us mk -d ml_datasets
Next, we connect the data in Cloud Storage to BigQuery:
- Click + Add data.
- Click Google Cloud Storage.
- Select Load to BigQuery.
- Enter the following details:
  - Create table from: Google Cloud Storage
  - Select file: <PROJECT_ID>-bucket/data/parquet/ulb_fraud_detection/*
  - File format: Parquet
  - Project: <PROJECT_ID>
  - Dataset: ml_datasets
  - Table: ulb_fraud_detection_biglake
  - Table type: External table
  - Check Create a BigLake table using a Cloud Resource connection
  - Connection ID: Select us.fraud-transactions-conn
  - Schema: Auto detect
- Click Create table.
Alternatively, you can use the command line to create the table:
bq mk --table \
--external_table_definition=@PARQUET="gs://${PROJECT_ID}-bucket/data/parquet/ulb_fraud_detection/*"@projects/${PROJECT_ID}/locations/us/connections/fraud-transactions-conn \
ml_datasets.ulb_fraud_detection_biglake
Let's have a look at the dataset:
- Go to the BigQuery Console.
- Expand <PROJECT_ID>.
- Expand ml_datasets.
- Click ulb_fraud_detection_biglake.
- Click DETAILS.
Have a look at the external data configuration. You can see the Cloud Storage bucket (gs://...) where your data lives.
Let's query it:
- Click QUERY.
- Insert the following SQL query:
SELECT * FROM `<PROJECT_ID>.ml_datasets.ulb_fraud_detection_biglake` LIMIT 1000;
Note that you can also execute a query using the bq tool:
bq --location=us query --nouse_legacy_sql 'SELECT Time, V1, Amount, Class FROM `<PROJECT_ID>.ml_datasets.ulb_fraud_detection_biglake` LIMIT 10;'
The data you are querying still resides on Cloud Storage and there are no copies stored in BigQuery. When using BigLake, BigQuery acts as query engine but not as storage layer.
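If you prefer Python, you can run the same query through the BigQuery client library. The snippet below is a minimal sketch; it assumes the google-cloud-bigquery package is installed and that you replace the <PROJECT_ID> placeholders:

from google.cloud import bigquery

# Minimal sketch: query the BigLake table from Python.
# Replace <PROJECT_ID> with your project ID.
client = bigquery.Client(project="<PROJECT_ID>")

query = """
    SELECT Time, V1, Amount, Class
    FROM `<PROJECT_ID>.ml_datasets.ulb_fraud_detection_biglake`
    LIMIT 10
"""

# BigQuery runs the query, but the rows are still read from the Parquet
# files in Cloud Storage through the BigLake connection.
for row in client.query(query).result():
    print(dict(row))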
Method 2: Near real-time data ingestion into BigQuery using Pub/Sub
Pub/Sub enables real-time streaming into BigQuery. Learn more about Pub/Sub integrations with BigQuery.
We create an empty table and then stream data into it. For this to work, we need to specify a schema. Have a look at fraud_detection_bigquery_schema.json. This is the schema we are going to use.
Create an empty table using this schema; we will stream data into it:
bq --location=us mk --table \
<PROJECT_ID>:ml_datasets.ulb_fraud_detection_pubsub src/data_ingestion/fraud_detection_bigquery_schema.json
We also need to create a Pub/Sub schema. We use Apache Avro, as it is better suited for appending row-wise:
gcloud pubsub schemas create fraud-detection-schema \
--project=$PROJECT_ID \
--type=AVRO \
--definition-file=src/data_ingestion/fraud_detection_pubsub_schema.json
And then create a Pub/Sub topic using this schema:
gcloud pubsub topics create fraud-detection-topic \
--project=$PROJECT_ID \
--schema=fraud-detection-schema \
--message-encoding=BINARY
We also need to give Pub/Sub permission to write data to BigQuery. The Pub/Sub service account is created automatically; its name is composed of the project number (not the project ID) and a fixed suffix. In your case, it is service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com.
Grant this service account access to BigQuery:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com --role=roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com --role=roles/bigquery.jobUser
Next, we create the Pub/Sub subscription:
gcloud pubsub subscriptions create fraud-detection-subscription \
--project=$PROJECT_ID \
--topic=fraud-detection-topic \
--bigquery-table=$PROJECT_ID.ml_datasets.ulb_fraud_detection_pubsub \
--use-topic-schema
Examine it in the console:
- Go to the Pub/Sub Console.
- Click fraud-detection-subscription. Here you can see messages as they arrive.
- Click fraud-detection-topic. This is the topic you will be publishing messages to.
Please have a look at import_csv_to_bigquery_1.py. This script loads CSV files from Cloud Storage, parses them in Python, and sends the records to Pub/Sub row by row.
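For context, the core publishing loop of such a script looks roughly like the sketch below. This is an illustrative sketch rather than the exact contents of import_csv_to_bigquery_1.py; it assumes the google-cloud-pubsub and fastavro packages and the Avro schema file used earlier:

import io
import json

from fastavro import parse_schema, schemaless_writer
from google.cloud import pubsub_v1

# Parse the same Avro schema the topic was created with.
with open("src/data_ingestion/fraud_detection_pubsub_schema.json") as f:
    avro_schema = parse_schema(json.load(f))

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("<PROJECT_ID>", "fraud-detection-topic")

def publish_row(row: dict) -> None:
    # Binary-encode a single record with the topic's Avro schema ...
    buffer = io.BytesIO()
    schemaless_writer(buffer, avro_schema, row)
    # ... and publish it, waiting for the acknowledgement before moving on.
    # Publishing one row at a time means one round trip per transaction.
    publisher.publish(topic_path, data=buffer.getvalue()).result()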
Let's execute the actual script:
./src/data_ingestion/import_csv_to_bigquery_1.py
Each line you see on the screen corresponds to one transaction being sent to Pub/Sub and written to BigQuery. It would take approximately 40 to 60 minutes to finish, so please cancel the command using CTRL + C.
But why is it so slow?
Let's ask Gemini:
- Open Gemini Code Assist.
- Insert Why is import_csv_to_bigquery_1.py so slow? into the Gemini prompt.
Method 3: Ingestion using Cloud Dataproc (Apache Spark)
Dataproc is a fully managed and scalable service for running Apache Hadoop, Apache Spark, Apache Flink, Presto, and 30+ open-source tools and frameworks. Dataproc lets you load data and also transform or pre-process it as it is brought in.
Create an empty BigQuery table:
bq --location=us mk --table \
<PROJECT_ID>:ml_datasets.ulb_fraud_detection_dataproc src/data_ingestion/fraud_detection_bigquery_schema.json
Download the Spark connector for BigQuery and copy it to our bucket:
wget -qN https://github.com/GoogleCloudDataproc/spark-bigquery-connector/releases/download/0.37.0/spark-3.3-bigquery-0.37.0.jar
gsutil cp spark-3.3-bigquery-0.37.0.jar gs://${PROJECT_ID}-bucket/jar/spark-3.3-bigquery-0.37.0.jar
Open src/data_ingestion/import_parquet_to_bigquery.py and replace <PROJECT_ID> with your project ID. Don't forget to save.
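For reference, the core of a Parquet-to-BigQuery Spark job might look like the sketch below. This is an illustrative sketch rather than the exact contents of import_parquet_to_bigquery.py; it assumes the Spark BigQuery connector jar copied above is available to the job:

from pyspark.sql import SparkSession

PROJECT_ID = "<PROJECT_ID>"  # replace with your project ID

spark = SparkSession.builder.appName("import-parquet-to-bigquery").getOrCreate()

# Read the Parquet files directly from Cloud Storage ...
df = spark.read.parquet(f"gs://{PROJECT_ID}-bucket/data/parquet/ulb_fraud_detection/*")

# ... and append them to the BigQuery table via the Spark BigQuery connector.
(
    df.write.format("bigquery")
    .option("writeMethod", "direct")
    .mode("append")
    .save(f"{PROJECT_ID}.ml_datasets.ulb_fraud_detection_dataproc")
)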
Now execute the actual script using Dataproc Serverless:
gcloud dataproc batches submit pyspark src/data_ingestion/import_parquet_to_bigquery.py \
--project=$PROJECT_ID \
--region=$REGION \
--deps-bucket=gs://${PROJECT_ID}-bucket
While the command is still running, open the Dataproc Console and monitor the job.
After the Dataproc job completes, confirm that data has been loaded into the BigQuery table. You should see over 200,000 records, but the exact count isn't critical:
bq --location=us query --nouse_legacy_sql 'SELECT count(*) as count FROM `<PROJECT_ID>.ml_datasets.ulb_fraud_detection_dataproc`;'
⚠️ Please do not skip the above validation step. Data in the above table is needed for the following labs.
Success
🎉 Congratulations! 🎉
You've officially leveled up in data wizardry! By conquering the BigQuery Code Lab, you've shown your skills in not just one, but three epic methods: BigLake (riding the waves of data), Dataproc (processing like a boss), and Pub/Sub (broadcasting brilliance).
Your pipelines are now flawless, your tables well-fed, and your data destiny secured. Welcome to the realm of BigQuery heroes: the Master of Ingestion! 🦾💻