Create examples for the Bigtable Spark connector in Java, Scala, and Python #8442

Open · wants to merge 6 commits into base: main
10 changes: 10 additions & 0 deletions bigtable/spark-connector-preview/README
@@ -0,0 +1,10 @@
# Examples for Cloud Bigtable Apache Spark connector Private Preview

This project contains sample code that uses the Spark Bigtable connector in
different languages. Refer to the following subdirectories for the example
in each language, along with the commands needed to run it on Dataproc:

1. `java-maven/`
2. `scala-sbt/`
3. `python/`
88 changes: 88 additions & 0 deletions bigtable/spark-connector-preview/java-maven/README
@@ -0,0 +1,88 @@
# Spark Bigtable Example Using Java and Maven

This example uses Java, with Maven for dependency management, to write data
to a Bigtable table and read it back.

## Compiling the project

To compile the code, run the following command (after installing Maven)
from inside this directory:

```
mvn clean install
```
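Note that `pom.xml` resolves the connector from a private Artifact Registry repository through the Artifact Registry Maven wagon, which authenticates with Google Cloud credentials. One common local setup (an assumption about your environment; a service account or Dataproc's built-in credentials also work) is:

```shell
# Set up Application Default Credentials so the Maven wagon can
# authenticate against the Artifact Registry repository.
gcloud auth application-default login
```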

The target JAR will be located under
`target/spark-bigtable-example-0.0.1-SNAPSHOT.jar`.

## Running the example using Dataproc

To submit the JAR to Dataproc, you need a Bigtable project ID and
instance ID, as well as a Bigtable table name; these are the three required
arguments. By default, the application creates a new table, but you can
provide an optional fourth argument `false` for `createNewTable` (assuming
that you have already created a table with the column family `example_family`).

To run the JAR using Dataproc, run the following command:

```
gcloud dataproc jobs submit spark \
--cluster=$SPARK_BIGTABLE_DATAPROC_CLUSTER \
--region=$SPARK_BIGTABLE_DATAPROC_REGION \
--class=spark.bigtable.example.WordCount \
--jars=target/spark-bigtable-example-0.0.1-SNAPSHOT.jar \
-- \
$SPARK_BIGTABLE_PROJECT_ID \
$SPARK_BIGTABLE_INSTANCE_ID \
$SPARK_BIGTABLE_TABLE_NAME
```
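If you prefer to write to an existing table, as described above, you can first create the table and its column family with the [cbt CLI](https://cloud.google.com/bigtable/docs/cbt-overview) and then pass `false` as the fourth argument. A possible sequence (a sketch, assuming the same environment variables as above):

```shell
# Pre-create the table and the column family the example writes to.
cbt -project=$SPARK_BIGTABLE_PROJECT_ID -instance=$SPARK_BIGTABLE_INSTANCE_ID \
    createtable $SPARK_BIGTABLE_TABLE_NAME
cbt -project=$SPARK_BIGTABLE_PROJECT_ID -instance=$SPARK_BIGTABLE_INSTANCE_ID \
    createfamily $SPARK_BIGTABLE_TABLE_NAME example_family

# Submit the job with the optional fourth argument (createNewTable) set to false.
gcloud dataproc jobs submit spark \
    --cluster=$SPARK_BIGTABLE_DATAPROC_CLUSTER \
    --region=$SPARK_BIGTABLE_DATAPROC_REGION \
    --class=spark.bigtable.example.WordCount \
    --jars=target/spark-bigtable-example-0.0.1-SNAPSHOT.jar \
    -- \
    $SPARK_BIGTABLE_PROJECT_ID \
    $SPARK_BIGTABLE_INSTANCE_ID \
    $SPARK_BIGTABLE_TABLE_NAME \
    false
```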

## Expected output

The following text should appear in the output of the Spark job:

```
Reading the DataFrame from Bigtable:
+-----+-----+
|count| word|
+-----+-----+
| 0|word0|
| 1|word1|
| 2|word2|
| 3|word3|
| 4|word4|
| 5|word5|
| 6|word6|
| 7|word7|
| 8|word8|
| 9|word9|
+-----+-----+
```
To verify that the data has been written to Bigtable, you can run the following
command (requires [cbt CLI](https://cloud.google.com/bigtable/docs/cbt-overview)):

```
cbt -project=$SPARK_BIGTABLE_PROJECT_ID -instance=$SPARK_BIGTABLE_INSTANCE_ID \
read $SPARK_BIGTABLE_TABLE_NAME
```

With this expected output:
```
----------------------------------------
word0
example_family:countCol @ 2023/07/11-16:05:59.596000
"\x00\x00\x00\x00"

----------------------------------------
word1
example_family:countCol @ 2023/07/11-16:05:59.611000
"\x00\x00\x00\x01"

----------------------------------------
.
.
.
```
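Optionally, once you are done, you can delete the example table with the cbt CLI so no test resources are left behind:

```shell
# Clean up the table created by the example.
cbt -project=$SPARK_BIGTABLE_PROJECT_ID -instance=$SPARK_BIGTABLE_INSTANCE_ID \
    deletetable $SPARK_BIGTABLE_TABLE_NAME
```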
108 changes: 108 additions & 0 deletions bigtable/spark-connector-preview/java-maven/pom.xml
@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2022 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.google.cloud.spark.bigtable</groupId>
<artifactId>spark-bigtable-example</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>

<properties>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.2</spark.version>
<reload4j.version>1.7.36</reload4j.version>
<spark.bigtable.version>0.0.1-preview5-SNAPSHOT</spark.bigtable.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.cloud.spark.bigtable</groupId>
<artifactId>spark-bigtable</artifactId>
<version>${spark.bigtable.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>${reload4j.version}</version>
</dependency>
</dependencies>

<repositories>
<repository>
<id>artifact-registry</id>
<url>artifactregistry://us-central1-maven.pkg.dev/cloud-bigtable-ecosystem/spark-bigtable-connector-preview</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<build>
<extensions>
<extension>
<groupId>com.google.cloud.artifactregistry</groupId>
<artifactId>artifactregistry-maven-wagon</artifactId>
<version>2.2.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>spark.bigtable.example.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,111 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.bigtable.example;

import spark.bigtable.example.model.TestRow;

import java.util.ArrayList;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WordCount {
private static SparkSession spark;
private static String projectId;
private static String instanceId;
private static String tableName;
private static String createNewTable = "true";

private static void parseArguments(String[] args) throws IllegalArgumentException {
if (args.length < 3) {
throw new IllegalArgumentException(
"Arguments Bigtable project ID, instance ID, " +
"and table name must be specified");
}
projectId = args[0];
instanceId = args[1];
tableName = args[2];
if (args.length > 3) {
createNewTable = args[3];
}
}

public static void main(String[] args) throws IllegalArgumentException {
parseArguments(args);

spark = SparkSession.builder().getOrCreate();

Dataset<Row> df = createTestDataFrame();
System.out.println("Created the DataFrame:");
df.show();

// Parentheses ensure replaceAll strips whitespace from the whole
// concatenated catalog string, not just the last "}}" literal.
String catalog = ("{" +
"\"table\":{\"namespace\":\"default\", \"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"int\"}" +
"}}").replaceAll("\\s+", "");

writeDataframeToBigtable(df, catalog, createNewTable);
System.out.println("DataFrame was written to Bigtable.");

Dataset<Row> readDf = readDataframeFromBigtable(catalog);

System.out.println("Reading the DataFrame from Bigtable:");
readDf.show();
}

private static Dataset<Row> createTestDataFrame() {
ArrayList<TestRow> rows = new ArrayList<>();
for (int i = 0; i < 10; i++) {
rows.add(new TestRow(String.format("word%d", i), i));
}
Dataset<Row> df = spark.createDataset(
rows,
Encoders.bean(TestRow.class))
.toDF();

return df;
}

private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}

private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
}