In this repository we show a sample Dataflow pipeline using the Cloud Spanner Change Streams Connector.
- You must have Maven installed.
- You must have set up authentication for gcloud.
- The authenticated account must have access to start a Dataflow job (more at Cloud Dataflow IAM).
- The authenticated account must have access to read / write to the specified GCS bucket (more at Cloud Storage IAM).
- The authenticated account must have access to query the Cloud Spanner Change Stream in the project/instance/database used (more at Cloud Spanner IAM).
- The authenticated account must have access to update the Cloud Spanner database DDL in the project / metadata instance / metadata database used (more at Cloud Spanner IAM).
- You must have set up Dataflow security and permissions correctly.
- You must have pre-created a Cloud Spanner change stream to be read from (a sketch of how one can be created follows this list).
- You must have pre-installed the Apache Beam Connector jar. You can see the specified jar version in the application pom.xml file, under connector.version.
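For reference, a change stream is created with a CREATE CHANGE STREAM DDL statement, which can be issued through the Cloud Console, gcloud, or the Spanner client library. The snippet below is only a minimal sketch using the Java client's admin API; the project, instance, database, and change stream names are placeholders and are not part of this repository.

```java
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import java.util.Collections;

public class CreateChangeStreamSketch {
  public static void main(String[] args) throws Exception {
    Spanner spanner =
        SpannerOptions.newBuilder().setProjectId("my-gcp-project").build().getService();
    try {
      DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
      // Create a change stream that watches every table in the database.
      // All identifiers below are placeholders.
      adminClient
          .updateDatabaseDdl(
              "my-spanner-instance",
              "my-spanner-database",
              Collections.singletonList("CREATE CHANGE STREAM my_change_stream FOR ALL"),
              null)
          .get(); // block until the DDL operation completes
    } finally {
      spanner.close();
    }
  }
}
```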
The application in the com.google.changestreams package will perform the following:
- It will read the specified change stream for 10 minutes of data (from now to 10 minutes in the future).
- It will extract the commit timestamps of each record streamed.
- It will group the records in 1 minute windows.
- It will output each window group either as a separate file in GCS, as records in BigQuery, or as messages in Pub/Sub, depending on the script used (a rough sketch of this pipeline shape follows this list).
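The snippet below is a minimal sketch of that pipeline shape, not the repository's actual code: it reads the change stream with the connector's SpannerIO.readChangeStream() transform, extracts commit timestamps, and applies 1-minute fixed windows. All project, instance, database, and change stream names are placeholders.

```java
import com.google.cloud.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class ChangeStreamPipelineSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read the change stream from now until 10 minutes in the future.
    Timestamp startTime = Timestamp.now();
    Timestamp endTime =
        Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10 * 60, startTime.getNanos());

    PCollection<DataChangeRecord> records =
        pipeline.apply(
            "ReadChangeStream",
            SpannerIO.readChangeStream()
                .withSpannerConfig(
                    SpannerConfig.create()
                        .withProjectId("my-gcp-project") // placeholder values throughout
                        .withInstanceId("my-spanner-instance")
                        .withDatabaseId("my-spanner-database"))
                .withMetadataInstance("my-spanner-metadata-instance")
                .withMetadataDatabase("my-spanner-metadata-database")
                .withChangeStreamName("my-spanner-change-stream-name")
                .withInclusiveStartAt(startTime)
                .withInclusiveEndAt(endTime));

    // Extract the commit timestamp of each record and group records into 1-minute windows.
    PCollection<String> windowedTimestamps =
        records
            .apply(
                "ExtractCommitTimestamp",
                MapElements.into(TypeDescriptors.strings())
                    .via((DataChangeRecord record) -> record.getCommitTimestamp().toString()))
            .apply(
                "WindowByMinute",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Each window is then written to GCS, BigQuery, or Pub/Sub (see the run_* scripts below).

    pipeline.run().waitUntilFinish();
  }
}
```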
We provide bash scripts to facilitate the execution.
For GCS, you can execute the script like so:
```sh
./run_gcs.sh \
  --project <my-gcp-project> \
  --instance <my-spanner-instance> \
  --database <my-spanner-database> \
  --metadata-instance <my-spanner-metadata-instance> \
  --metadata-database <my-spanner-metadata-database> \
  --change-stream-name <my-spanner-change-stream-name> \
  --gcs-bucket <my-gcs-bucket> \
  --region <my-dataflow-job-region>
```

This script will dispatch a remote Dataflow job with the specified configuration:
- -p|--project: the Google Cloud Platform project id
- -i|--instance: the Google Cloud Spanner instance id where the change stream resides
- -d|--database: the Google Cloud Spanner database id where the change stream resides
- -mi|--metadata-instance: the Google Cloud Spanner instance id where the Connector metadata tables will be created
- -md|--metadata-database: the Google Cloud Spanner database id where the Connector metadata tables will be created (we recommend it to be different from the change stream database)
- -c|--change-stream-name: the name of the pre-created Google Cloud Spanner change stream
- -g|--gcs-bucket: the Google Cloud Storage bucket to be used to store the results of the pipeline and to stage temp files for the Dataflow execution
- -r|--region: the region where to execute the Dataflow job (for options see Dataflow Locations)
The job executed here will spawn a single Dataflow worker to consume the change stream.
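As a hedged illustration of the GCS output step (again, not the repository's exact code), each 1-minute window of strings can be written to its own set of files with Beam's TextIO, building on the windowed PCollection from the sketch above; the bucket name and output prefix are placeholders.

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

public class GcsOutputSketch {
  /** Writes each 1-minute window of strings to its own file(s) under the given GCS bucket. */
  static void writeToGcs(PCollection<String> windowedTimestamps, String gcsBucket) {
    windowedTimestamps.apply(
        "WriteToGCS",
        TextIO.write()
            .to("gs://" + gcsBucket + "/change-stream-output/") // placeholder output prefix
            .withSuffix(".txt")
            .withWindowedWrites() // emit a separate group of files per window
            .withNumShards(1)); // one shard per window, i.e. roughly one file per window
  }
}
```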
For BigQuery, you can execute the script like so:
```sh
./run_bigquery.sh \
  --project <my-gcp-project> \
  --instance <my-spanner-instance> \
  --database <my-spanner-database> \
  --metadata-instance <my-spanner-metadata-instance> \
  --metadata-database <my-spanner-metadata-database> \
  --change-stream-name <my-spanner-change-stream-name> \
  --gcs-bucket <my-gcs-bucket> \
  --big-query-dataset <my-big-query-dataset> \
  --big-query-table-name <my-big-query-table-name> \
  --region <my-dataflow-job-region>
```

This script will dispatch a remote Dataflow job with the specified configuration:
- -p|--project: the Google Cloud Platform project id
- -i|--instance: the Google Cloud Spanner instance id where the change stream resides
- -d|--database: the Google Cloud Spanner database id where the change stream resides
- -mi|--metadata-instance: the Google Cloud Spanner instance id where the Connector metadata tables will be created
- -md|--metadata-database: the Google Cloud Spanner database id where the Connector metadata tables will be created (we recommend it to be different from the change stream database)
- -c|--change-stream-name: the name of the pre-created Google Cloud Spanner change stream
- -g|--gcs-bucket: the Google Cloud Storage bucket to be used to stage temp files for the Dataflow execution
- -bd|--big-query-dataset: the BigQuery dataset to store the records emitted by the change stream
- -bt|--big-query-table-name: the BigQuery table name in the BigQuery dataset to store the records emitted by the change stream
- -r|--region: the region where to execute the Dataflow job (for options see Dataflow Locations)
The job executed here will spawn a single Dataflow worker to consume the change stream.
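As a hedged sketch of the BigQuery output step (the table schema and the commit_timestamp column name below are assumptions, not the repository's actual schema), the extracted commit timestamps can be written with Beam's BigQueryIO:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryOutputSketch {
  /** Writes one row per extracted commit timestamp into the given BigQuery dataset/table. */
  static void writeToBigQuery(
      PCollection<String> commitTimestamps, String project, String dataset, String table) {
    // Assumed single-column schema; the sample's real schema may differ.
    TableSchema schema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("commit_timestamp").setType("STRING")));

    commitTimestamps
        .apply(
            "ToTableRow",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via((String ts) -> new TableRow().set("commit_timestamp", ts)))
        .apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to(String.format("%s:%s.%s", project, dataset, table))
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  }
}
```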
For Pub/Sub, you can execute the script like so:

```sh
./run_pubsub.sh \
  --project <my-gcp-project> \
  --instance <my-spanner-instance> \
  --database <my-spanner-database> \
  --metadata-instance <my-spanner-metadata-instance> \
  --metadata-database <my-spanner-metadata-database> \
  --change-stream-name <my-spanner-change-stream-name> \
  --gcs-bucket <my-gcs-bucket> \
  --pubsub-topic <my-pubsub-topic> \
  --region <my-dataflow-job-region>
```

This script will dispatch a remote Dataflow job with the specified configuration:
- -p|--project: the Google Cloud Platform project id
- -i|--instance: the Google Cloud Spanner instance id where the change stream resides
- -d|--database: the Google Cloud Spanner database id where the change stream resides
- -mi|--metadata-instance: the Google Cloud Spanner instance id where the Connector metadata tables will be created
- -md|--metadata-database: the Google Cloud Spanner database id where the Connector metadata tables will be created (we recommend it to be different from the change stream database)
- -c|--change-stream-name: the name of the pre-created Google Cloud Spanner change stream
- -g|--gcs-bucket: the Google Cloud Storage bucket to be used to stage temp files for the Dataflow execution
- -t|--pubsub-topic: the Google Cloud Pub/Sub topic to be used to publish the results of the pipeline
- -r|--region: the region where to execute the Dataflow job (for options see Dataflow Locations)
The job executed here will spawn a single Dataflow worker to consume the change stream.
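As a hedged sketch of the Pub/Sub output step (not the repository's exact code), each extracted commit timestamp can be published as a message with Beam's PubsubIO; the project and topic names are passed in as placeholders:

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;

public class PubsubOutputSketch {
  /** Publishes each extracted commit timestamp as a message on the given Pub/Sub topic. */
  static void writeToPubsub(PCollection<String> commitTimestamps, String project, String topic) {
    commitTimestamps.apply(
        "PublishToPubsub",
        PubsubIO.writeStrings().to(String.format("projects/%s/topics/%s", project, topic)));
  }
}
```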