Skip to content

Commit

Permalink
Merge pull request #1021 from allwefantasy/TRY
Browse files Browse the repository at this point in the history
add grammar validation
  • Loading branch information
allwefantasy authored Apr 12, 2019
2 parents ce88b51 + d94ded9 commit 667a80f
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 136 deletions.
82 changes: 12 additions & 70 deletions streamingpro-mlsql/src/main/java/streaming/dsl/ScriptSQLExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ import streaming.log.{Logging, WowLog}
import streaming.parser.lisener.BaseParseListenerextends
import tech.mlsql.dsl.CommandCollection
import tech.mlsql.dsl.adaptor.PreProcessIncludeListener
import tech.mlsql.dsl.processor.PreProcessListener
import tech.mlsql.dsl.processor.{AuthProcessListener, GrammarProcessListener, PreProcessListener}

import scala.collection.mutable.ArrayBuffer


/**
Expand Down Expand Up @@ -66,7 +65,12 @@ object ScriptSQLExec extends Logging with WowLog {
def unset = mlsqlExecuteContext.remove()


def parse(input: String, listener: DSLSQLListener, skipInclude: Boolean = true, skipAuth: Boolean = true, skipPhysicalJob: Boolean = false) = {
def parse(input: String, listener: DSLSQLListener,
skipInclude: Boolean = true,
skipAuth: Boolean = true,
skipPhysicalJob: Boolean = false,
skipGrammarValidate: Boolean = true
) = {
//preprocess some statements e.g. include

var wow = input
Expand Down Expand Up @@ -95,7 +99,11 @@ object ScriptSQLExec extends Logging with WowLog {
sqel.preProcessListener = Some(preProcessListener)
_parse(wow, preProcessListener)
wow = preProcessListener.toScript


if (!skipGrammarValidate) {
_parse(wow, new GrammarProcessListener(sqel))
}

if (!skipAuth) {

val staticAuthImpl = sqel.sparkSession
Expand Down Expand Up @@ -229,75 +237,9 @@ class ScriptSQLExecListener(val _sparkSession: SparkSession, val _defaultPathPre

}

// Listener used purely for grammar validation: walking the parsed script drives
// ANTLR's parser, so any syntax problem surfaces as a parse error before any
// statement is executed. The callback itself deliberately does nothing.
class GrammarProcessListener(_sparkSession: SparkSession, _defaultPathPrefix: String, _allPathPrefix: Map[String, String]) extends ScriptSQLExecListener(_sparkSession, _defaultPathPrefix, _allPathPrefix) {

  // Convenience constructor: validation needs no Spark session or path prefixes.
  def this() = this(null, null, null)

  // Intentionally empty — reaching this callback means the statement parsed cleanly.
  override def exitSql(ctx: SqlContext): Unit = ()
}

// Walks a parsed MLSQL script and dispatches each top-level statement to the
// matching *Auth implementation, accumulating every table the script touches.
class AuthProcessListener(val listener: ScriptSQLExecListener) extends BaseParseListenerextends with Logging {

  // When false, `select` statements are authorized here at parse time;
  // when true, select authorization is deferred to runtime.
  val ENABLE_RUNTIME_SELECT_AUTH = listener.sparkSession
    .sparkContext
    .getConf
    .getBoolean("spark.mlsql.enable.runtime.select.auth", false)

  // Mutable accumulator for every MLSQLTable discovered during the walk.
  private val _tables = MLSQLTableSet(ArrayBuffer[MLSQLTable]())

  // Record one table reference discovered by an auth handler.
  def addTable(table: MLSQLTable) = {
    _tables.tables.asInstanceOf[ArrayBuffer[MLSQLTable]] += table
  }

  // Tables referenced with an explicit database qualifier.
  def withDBs = _tables.tables.filter(_.db.isDefined)

  // Tables referenced without a database qualifier.
  def withoutDBs = _tables.tables.filterNot(_.db.isDefined)

  // All tables collected so far.
  def tables() = _tables

  override def exitSql(ctx: SqlContext): Unit = {
    val keyword = ctx.getChild(0).getText.toLowerCase()
    keyword match {
      case "load" =>
        new LoadAuth(this).auth(ctx)
      case "select" =>
        if (!ENABLE_RUNTIME_SELECT_AUTH) {
          new SelectAuth(this).auth(ctx)
        }
      case "save" =>
        new SaveAuth(this).auth(ctx)
      case "connect" =>
        new ConnectAuth(this).auth(ctx)
      case "create" =>
        new CreateAuth(this).auth(ctx)
      case "insert" =>
        new InsertAuth(this).auth(ctx)
      case "drop" =>
        new DropAuth(this).auth(ctx)
      case "set" =>
        new SetAuth(this).auth(ctx)
      // Statements that require no table-level authorization.
      case "refresh" | "train" | "run" | "register" =>
      case _ =>
        logInfo(s"receive unknown grammar: [ ${ctx.getChild(0).getText.toLowerCase()} ].")
    }
  }
}


object ConnectMeta {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import streaming.core.datasource.DataSourceRegistry
import streaming.dsl._
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import tech.mlsql.dsl.processor.AuthProcessListener

/**
* Created by latincross on 12/27/2018.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.spark.sql.execution.MLSQLAuthParser
import streaming.dsl.parser.DSLSQLLexer
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool}
import streaming.dsl.DslTool
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.spark.sql.execution.MLSQLAuthParser
import streaming.dsl.parser.DSLSQLLexer
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool}
import streaming.dsl.DslTool
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.spark.sql.execution.MLSQLAuthParser
import streaming.dsl.parser.DSLSQLLexer
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool}
import streaming.dsl.DslTool
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ package streaming.dsl.auth
import streaming.core.datasource.{DataAuthConfig, DataSourceRegistry, SourceInfo}
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool, ScriptSQLExec}
import streaming.dsl.{DslTool, ScriptSQLExec}
import streaming.log.{Logging, WowLog}
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ package streaming.dsl.auth
import streaming.core.datasource.{DataAuthConfig, DataSourceRegistry, SourceInfo}
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool}
import streaming.dsl.DslTool
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.spark.sql.execution.MLSQLAuthParser
import streaming.dsl.parser.DSLSQLLexer
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool}
import streaming.dsl.DslTool
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package streaming.dsl.auth

import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
import streaming.dsl.{AuthProcessListener, DslTool}
import streaming.dsl.DslTool
import tech.mlsql.dsl.processor.AuthProcessListener


/**
Expand Down
27 changes: 16 additions & 11 deletions streamingpro-mlsql/src/main/java/streaming/log/WowLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ import scala.collection.mutable.ArrayBuffer
* Created by allwefantasy on 4/9/2018.
*/
trait WowLog {
// Prefix a log line with the current owner/groupId from the execute context,
// or pass it through untouched when the caller asks to skip the prefix.
// NOTE(review): assumes ScriptSQLExec.context() may legitimately be null
// outside a request thread — hence the null guard.
def format(msg: String, skipPrefix: Boolean = false) = {
  if (skipPrefix) msg
  else {
    val context = ScriptSQLExec.context()
    if (context == null) s"""[owner] [null] [groupId] [null] $msg"""
    else s"""[owner] [${context.owner}] [groupId] [${context.groupId}] $msg"""
  }
}

def wow_format(msg: String) = {
Expand All @@ -45,10 +50,10 @@ trait WowLog {
(e.toString.split("\n") ++ e.getStackTrace.map(f => f.toString)).map(f => format(f)).toSeq.mkString("\n")
}

// Render a throwable as one formatted line per message line / stack frame,
// honoring the same skipPrefix flag as format().
def format_throwable(e: Throwable, skipPrefix: Boolean = false) = {
  val messageLines = e.toString.split("\n").toSeq
  val frameLines = e.getStackTrace.map(_.toString).toSeq
  (messageLines ++ frameLines).map(line => format(line, skipPrefix)).mkString("\n")
}

def format_cause(e: Exception) = {
var cause = e.asInstanceOf[Throwable]
while (cause.getCause != null) {
Expand All @@ -57,12 +62,12 @@ trait WowLog {
format_throwable(cause)
}

// Append the formatted exception plus every nested cause to `buffer`,
// labelling each cause with a "caused by:" header.
def format_full_exception(buffer: ArrayBuffer[String], e: Exception, skipPrefix: Boolean = true) = {
  // Lazily walk the cause chain starting at the exception itself;
  // takeWhile stops at the first null getCause, matching the original loop.
  val chain = Iterator.iterate[Throwable](e)(_.getCause).takeWhile(_ != null).toList
  buffer += format_throwable(chain.head, skipPrefix)
  chain.tail.foreach { cause =>
    buffer += "caused by:\n" + format_throwable(cause, skipPrefix)
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import _root_.streaming.core._
import _root_.streaming.core.strategy.platform.{PlatformManager, SparkRuntime}
import _root_.streaming.dsl.mmlib.algs.tf.cluster.{ClusterSpec, ClusterStatus, ExecutorInfo}
import _root_.streaming.dsl.{MLSQLExecuteContext, ScriptSQLExec, ScriptSQLExecListener}
import _root_.streaming.log.WowLog
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import net.csdn.annotation.rest.{At, _}
Expand All @@ -40,6 +41,7 @@ import org.joda.time.format.ISODateTimeFormat
import tech.mlsql.job.{JobManager, MLSQLJobInfo, MLSQLJobType}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

/**
Expand All @@ -57,7 +59,7 @@ import scala.util.control.NonFatal
"""),
servers = Array()
)
class RestController extends ApplicationController {
class RestController extends ApplicationController with WowLog {

// mlsql script execute api, supports async and sync modes
// begin -------------------------------------------
Expand All @@ -79,7 +81,8 @@ class RestController extends ApplicationController {
new Parameter(name = "async", required = false, description = "If set true ,please also provide a callback url use `callback` parameter and the job will run in background and the API will return. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "callback", required = false, description = "Used when async is set true. callback is a url. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "skipInclude", required = false, description = "disable include statement. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "skipAuth", required = false, description = "disable table authorize . default: true", `type` = "boolean", allowEmptyValue = false)
new Parameter(name = "skipAuth", required = false, description = "disable table authorize . default: true", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "skipGrammarValidate", required = false, description = "validate mlsql grammar. default: true", `type` = "boolean", allowEmptyValue = false)
))
@Responses(Array(
new ApiResponse(responseCode = "200", description = "", content = new Content(mediaType = "application/json",
Expand All @@ -106,20 +109,34 @@ class RestController extends ApplicationController {
JobManager.asyncRun(sparkSession, jobInfo, () => {
try {
val context = createScriptSQLExecListener(sparkSession, jobInfo.groupId)
ScriptSQLExec.parse(param("sql"), context, paramAsBoolean("skipInclude", false), paramAsBoolean("skipAuth", true))

ScriptSQLExec.parse(param("sql"), context,
skipInclude = paramAsBoolean("skipInclude", false),
skipAuth = paramAsBoolean("skipAuth", true),
skipPhysicalJob = paramAsBoolean("skipPhysicalJob", false),
skipGrammarValidate = paramAsBoolean("skipGrammarValidate", true))

outputResult = getScriptResult(context, sparkSession)
htp.post(new Url(param("callback")), Map("stat" -> s"""succeeded""", "res" -> outputResult))
} catch {
case e: Exception =>
e.printStackTrace()
val msg = if (paramAsBoolean("show_stack", false)) e.getStackTrace.map(f => f.toString).mkString("\n") else ""
htp.post(new Url(param("callback")), Map("stat" -> s"""failed""", "msg" -> (e.getMessage + "\n" + msg)))
val msgBuffer = ArrayBuffer[String]()
if (paramAsBoolean("show_stack", false)) {
format_full_exception(msgBuffer, e)
}
htp.post(new Url(param("callback")), Map("stat" -> s"""failed""", "msg" -> (e.getMessage + "\n" + msgBuffer.mkString("\n"))))
}
})
} else {
JobManager.run(sparkSession, jobInfo, () => {
val context = createScriptSQLExecListener(sparkSession, jobInfo.groupId)
ScriptSQLExec.parse(param("sql"), context, paramAsBoolean("skipInclude", false), paramAsBoolean("skipAuth", true))
ScriptSQLExec.parse(param("sql"), context,
skipInclude = paramAsBoolean("skipInclude", false),
skipAuth = paramAsBoolean("skipAuth", true),
skipPhysicalJob = paramAsBoolean("skipPhysicalJob", false),
skipGrammarValidate = paramAsBoolean("skipGrammarValidate", true)
)
if (!silence) {
outputResult = getScriptResult(context, sparkSession)
}
Expand All @@ -129,8 +146,11 @@ class RestController extends ApplicationController {
} catch {
case e: Exception =>
e.printStackTrace()
val msg = if (paramAsBoolean("show_stack", false)) e.getStackTrace.map(f => f.toString).mkString("\n") else ""
render(500, e.getMessage + "\n" + msg)
val msgBuffer = ArrayBuffer[String]()
if (paramAsBoolean("show_stack", false)) {
format_full_exception(msgBuffer, e)
}
render(500, e.getMessage + "\n" + msgBuffer.mkString("\n"))
}
render(outputResult)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class CommandAdaptor(preProcessListener: PreProcessListener) extends DslAdaptor
case s: CommandContext =>
command = s.getText.substring(1)
case s: SetValueContext =>
var oringinalText = s.getText
val oringinalText = s.getText
parameters += cleanBlockStr(cleanStr(evaluate(oringinalText)))
case s: SetKeyContext =>
parameters += s.getText
Expand All @@ -46,20 +46,18 @@ class CommandAdaptor(preProcessListener: PreProcessListener) extends DslAdaptor

}

if (parameters.size > 0) {
var count = 0
(0 until len).foreach { i =>
if (tempCommand(i) == '{' && i < (len - 1) && tempCommand(i + 1) == '}') {
finalCommand ++= fetchParam(count)
count += 1
} else if (i >= 1 && tempCommand(i - 1) == '{' && tempCommand(i) == '}') {
//
}
else {
finalCommand += tempCommand(i)
}

var count = 0
(0 until len).foreach { i =>
if (tempCommand(i) == '{' && i < (len - 1) && tempCommand(i + 1) == '}') {
finalCommand ++= fetchParam(count)
count += 1
} else if (i >= 1 && tempCommand(i - 1) == '{' && tempCommand(i) == '}') {
//
}
else {
finalCommand += tempCommand(i)
}

}

preProcessListener.addStatement(String.valueOf(finalCommand.toArray))
Expand Down
Loading

0 comments on commit 667a80f

Please sign in to comment.