Commit ba4661a

Merge pull request #1022 from allwefantasy/TRY

fix solr auth

allwefantasy authored Apr 13, 2019
2 parents 667a80f + 23807ef commit ba4661a

Showing 2 changed files with 48 additions and 1 deletion.

@@ -21,7 +21,7 @@ package streaming.core.datasource.impl
 import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
 import streaming.core.datasource._
 import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}
-import streaming.dsl.ScriptSQLExec
+import streaming.dsl.{ConnectMeta, DBMappingKey, ScriptSQLExec}
 
 class MLSQLSolr(override val uid: String) extends MLSQLBaseStreamSource with WowParams {
 
@@ -69,6 +69,52 @@ class MLSQLSolr(override val uid: String) extends MLSQLBaseStreamSource with Wow
     }
   }
 
+  override def sourceInfo(config: DataAuthConfig): SourceInfo = {
+
+    val Array(_dbname, _dbtable) = if (config.path.contains(dbSplitter)) {
+      config.path.split(dbSplitter, 2)
+    } else {
+      Array("", config.path)
+    }
+
+    var table = _dbtable
+    var dbName = _dbname
+
+    val newOptions = scala.collection.mutable.HashMap[String, String]() ++ config.config
+    ConnectMeta.options(DBMappingKey(shortFormat, _dbname)) match {
+      case Some(option) =>
+        dbName = ""
+        newOptions ++= option
+
+        table.split(dbSplitter) match {
+          case Array(_db, _table) =>
+            dbName = _db
+            table = _table
+          case _ =>
+        }
+
+      case None =>
+      //dbName = ""
+    }
+
+
+    newOptions.filter(f => f._1 == "collection").map { f =>
+      if (f._2.contains(dbSplitter)) {
+        f._2.split(dbSplitter, 2) match {
+          case Array(_db, _table) =>
+            dbName = _db
+            table = _table
+          case Array(_db) =>
+            dbName = _db
+        }
+      } else {
+        dbName = f._2
+      }
+    }
+
+    SourceInfo(shortFormat, dbName, table)
+  }
+
   override def register(): Unit = {
     DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this)
     DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this)
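
The new sourceInfo above is what the table-auth layer consults: it normalizes config.path into a (db, table) pair, treats the first path segment as a connect-statement alias when ConnectMeta has options registered under DBMappingKey(shortFormat, alias), and lets a "collection" option override both parts. Below is a minimal standalone sketch of those resolution rules, with the ConnectMeta lookup stubbed by a plain Map; resolveSource, solr_conn, and the sample option values are illustrative, not part of this commit.

import java.util.regex.Pattern

// Standalone sketch of the resolution rules in sourceInfo above.
// The ConnectMeta lookup is stubbed with a Map; all names and sample
// values here are illustrative only.
object SolrPathResolutionSketch {
  val dbSplitter = "."

  // Stand-in for ConnectMeta.options(DBMappingKey("solr", alias)):
  // options saved earlier by a hypothetical `connect ... as solr_conn`.
  val connectOptions: Map[String, Map[String, String]] =
    Map("solr_conn" -> Map("collection" -> "db1.tb1"))

  def resolveSource(path: String, config: Map[String, String]): (String, String) = {
    // Split "db.table" once; a bare path means the db part is empty.
    // (String.split takes a regex, so the literal dot is quoted here.)
    val Array(_dbname, _dbtable) =
      if (path.contains(dbSplitter)) path.split(Pattern.quote(dbSplitter), 2)
      else Array("", path)

    var dbName = _dbname
    var table = _dbtable
    var newOptions = config

    // If the "db" part is actually a connect alias, merge its saved
    // options and re-split the remainder into db and table.
    connectOptions.get(_dbname).foreach { option =>
      dbName = ""
      newOptions = newOptions ++ option
      table.split(Pattern.quote(dbSplitter)) match {
        case Array(_db, _table) => dbName = _db; table = _table
        case _ =>
      }
    }

    // A "collection" option (possibly itself "db.table") wins over the path.
    newOptions.get("collection").foreach { c =>
      if (c.contains(dbSplitter)) {
        val Array(_db, _table) = c.split(Pattern.quote(dbSplitter), 2)
        dbName = _db
        table = _table
      } else {
        dbName = c
      }
    }

    (dbName, table)
  }

  def main(args: Array[String]): Unit = {
    println(resolveSource("db1.tb1", Map()))                  // (db1,tb1)
    println(resolveSource("solr_conn.tb2", Map()))            // alias options win: (db1,tb1)
    println(resolveSource("tb1", Map("collection" -> "db2"))) // (db2,tb1)
  }
}

Running main prints (db1,tb1), (db1,tb1), (db2,tb1): a bare "db.table" path splits directly, a connect alias pulls in its saved options (whose "collection" then wins), and a "collection" value without a splitter overrides only the db part.
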
@@ -81,6 +81,7 @@ class MLSQLLoadStrSpec extends BasicSparkOperation with SpecFunctions with Basic
     withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { runtime: SparkRuntime =>
       implicit val spark = runtime.sparkSession
       ShellCommand.exec("rm -rf /tmp/user/hive/warehouse/carbon_jack")
+
       var ssel = createSSEL
 
       ScriptSQLExec.parse(
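
The spec change above only inserts a blank line, but for context, here is a hypothetical script of the kind ScriptSQLExec.parse receives, showing how a connect alias ends up in the load path that the new sourceInfo must resolve. The solr connect/load syntax, the solr_conn alias, and the zkhost option are assumptions for illustration, not taken from this commit.

// Hypothetical usage sketch; the MLSQL connect/load syntax and every
// name in the script are assumptions, not taken from this commit.
object SolrAuthScriptSketch {
  val script: String =
    """connect solr where `zkhost`="127.0.0.1:9983" as solr_conn;
      |load solr.`solr_conn.db1.tb1` as output;""".stripMargin

  def main(args: Array[String]): Unit = {
    // With the sourceInfo change above, the auth layer would see this load
    // resolved to db "db1" and table "tb1" instead of the raw alias path.
    println(script)
  }
}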