-
Notifications
You must be signed in to change notification settings - Fork 35
/
SimpleKustoDataSink.scala
42 lines (38 loc) · 1.58 KB
/
SimpleKustoDataSink.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import com.microsoft.kusto.spark.datasource.KustoSourceOptions
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object SimpleKustoDataSink {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "Your HADOOP_HOME")
val sparkConf = new SparkConf()
.set("spark.testing", "true")
.set("spark.ui.enabled", "false")
.setAppName("SimpleKustoDataSink")
.setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val conf: Map[String, String] = Map(
KustoSourceOptions.KUSTO_AAD_APP_ID -> "Your Client ID",
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> "Your secret",
KustoSourceOptions.KUSTO_QUERY -> "Your Kusto query")
// Create a DF - read from a Kusto cluster
val df = sparkSession.read.kusto(
"Your Kusto Cluster",
"Your Kusto Database",
"Your Kusto Query in KustoOptions.Kusto_Query",
conf)
df.show
// Now write to a Kusto table
df.write
.format("com.microsoft.kusto.spark.datasource")
.option(KustoSinkOptions.KUSTO_CLUSTER, "Your Kusto Cluster")
.option(KustoSinkOptions.KUSTO_DATABASE, "Your Kusto Database")
.option(KustoSinkOptions.KUSTO_TABLE, "Your Kusto Destination Table")
.option(KustoSinkOptions.KUSTO_AAD_APP_ID, "Your Client ID")
.option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, "Your secret")
.mode(SaveMode.Append)
.save()
sparkSession.stop
}
}