From 65994b1275db03da8d6a6728ad2173776ccb7e74 Mon Sep 17 00:00:00 2001
From: yangw
Date: Wed, 14 Jun 2017 16:11:03 +0800
Subject: [PATCH] move udfpredictor here

---
 .gitignore                                     |   7 +
 scala/udfpredictor/pom.xml                     | 193 +++++
 .../main/resources/example/udfpredictor/types  |  21 +
 .../udfpredictor/DataframePredictor.scala      | 124 +++++
 .../udfpredictor/FileStreamProducer.scala      |  59 ++
 .../example/udfpredictor/Options.scala         |  44 ++
 .../tutorials/example/udfpredictor/README.md   | 526 ++++++++++++++++++
 .../StructuredStreamPredictor.scala            | 149 +++++
 .../example/udfpredictor/Utils.scala           | 230 ++++++++
 .../example/udfpredictor/create_test_texts.py  |  40 ++
 10 files changed, 1393 insertions(+)
 create mode 100644 scala/udfpredictor/pom.xml
 create mode 100644 scala/udfpredictor/src/main/resources/example/udfpredictor/types
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/DataframePredictor.scala
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/FileStreamProducer.scala
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Options.scala
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/README.md
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/StructuredStreamPredictor.scala
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Utils.scala
 create mode 100644 scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/create_test_texts.py

diff --git a/.gitignore b/.gitignore
index d81e363..893cbad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -94,3 +94,10 @@ metastore_db/
 
 # Download files
 datasets/
+
+# IDEA files
+.idea/
+target/
+*.iml
+*.pyc
+*.log
diff --git a/scala/udfpredictor/pom.xml b/scala/udfpredictor/pom.xml
new file mode 100644
index 0000000..2e78ebb
--- /dev/null
+++ b/scala/udfpredictor/pom.xml
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.intel.analytics</groupId>
+    <artifactId>udfpredictor</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <repositories>
+        <repository>
+            <id>central</id>
+            <name>Maven Repository</name>
+            <url>https://repo1.maven.org/maven2</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>apache-repo</id>
+            <name>Apache Repository</name>
+            <url>https://repository.apache.org/content/repositories/releases</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>jboss-repo</id>
+            <name>JBoss Repository</name>
+            <url>https://repository.jboss.org/nexus/content/repositories/releases</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>sonatype</id>
+            <name>sonatype repository</name>
+            <url>https://oss.sonatype.org/content/groups/public/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <properties>
+        <maven.compiler.source>1.7</maven.compiler.source>
+        <maven.compiler.target>1.7</maven.compiler.target>
+        <scala.major.version>2.11</scala.major.version>
+        <scala.version>2.11.8</scala.version>
+        <scala.macros.version>2.0.1</scala.macros.version>
+        <scalatest.version>2.2.4</scalatest.version>
+        <spark.version>2.1.0</spark.version>
+        <spark-scope>provided</spark-scope>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scalap</artifactId>
+            <version>${scala.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.major.version}</artifactId>
+            <version>${scalatest.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.intel.analytics.bigdl</groupId>
+            <artifactId>bigdl</artifactId>
+            <version>0.2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.major.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.major.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark-scope}</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <id>eclipse-add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>attach-scaladocs</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>doc-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <recompileMode>incremental</recompileMode>
+                    <useZincServer>true</useZincServer>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                    <compilerPlugins>
+                        <compilerPlugin>
+                            <groupId>org.scalamacros</groupId>
+                            <artifactId>paradise_${scala.version}</artifactId>
+                            <version>${scala.macros.version}</version>
+                        </compilerPlugin>
+                    </compilerPlugins>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/scala/udfpredictor/src/main/resources/example/udfpredictor/types b/scala/udfpredictor/src/main/resources/example/udfpredictor/types
new file mode 100644
index 0000000..468ec9e
--- /dev/null
+++ b/scala/udfpredictor/src/main/resources/example/udfpredictor/types
@@ -0,0 +1,21 @@
+textType, textLabel
+alt.atheism, 1
+comp.graphics, 2
+comp.os.ms-windows.misc, 3
+comp.sys.ibm.pc.hardware, 4
+comp.sys.mac.hardware, 5
+comp.windows.x, 6
+misc.forsale, 7
+rec.autos, 8
+rec.motorcycles, 9
+rec.sport.baseball, 10
+rec.sport.hockey, 11
+sci.crypt, 12
+sci.electronics, 13
+sci.med, 14
+sci.space, 15
+soc.religion.christian, 16
+talk.politics.guns, 17
+talk.politics.mideast, 18
+talk.politics.misc, 19
+talk.religion.misc, 20
\ No newline at end of file
diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/DataframePredictor.scala b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/DataframePredictor.scala
new file mode 100644
index 0000000..5a9ce76
--- /dev/null
+++ b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/DataframePredictor.scala
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package com.intel.analytics.tutorials.example.udfpredictor + +import com.intel.analytics.bigdl.example.utils.WordMeta +import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter} +import org.apache.spark.SparkContext +import org.apache.spark.sql.functions._ +import org.apache.log4j.{Level, Logger} +import org.apache.spark.sql.SQLContext + +object DataframePredictor { + + LoggerFilter.redirectSparkInfoLogs() + Logger.getLogger("com.intel.analytics.bigdl.example").setLevel(Level.INFO) + + def main(args: Array[String]): Unit = { + + Utils.localParser.parse(args, TextClassificationUDFParams()).foreach { param => + + val conf = Engine.createSparkConf() + conf.setAppName("Text classification") + .set("spark.task.maxFailures", "1") + val sc = new SparkContext(conf) + Engine.init + + // Create spark session + val spark = new SQLContext(sc) + import spark.implicits._ + + var word2Meta = None: Option[Map[String, WordMeta]] + var word2Index = None: Option[Map[String, Int]] + var word2Vec = None: Option[Map[Float, Array[Float]]] + + val result = Utils.getModel(sc, param) + + val model = result._1 + word2Meta = result._2 + word2Vec = result._3 + val sampleShape = result._4 + + // if not train, load word meta from file + if (word2Meta.isEmpty) { + val word2IndexMap = sc.textFile(s"${param.baseDir}/word2Meta.txt").map(item => { + val tuple = item.stripPrefix("(").stripSuffix(")").split(",") + (tuple(0), tuple(1).toInt) + }).collect() + word2Index = Some(word2IndexMap.toMap) + } else { + // already trained, use existing word meta + val word2IndexMap = collection.mutable.HashMap.empty[String, Int] + for((word, wordMeta) <- word2Meta.get) { + word2IndexMap += (word -> wordMeta.index) + } + word2Index = Some(word2IndexMap.toMap) + } + + // if not train, create word vec + if (word2Vec.isEmpty) { + word2Vec = Some(Utils.getWord2Vec(word2Index.get)) + } + val predict = Utils.genUdf(sc, model, sampleShape, word2Index.get, word2Vec.get) + + // register udf for data frame + val classifierUDF = udf(predict) + + val data = Utils.loadTestData(param.testDir) + + val df = spark.createDataFrame(data) + + // static dataframe + val types = sc.textFile(Utils.getResourcePath("/example/udfpredictor/types")) + .filter(!_.contains("textType")) + .map { line => + val words = line.split(",") + (words(0).trim, words(1).trim.toInt) + }.toDF("textType", "textLabel") + + val classifyDF1 = df.withColumn("textLabel", classifierUDF($"text")) + .select("filename", "text", "textLabel") + classifyDF1.show() + + val filteredDF1 = df.filter(classifierUDF($"text") === 9) + filteredDF1.show() + + val df_join = classifyDF1.join(types, "textLabel") + df_join.show() + + // aggregation + val typeCount = classifyDF1.groupBy($"textLabel").count() + typeCount.show() + + // play with udf in sqlcontext + spark.udf.register("textClassifier", predict) + df.registerTempTable("textTable") + + val classifyDF2 = spark + .sql("SELECT filename, textClassifier(text) AS textType_sql, text " + + "FROM textTable") + classifyDF2.show() + + val filteredDF2 = spark + .sql("SELECT filename, textClassifier(text) AS textType_sql, text " + + "FROM textTable WHERE textClassifier(text) = 9") + filteredDF2.show() + sc.stop() + } + + } + +} diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/FileStreamProducer.scala b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/FileStreamProducer.scala new file mode 100644 index 0000000..d0dec53 --- /dev/null +++ 
b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/FileStreamProducer.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.analytics.tutorials.example.udfpredictor
+
+import com.intel.analytics.tutorials.example.udfpredictor.Utils._
+import com.intel.analytics.bigdl.utils.LoggerFilter
+import org.apache.spark.sql.SparkSession
+import org.apache.log4j.{Level => Level4j, Logger => Logger4j}
+import org.slf4j.{Logger, LoggerFactory}
+
+
+object FileStreamProducer {
+
+  val log: Logger = LoggerFactory.getLogger(this.getClass)
+  LoggerFilter.redirectSparkInfoLogs()
+  Logger4j.getLogger("com.intel.analytics.bigdl.optim").setLevel(Level4j.INFO)
+
+  def main(args: Array[String]): Unit = {
+
+    parquetProducerParser.parse(args, TextProducerParquetParams()).foreach { param =>
+
+      val batchSize = param.batchsize
+      val interval = param.interval
+      // load the text files to publish
+      val data = Utils.loadTestData(param.srcFolder)
+
+      val spark = SparkSession.builder().appName("Produce Text").getOrCreate()
+      var send_count = 0
+      // publish the texts in batches: one parquet append per batch
+      val batches = data.grouped(batchSize)
+      batches.foreach { batch =>
+        try {
+          val df = spark.createDataFrame(batch)
+          log.info("send text batch " + send_count)
+          df.write
+            .format("parquet")
+            .mode(org.apache.spark.sql.SaveMode.Append)
+            .save(param.destFolder)
+          send_count += 1
+          // wait `interval` seconds between batches
+          Thread.sleep(interval * 1000)
+        } catch {
+          case e: Exception => log.error("sending batch error", e)
+        }
+      }
+    }
+  }
+}
diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Options.scala b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Options.scala
new file mode 100644
index 0000000..e7a389c
--- /dev/null
+++ b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Options.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.analytics.tutorials.example.udfpredictor
+
+import com.intel.analytics.bigdl.example.utils.AbstractTextClassificationParams
+
+/**
+ * Text classification UDF parameters
+ */
+case class TextClassificationUDFParams(
+    override val baseDir: String = "./",
+    override val maxSequenceLength: Int = 1000,
+    override val maxWordsNum: Int = 20000,
+    override val trainingSplit: Double = 0.8,
+    override val batchSize: Int = 128,
+    override val embeddingDim: Int = 100,
+    override val partitionNum: Int = 4,
+    modelPath: Option[String] = None,
+    checkpoint: Option[String] = None,
+    testDir: String = "./")
+  extends AbstractTextClassificationParams
+
+/**
+ * Text parquet producer parameters
+ */
+case class TextProducerParquetParams(
+    srcFolder: String = "./",
+    destFolder: String = "./",
+    batchsize: Int = 2,
+    interval: Long = 5)
+
diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/README.md b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/README.md
new file mode 100644
index 0000000..b564034
--- /dev/null
+++ b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/README.md
@@ -0,0 +1,526 @@
+## Summary
+This example shows how to load a BigDL model as a UDF to perform predictions in Spark SQL/DataFrames and Structured Streaming (Spark 2.0+ only).
+
+It first uses pre-trained GloVe embeddings to convert words to vectors, and then trains a text classification model on the 20 Newsgroup dataset with 20 different categories. The model can reach around 90% accuracy after two epochs of training.
+(The approach was first described in: https://blog.keras.io/using-pre-trained-word-embeddings-in-a-keras-model.html)
+
+It then wraps the model in a UDF and uses this UDF for prediction in Spark SQL (Spark 1.5, 1.6) and Structured Streaming (Spark 2.0). A minimal sketch of this wrapping appears after step 4 below.
+## Data
+* Embedding: 100-dimensional pre-trained GloVe embeddings of 400k words, trained on a 2014 dump of English Wikipedia.
+* Training data: the 20 Newsgroup dataset, which contains 20 categories and 19,997 texts in total.
+
+## Get the bigdl.sh script and build this project
+```shell
+wget https://raw.githubusercontent.com/intel-analytics/BigDL/master/scripts/bigdl.sh
+chmod +x bigdl.sh
+source bigdl.sh
+
+cd $PROJECT_DIR
+mvn package
+```
+
+## Steps to run this example:
+1. Download [pre-trained GloVe word embeddings](http://nlp.stanford.edu/data/glove.6B.zip)
+
+    ```shell
+    wget http://nlp.stanford.edu/data/glove.6B.zip
+    unzip -q glove.6B.zip -d glove.6B
+    ```
+
+2. Download the [20 Newsgroup dataset](http://www.cs.cmu.edu/afs/cs.cmu.edu/project/theo-20/www/data/news20.html) as the training data
+
+    ```shell
+    wget http://www.cs.cmu.edu/afs/cs.cmu.edu/project/theo-20/www/data/news20.tar.gz
+    tar zxf news20.tar.gz
+    ```
+
+3. Put the data under BASE_DIR; the final structure should look like this:
+
+    ```
+    [~/textclassification]$ tree . -L 1
+    .
+    ├── 20_newsgroup
+    └── glove.6B
+    ```
+
+4. Run this command to generate test data for prediction in Structured Streaming:
+    ```bash
+    usage: create_test_texts.py [-h] [-s SRC] [-t TEST]
+
+    Create test text files from source
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      -s SRC, --src SRC     source directory
+      -t TEST, --test TEST  test directory
+
+    Example: python ./create_test_texts.py -s BASE_DIR/20_newsgroup -t BASE_DIR/test
+
+    ```
+
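+Before running the examples, it helps to see the wrapping idea in isolation. The sketch below is illustrative only (the helper name `registerClassifier` does not exist in this project); it assumes a serializable closure `predict: String => Int`, such as the one returned by `Utils.genUdf`, and shows the two registration paths the example uses: the DataFrame API and SQL.
+
+```scala
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.functions.udf
+
+// Hypothetical helper: wire a text-classification closure into Spark.
+def registerClassifier(sqlContext: SQLContext, predict: String => Int) = {
+  // DataFrame API: usable inside select()/filter() expressions
+  val classifierUDF = udf(predict)
+  // SQL API: usable by name inside SQL statements
+  sqlContext.udf.register("textClassifier", predict)
+  classifierUDF
+}
+```
+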
5. Run Spark SQL example

* Train the model and make predictions.

  Run the following commands:

  If you do not have a trained model yet, use this command to train the model and then classify the text records with the UDF.

  Example:

    ```shell
    BASE_DIR=${PWD} # where the data is
    PROJECT_DIR=${PWD}
    MASTER=local[*] # the master url
    ./bigdl.sh -- $SPARK_HOME/bin/spark-submit --master $MASTER --driver-memory 20g \
      --class com.intel.analytics.tutorials.example.udfpredictor.DataframePredictor \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      --batchSize 32 \
      --baseDir $BASE_DIR \
      --partitionNum 4 \
      --checkpoint $BASE_DIR/model/text \
      --dataDir $BASE_DIR/test
    ```

  In the above commands,

  --batchSize: how many text files are trained at one time

  --baseDir: folder containing the training text files and word2Vec embedding

  --partitionNum: number of partitions for the training data

  --checkpoint: location to save the model

  --dataDir: directory containing the test data

  If you are running in Spark cluster mode, you also need to set --executor-cores and --total-executor-cores, and --batchSize must be a multiple of node_number * core_number (for example, with 4 executors of 8 cores each, 32, 64, or 96 are valid values).

  Example:

    ```shell
    BASE_DIR=${PWD} # where the data is
    PROJECT_DIR=${PWD}
    MASTER=xxx.xxx.xxx.xxx:xxxx # the master url
    ./bigdl.sh -- $SPARK_HOME/bin/spark-submit --master $MASTER --driver-memory 20g \
      --executor-cores 8 \
      --total-executor-cores 32 \
      --class com.intel.analytics.tutorials.example.udfpredictor.DataframePredictor \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      --batchSize 32 \
      --baseDir $BASE_DIR \
      --partitionNum 4 \
      --checkpoint $BASE_DIR/model/text \
      --dataDir $BASE_DIR/test
    ```

  If you already have a saved model, use this command to classify the text records with the UDF.

  Example:

    ```shell
    BASE_DIR=${PWD} # where the data is
    PROJECT_DIR=${PWD}
    MASTER=local[*] # the master url
    ./bigdl.sh -- $SPARK_HOME/bin/spark-submit --master $MASTER --driver-memory 5g \
      --class com.intel.analytics.tutorials.example.udfpredictor.DataframePredictor \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      --baseDir $BASE_DIR \
      --modelPath $BASE_DIR/model/text/model.1 \
      --dataDir $BASE_DIR/test
    ```
  In the above commands,

  --baseDir: folder containing the training text files and word2Vec embedding

  --modelPath: model location

  --dataDir: directory containing the test data

* Verification

  * Show the predicted label with UDF for text records:

    ```
    val classifyDF1 = df.withColumn("textLabel", classifierUDF($"text"))
      .select("filename", "text", "textLabel")
    classifyDF1.show()
    +--------+--------------------+---------+
    |filename|                text|textLabel|
    +--------+--------------------+---------+
    |   10014|Xref: cantaloupe....|        3|
    |  102615|Newsgroups: rec.s...|       10|
    |  102642|Newsgroups: rec.s...|       10|
    |  102685|Path: cantaloupe....|       10|
    |  102741|Newsgroups: rec.a...|        8|
    |  102771|Xref: cantaloupe....|        8|
    |  102826|Newsgroups: rec.a...|        8|
    |  102970|Newsgroups: rec.a...|        8|
    |  102982|Newsgroups: rec.a...|        8|
    |  103125|Newsgroups: rec.a...|        8|
    |  103329|Path: cantaloupe....|        8|
    |  103497|Path: cantaloupe....|        8|
    |  103515|Path: cantaloupe....|        8|
    |  103781|Xref: cantaloupe....|        8|
    |  104333|Newsgroups: rec.m...|        9|
    |  104341|Path: cantaloupe....|        9|
    |  104381|Newsgroups: rec.m...|        9|
    |  104509|Newsgroups: rec.m...|        9|
    |  104542|Xref: cantaloupe....|        9|
    |  104590|Newsgroups: rec.s...|       10|
    +--------+--------------------+---------+
    ```

    Note: the "textLabel" column is the prediction for the text.

  * Filter text label with UDF:

    ```
    val filteredDF1 = df.filter(classifierUDF($"text") === 9)
    filteredDF1.show()
    +--------+--------------------+
    |filename|                text|
    +--------+--------------------+
    |  104333|Newsgroups: rec.m...|
    |  104341|Path: cantaloupe....|
    |  104381|Newsgroups: rec.m...|
    |  104509|Newsgroups: rec.m...|
    |  104542|Xref: cantaloupe....|
    |  104595|Newsgroups: rec.m...|
    |  104753|Path: cantaloupe....|
    |  104806|Newsgroups: rec.m...|
    +--------+--------------------+
    ```

  * Join the text type table to show the text type name:

    ```
    val df_join = classifyDF1.join(types, "textLabel")
    df_join.show()
    +--------+--------------------+---------+-------------+
    |filename|                text|textLabel|     textType|
    +--------+--------------------+---------+-------------+
    |   51141|Path: cantaloupe....|        1|  alt.atheism|
    |   51189|Newsgroups: alt.a...|        1|  alt.atheism|
    |   51202|Newsgroups: alt.a...|        1|  alt.atheism|
    |   51313|Newsgroups: alt.a...|        1|  alt.atheism|
    |   53165|Path: cantaloupe....|        1|  alt.atheism|
    |   53237|Path: cantaloupe....|        1|  alt.atheism|
    |   53252|Path: cantaloupe....|        1|  alt.atheism|
    |   53383|Path: cantaloupe....|        1|  alt.atheism|
    |   53577|Xref: cantaloupe....|        1|  alt.atheism|
    |   84114|Xref: cantaloupe....|        1|  alt.atheism|
    |   15244|Xref: cantaloupe....|        2|comp.graphics|
    |   38265|Newsgroups: comp....|        2|comp.graphics|
    |   38330|Path: cantaloupe....|        2|comp.graphics|
    |   38363|Xref: cantaloupe....|        2|comp.graphics|
    |   38600|Xref: cantaloupe....|        2|comp.graphics|
    |   38684|Newsgroups: comp....|        2|comp.graphics|
    |   38766|Newsgroups: comp....|        2|comp.graphics|
    |   38865|Path: cantaloupe....|        2|comp.graphics|
    |   38958|Newsgroups: comp....|        2|comp.graphics|
    |   38999|Path: cantaloupe....|        2|comp.graphics|
    +--------+--------------------+---------+-------------+
    ```

  * Aggregate the data frame by predicted text label:

    ```
    val typeCount = classifyDF1.groupBy($"textLabel").count()
    typeCount.show()

    +---------+-----+
    |textLabel|count|
    +---------+-----+
    |        1|   10|
    |        2|   11|
    |        3|   11|
    |        4|   10|
    |        5|   10|
    |        6|    9|
    |        7|   11|
    |        8|   10|
    |        9|   10|
    |       10|    9|
    |       11|   10|
    |       12|    9|
    |       13|   11|
    |       14|   10|
    |       15|   10|
    |       16|   10|
    |       17|   10|
    |       18|   11|
    |       19|   13|
    |       20|    5|
    +---------+-----+
    ```

  * Show the predicted label with UDF in Spark SQL:

    ```
    val classifyDF2 = spark
      .sql("SELECT filename, textClassifier(text) AS textType_sql, text " +
        "FROM textTable")
    classifyDF2.show()

    +--------+------------+--------------------+
    |filename|textType_sql|                text|
    +--------+------------+--------------------+
    |   10014|           3|Xref: cantaloupe....|
    |  102615|          10|Newsgroups: rec.s...|
    |  102642|          10|Newsgroups: rec.s...|
    |  102685|          10|Path: cantaloupe....|
    |  102741|           8|Newsgroups: rec.a...|
    |  102771|           8|Xref: cantaloupe....|
    |  102826|           8|Newsgroups: rec.a...|
    |  102970|           8|Newsgroups: rec.a...|
    |  102982|           8|Newsgroups: rec.a...|
    |  103125|           8|Newsgroups: rec.a...|
    |  103329|           8|Path: cantaloupe....|
    |  103497|           8|Path: cantaloupe....|
    |  103515|           8|Path: cantaloupe....|
    |  103781|           8|Xref: cantaloupe....|
    |  104333|           9|Newsgroups: rec.m...|
    |  104341|           9|Path: cantaloupe....|
    |  104381|           9|Newsgroups: rec.m...|
    |  104509|           9|Newsgroups: rec.m...|
    |  104542|           9|Xref: cantaloupe....|
    |  104590|          10|Newsgroups: rec.s...|
    +--------+------------+--------------------+
    ```

  * Filter text label with UDF in Spark SQL:

    ```
    val filteredDF2 = spark
      .sql("SELECT filename, textClassifier(text) AS textType_sql, text " +
        "FROM textTable WHERE textClassifier(text) = 9")
    filteredDF2.show()
    +--------+------------+--------------------+
    |filename|textType_sql|                text|
    +--------+------------+--------------------+
    |  104333|           9|Newsgroups: rec.m...|
    |  104341|           9|Path: cantaloupe....|
    |  104381|           9|Newsgroups: rec.m...|
    |  104509|           9|Newsgroups: rec.m...|
    |  104542|           9|Xref: cantaloupe....|
    |  104595|           9|Newsgroups: rec.m...|
    |  104753|           9|Path: cantaloupe....|
    |  104806|           9|Newsgroups: rec.m...|
    +--------+------------+--------------------+
    ```

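Before moving on to the streaming example, it may help to see what the UDF does to each row. The sketch below condenses the preprocessing inside `Utils.genUdf` (tokenize, map words to indices, pad or truncate, embed); the free-standing function `vectorize` is illustrative only, since the real code broadcasts the lookup maps and feeds the resulting array into the BigDL model to obtain the label.

```scala
// Condensed from Utils.genUdf: how one text becomes a model input.
// sequenceLen and embeddingDim default to 1000 and 100 in this example.
def vectorize(text: String,
              word2Index: Map[String, Int],
              word2Vec: Map[Float, Array[Float]],
              sequenceLen: Int,
              embeddingDim: Int): Array[Float] = {
  // tokenize: keep letters only, lowercase, drop words shorter than 3 chars,
  // and keep only words present in the vocabulary
  val tokens = text.replaceAll("[^a-zA-Z]", " ")
    .toLowerCase.split("\\s+").filter(_.length > 2)
    .flatMap(w => word2Index.get(w).map(_.toFloat))
  // pad with zeros or truncate to a fixed length (keep the tail on overflow)
  val padded =
    if (tokens.length > sequenceLen) tokens.takeRight(sequenceLen)
    else tokens ++ Array.fill(sequenceLen - tokens.length)(0f)
  // embed: look up a vector per token; unknown tokens become zero vectors
  padded.flatMap(w => word2Vec.getOrElse(w, Array.fill(embeddingDim)(0f)))
}
```
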
6. Run Structured Streaming example

   Note: to run this example, your Spark version must be 2.0 or higher.

* Start the consumer to subscribe to the text stream and make predictions with the UDF.

  Run the following commands:

  If you do not have a trained model yet, use this command to train the model and then classify the incoming text stream with the UDF.

  Example:

    ```shell
    BASE_DIR=${PWD} # where the data is
    PROJECT_DIR=${PWD}
    MASTER=local[*] # the master url
    ./bigdl.sh -- $SPARK_HOME/bin/spark-submit --master $MASTER --driver-memory 20g \
      --class com.intel.analytics.tutorials.example.udfpredictor.StructuredStreamPredictor \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      --batchSize 32 \
      --baseDir $BASE_DIR \
      --partitionNum 4 \
      --checkpoint $BASE_DIR/model/text \
      --dataDir $BASE_DIR/data/text/parquet
    ```

  In the above commands,

  --batchSize: how many text files are trained at one time

  --baseDir: folder containing the training text files and word2Vec embedding

  --partitionNum: number of partitions for the training data

  --checkpoint: location to save the model

  --dataDir: directory to subscribe to

  If you are running in Spark cluster mode, you also need to set --executor-cores and --total-executor-cores, and --batchSize must be a multiple of node_number * core_number.

  Example:

    ```shell
    BASE_DIR=${PWD} # where the data is
    PROJECT_DIR=${PWD}
    MASTER=xxx.xxx.xxx.xxx:xxxx # the master url
    ./bigdl.sh -- $SPARK_HOME/bin/spark-submit --master $MASTER --driver-memory 20g \
      --executor-cores 8 \
      --total-executor-cores 32 \
      --class com.intel.analytics.tutorials.example.udfpredictor.StructuredStreamPredictor \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      --batchSize 32 \
      --baseDir $BASE_DIR \
      --partitionNum 4 \
      --checkpoint $BASE_DIR/model/text \
      --dataDir $BASE_DIR/data/text/parquet
    ```

  If you already have a saved model, use this command to classify the incoming text stream with the UDF.

  Example:

    ```shell
    BASE_DIR=${PWD} # where the data is, please modify it accordingly
    PROJECT_DIR=${PWD}
    MASTER=local[*] # the master url, please modify it accordingly
    ./bigdl.sh -- $SPARK_HOME/bin/spark-submit --master $MASTER --driver-memory 5g \
      --class com.intel.analytics.tutorials.example.udfpredictor.StructuredStreamPredictor \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      --baseDir $BASE_DIR \
      --modelPath $BASE_DIR/model/text/model.1 \
      --dataDir $BASE_DIR/data/text/parquet
    ```

  In the above commands,

  --baseDir: folder containing the training text files and word2Vec embedding

  --modelPath: model location

  --dataDir: directory to subscribe to

* Run this command to publish text data to the target directory:

  Example:
    ```
    BASE_DIR=${PWD} # where the data is
    PROJECT_DIR=${PWD}
    MASTER=local[*] # the master url
    $SPARK_HOME/bin/spark-submit --master $MASTER \
      --class com.intel.analytics.tutorials.example.udfpredictor.FileStreamProducer \
      $PROJECT_DIR/target/tutorial-example-0.1-SNAPSHOT-jar-with-dependencies.jar \
      -s $BASE_DIR/test \
      -d $BASE_DIR/data/text/parquet \
      -b 4 \
      -i 5
    ```
  In the above commands,

  -s: source folder containing the text files to be published

  -d: target directory to publish to

  -i: publish interval in seconds

  -b: how many text files are published at one time

* Verification

  * Show the predicted label with UDF for text records:

    ```
    val classifyDF1 = df.withColumn("textLabel", classifierUDF($"text"))
      .select("fileName", "text", "textLabel")
    val classifyQuery1 = classifyDF1.writeStream
      .format("console")
      .start()
    +--------+--------------------+---------+
    |fileName|                text|textLabel|
    +--------+--------------------+---------+
    |  100521|Path: cantaloupe....|       10|
    |  101551|Path: cantaloupe....|        8|
    |  101552|Newsgroups: rec.a...|        8|
    |  101553|Xref: cantaloupe....|        8|
    +--------+--------------------+---------+
    ```
    Note: the "textLabel" column is the prediction for the text.

  * Filter text label with UDF in stream:

    ```
    val filteredDF1 = df.filter(classifierUDF($"text") === 8)
    val filteredQuery1 = filteredDF1.writeStream
      .format("console")
      .start()
    +--------------------+--------+
    |                text|filename|
    +--------------------+--------+
    |Path: cantaloupe....|  101551|
    |Newsgroups: rec.a...|  101552|
    |Xref: cantaloupe....|  101553|
    +--------------------+--------+
    ```

  * Join the static text type table with the stream to show the text type name:

    ```
    val df_join = classifyDF1.join(types, "textLabel")
    val classifyQuery_join = df_join.writeStream
      .format("console")
      .start()
    +---------+--------+--------------------+------------------+
    |textLabel|fileName|                text|          textType|
    +---------+--------+--------------------+------------------+
    |       10|  100521|Path: cantaloupe....|rec.sport.baseball|
    |        8|  101551|Path: cantaloupe....|         rec.autos|
    |        8|  101552|Newsgroups: rec.a...|         rec.autos|
    |        8|  101553|Xref: cantaloupe....|         rec.autos|
    +---------+--------+--------------------+------------------+
    ```

  * Aggregate the stream by predicted text label:

    ```
    val typeCount = classifyDF1.groupBy($"textLabel").count()
    val aggQuery = typeCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    +---------+-----+
    |textLabel|count|
    +---------+-----+
    |        8|    3|
    |       10|    1|
    +---------+-----+
    ```

  * Show the predicted label with UDF for the incoming stream in Spark SQL:

    ```
    val classifyDF2 = spark
      .sql("SELECT fileName, textClassifier(text) AS textType_sql, text FROM textTable")
    val classifyQuery2 = classifyDF2.writeStream
      .format("console")
      .start()

    +--------+------------+--------------------+
    |fileName|textType_sql|                text|
    +--------+------------+--------------------+
    |  101725|           9|Path: cantaloupe....|
    |  102151|          10|Path: cantaloupe....|
    |  102584|          10|Path: cantaloupe....|
    |  102585|          10|Newsgroups: rec.s...|
    +--------+------------+--------------------+
    ```

  * Filter text label with UDF for the incoming stream in Spark SQL:

    ```
    val filteredDF2 = spark
      .sql("SELECT fileName, textClassifier(text) AS textType_sql, text " +
        "FROM textTable WHERE textClassifier(text) = 9")
    val filteredQuery2 = filteredDF2.writeStream
      .format("console")
      .start()
    +--------+------------+--------------------+
    |fileName|textType_sql|                text|
    +--------+------------+--------------------+
    |  101725|           9|Path: cantaloupe....|
    +--------+------------+--------------------+
    ```
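
For reference, the consumer side of step 6 boils down to the pattern sketched here. This is a condensed, illustrative extract of StructuredStreamPredictor.scala (the helper name `streamPredictions` is not in the code); it assumes the same `predict: String => Int` closure produced by `Utils.genUdf`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StructType

// Illustrative helper: read the parquet files the producer appends,
// classify each text with the UDF, and print results to the console.
def streamPredictions(spark: SparkSession, dataDir: String,
                      predict: String => Int): Unit = {
  import spark.implicits._
  val classifierUDF = udf(predict)
  // a streaming source requires an explicit schema
  val schema = new StructType().add("filename", "string").add("text", "string")
  val stream = spark.readStream.schema(schema).parquet(dataDir)
  val query = stream.withColumn("textLabel", classifierUDF($"text"))
    .writeStream.format("console").start()
  query.awaitTermination()
}
```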
\ No newline at end of file
diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/StructuredStreamPredictor.scala b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/StructuredStreamPredictor.scala
new file mode 100644
index 0000000..ab13a86
--- /dev/null
+++ b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/StructuredStreamPredictor.scala
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.intel.analytics.tutorials.example.udfpredictor + +import com.intel.analytics.bigdl.example.utils.WordMeta +import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType +import org.apache.log4j.{Level, Logger} + +object StructuredStreamPredictor { + + LoggerFilter.redirectSparkInfoLogs() + Logger.getLogger("com.intel.analytics.bigdl.example").setLevel(Level.INFO) + + // import Options._ + + def main(args: Array[String]): Unit = { + + Utils.localParser.parse(args, TextClassificationUDFParams()).foreach { param => + + // Create spark session + val sparkConf = Engine.createSparkConf() + sparkConf.setAppName("Text classification") + .set("spark.akka.frameSize", 64.toString) + val spark = SparkSession + .builder + .config(sparkConf) + .getOrCreate() + Engine.init + val sc = spark.sparkContext + + var word2Meta = None: Option[Map[String, WordMeta]] + var word2Index = None: Option[Map[String, Int]] + var word2Vec = None: Option[Map[Float, Array[Float]]] + + val result = Utils.getModel(sc, param) + + val model = result._1 + word2Meta = result._2 + word2Vec = result._3 + val sampleShape = result._4 + + // if not train, load word meta from file + if (word2Meta.isEmpty) { + val word2IndexMap = sc.textFile(s"${param.baseDir}/word2Meta.txt").map(item => { + val tuple = item.stripPrefix("(").stripSuffix(")").split(",") + (tuple(0), tuple(1).toInt) + }).collect() + word2Index = Some(word2IndexMap.toMap) + } else { + // already trained, use existing word meta + val word2IndexMap = collection.mutable.HashMap.empty[String, Int] + for((word, wordMeta) <- word2Meta.get) { + word2IndexMap += (word -> wordMeta.index) + } + word2Index = Some(word2IndexMap.toMap) + } + + // if not train, create word vec + if (word2Vec.isEmpty) { + word2Vec = Some(Utils.getWord2Vec(word2Index.get)) + } + val predict = Utils.genUdf(sc, model, sampleShape, word2Index.get, word2Vec.get) + + // register udf for data frame + val classifierUDF = udf(predict) + + val textSchema = new StructType().add("filename", "string").add("text", "string") + // stream dataframe + val df = spark.readStream + .schema(textSchema) + .parquet(param.testDir) + + val typeSchema = new StructType().add("textType", "string").add("textLabel", "string") + // static dataframe + val types = spark.read + .format("csv") + .option("header", "true") + .option("mode", "DROPMALFORMED") + .schema(typeSchema) + .csv(Utils.getResourcePath("/example/udfpredictor/types")) + + import spark.implicits._ + + val classifyDF1 = df.withColumn("textLabel", classifierUDF($"text")) + .select("fileName", "text", "textLabel") + val classifyQuery1 = classifyDF1.writeStream + .format("console") + .start() + + val df_join = classifyDF1.join(types, "textLabel") + val classifyQuery_join = df_join.writeStream + .format("console") + .start() + + val filteredDF1 = df.filter(classifierUDF($"text") === 8) + val filteredQuery1 = filteredDF1.writeStream + .format("console") + .start() + + // aggregation + val typeCount = classifyDF1.groupBy($"textLabel").count() + val aggQuery = typeCount.writeStream + .outputMode("complete") + .format("console") + .start() + + // play with udf in sqlcontext + spark.udf.register("textClassifier", predict) + df.createOrReplaceTempView("textTable") + + val classifyDF2 = spark + .sql("SELECT fileName, 
textClassifier(text) AS textType_sql, text FROM textTable") + val classifyQuery2 = classifyDF2.writeStream + .format("console") + .start() + + val filteredDF2 = spark + .sql("SELECT fileName, textClassifier(text) AS textType_sql, text " + + "FROM textTable WHERE textClassifier(text) = 9") + val filteredQuery2 = filteredDF2.writeStream + .format("console") + .start() + + classifyQuery1.awaitTermination() + classifyQuery_join.awaitTermination() + filteredQuery1.awaitTermination() + aggQuery.awaitTermination() + classifyQuery2.awaitTermination() + filteredQuery2.awaitTermination() + sc.stop() + } + } +} diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Utils.scala b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Utils.scala new file mode 100644 index 0000000..e02051d --- /dev/null +++ b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/Utils.scala @@ -0,0 +1,230 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.analytics.tutorials.example.udfpredictor + +import java.io.{File, InputStream, PrintWriter} + +import com.intel.analytics.bigdl.example.utils.WordMeta +import com.intel.analytics.bigdl.example.utils.TextClassifier +import com.intel.analytics.bigdl.models.utils.ModelBroadcast +import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity} +import com.intel.analytics.bigdl.tensor.{Storage, Tensor} +import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric +import com.intel.analytics.bigdl.nn.Module +import org.apache.spark.SparkContext + +import scala.io.Source +import scopt.OptionParser + + +object Utils { + + type Model = AbstractModule[Activity, Activity, Float] + type Word2Meta = Map[String, WordMeta] + type Word2Index = Map[String, Int] + type Word2Vec = Map[Float, Array[Float]] + type SampleShape = Array[Int] + type TFP = TextClassificationUDFParams + + case class Sample(filename: String, text: String) + + private var textClassification: TextClassifier = null + + def getTextClassifier(param: TFP): TextClassifier = { + if (textClassification == null) { + textClassification = new TextClassifier(param) + } + textClassification + } + + def getModel(sc: SparkContext, param: TFP): (Model, Option[Word2Meta], + Option[Word2Vec], SampleShape) = { + val textClassification = getTextClassifier(param) + if (param.modelPath.isDefined) { + (Module.load[Float](param.modelPath.get), + None, + None, + Array(param.maxSequenceLength, param.embeddingDim)) + } else { + // get train and validation rdds + val (rdds, word2Meta, word2Vec) = textClassification.getData(sc) + // save word2Meta for later generate vectors + val word2Index = word2Meta.mapValues[Int]((wordMeta: WordMeta) => wordMeta.index) + sc.parallelize(word2Index.toSeq).saveAsTextFile(s"${param.baseDir}/word2Meta.txt") + // train + val trainedModel = textClassification.trainFromData(sc, rdds) + // after training, save model 
+ if (param.checkpoint.isDefined) { + trainedModel.save(s"${param.checkpoint.get}/model.1", overWrite = true) + } + + (trainedModel.evaluate(), + Some(word2Meta), + Some(word2Vec), + Array(param.maxSequenceLength, param.embeddingDim)) + } + } + + def getWord2Vec(word2Index: Map[String, Int]): Map[Float, Array[Float]] = { + val word2Vec = textClassification.buildWord2VecWithIndex(word2Index) + word2Vec + } + + def genUdf(sc: SparkContext, + model: Model, + sampleShape: Array[Int], + word2Index: Word2Index, + word2Vec: Word2Vec) + (implicit ev: TensorNumeric[Float]): (String) => Int = { + + val broadcastModel = ModelBroadcast[Float].broadcast(sc, model) + val word2IndexBC = sc.broadcast(word2Index) + val word2VecBC = sc.broadcast(word2Vec) + + val udf = (text: String) => { + val sequenceLen = sampleShape(0) + val embeddingDim = sampleShape(1) + val word2Meta = word2IndexBC.value + val word2Vec = word2VecBC.value + // first to tokens + val tokens = text.replaceAll("[^a-zA-Z]", " ") + .toLowerCase().split("\\s+").filter(_.length > 2).map { word: String => + if (word2Meta.contains(word)) { + Some(word2Meta(word).toFloat) + } else { + None + } + }.flatten + + // shaping + val paddedTokens = if (tokens.length > sequenceLen) { + tokens.slice(tokens.length - sequenceLen, tokens.length) + } else { + tokens ++ Array.fill[Float](sequenceLen - tokens.length)(0) + } + + val data = paddedTokens.map { word: Float => + if (word2Vec.contains(word)) { + word2Vec(word) + } else { + // Treat it as zeros if cannot be found from pre-trained word2Vec + Array.fill[Float](embeddingDim)(0) + } + }.flatten + + val featureTensor: Tensor[Float] = Tensor[Float]() + var featureData: Array[Float] = null + val sampleSize = sampleShape.product + val localModel = broadcastModel.value + + // create tensor from input column + if (featureData == null) { + featureData = new Array[Float](1 * sampleSize) + } + Array.copy(data.map(ev.fromType(_)), 0, + featureData, 0, sampleSize) + featureTensor.set(Storage[Float](featureData), sizes = Array(1) ++ sampleShape) + val tensorBuffer = featureTensor.transpose(2, 3) + + // predict + val output = localModel.forward(tensorBuffer).toTensor[Float] + val predict = if (output.dim == 2) { + output.max(2)._2.squeeze().storage().array() + } else if (output.dim == 1) { + output.max(1)._2.squeeze().storage().array() + } else { + throw new IllegalArgumentException + } + ev.toType[Int](predict(0)) + } + + udf + } + + def loadTestData(testDir: String): IndexedSeq[Sample] = { + val fileList = new File(testDir).listFiles() + .filter(_.isFile).filter(_.getName.forall(Character.isDigit)).sorted + + val testData = fileList.map { file => { + val fileName = file.getName + val source = Source.fromFile(file, "ISO-8859-1") + val text = try source.getLines().toList.mkString("\n") finally source.close() + Sample(fileName, text) + } + } + testData + } + + def getResourcePath(resource: String): String = { + val stream: InputStream = getClass.getResourceAsStream(resource) + val lines = scala.io.Source.fromInputStream(stream).mkString + val file = File.createTempFile(resource, "") + val pw = new PrintWriter(file) + pw.write(lines) + pw.close() + file.getAbsolutePath + } + + val localParser = new OptionParser[TextClassificationUDFParams]("BigDL Example") { + opt[String]('b', "baseDir") + .text("Base dir containing the training and word2Vec data") + .action((x, c) => c.copy(baseDir = x)) + opt[String]('p', "partitionNum") + .text("you may want to tune the partitionNum if run into spark mode") + .action((x, c) => 
c.copy(partitionNum = x.toInt))
+    opt[String]('s', "maxSequenceLength")
+      .text("maxSequenceLength")
+      .action((x, c) => c.copy(maxSequenceLength = x.toInt))
+    opt[String]('w', "maxWordsNum")
+      .text("maxWordsNum")
+      .action((x, c) => c.copy(maxWordsNum = x.toInt))
+    opt[String]('l', "trainingSplit")
+      .text("trainingSplit")
+      .action((x, c) => c.copy(trainingSplit = x.toDouble))
+    opt[String]('z', "batchSize")
+      .text("batchSize")
+      .action((x, c) => c.copy(batchSize = x.toInt))
+    opt[String]("modelPath")
+      .text("where to load the model")
+      .action((x, c) => c.copy(modelPath = Some(x)))
+    opt[String]("checkpoint")
+      .text("where to save the model checkpoint")
+      .action((x, c) => c.copy(checkpoint = Some(x)))
+    opt[String]('f', "dataDir")
+      .text("directory containing the test data")
+      .action((x, c) => c.copy(testDir = x))
+  }
+
+  val parquetProducerParser
+    = new OptionParser[TextProducerParquetParams]("BigDL Streaming Example") {
+    opt[String]('s', "srcFolder")
+      .required()
+      .text("base dir containing the text data")
+      .action((x, c) => c.copy(srcFolder = x))
+    opt[String]('d', "destFolder")
+      .required()
+      .text("destination parquet dir for the produced text data")
+      .action((x, c) => c.copy(destFolder = x))
+    opt[Int]('b', "batchsize")
+      .text("number of text files per produced batch")
+      .action((x, c) => c.copy(batchsize = x))
+    opt[Long]('i', "interval")
+      .text("produce interval in seconds")
+      .action((x, c) => c.copy(interval = x))
+  }
+
+}
diff --git a/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/create_test_texts.py b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/create_test_texts.py
new file mode 100644
index 0000000..0a0fb05
--- /dev/null
+++ b/scala/udfpredictor/src/main/scala/com/intel/analytics/tutorials/example/udfpredictor/create_test_texts.py
@@ -0,0 +1,40 @@
+# Copy the first 10 files of each 20_newsgroup category directory
+# into a flat test directory for the streaming example.
+import os
+import argparse
+import shutil
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Create test text files from source')
+    parser.add_argument(
+        '-s', '--src', type=str,
+        help="source directory")
+    parser.add_argument(
+        '-t', '--test', type=str,
+        help="test directory",
+    )
+
+    args = parser.parse_args()
+    src_dir = args.src
+    test_dir = args.test
+    if not os.path.exists(test_dir):
+        os.makedirs(test_dir)
+
+    for category in os.listdir(src_dir):
+        # reset the counter for every category directory,
+        # so each category contributes up to 10 files
+        count = 0
+        for name in os.listdir(os.path.join(src_dir, category)):
+            if count >= 10:
+                break
+            dest_file_path = os.path.join(test_dir, name)
+            # skip files that were already copied
+            if os.path.exists(dest_file_path):
+                continue
+            shutil.copy(os.path.join(src_dir, category, name), dest_file_path)
+            count += 1
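
As a usage reference for the script above, a typical invocation matching step 4 of the README (the paths are placeholders):

    python create_test_texts.py -s ~/textclassification/20_newsgroup -t ~/textclassification/test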