diff --git a/external/mlsql-autosuggest/src/main/java/tech/mlsql/autosuggest/app/RDSchema.scala b/external/mlsql-autosuggest/src/main/java/tech/mlsql/autosuggest/app/RDSchema.scala index 7f3cb9c52..2d5651325 100644 --- a/external/mlsql-autosuggest/src/main/java/tech/mlsql/autosuggest/app/RDSchema.scala +++ b/external/mlsql-autosuggest/src/main/java/tech/mlsql/autosuggest/app/RDSchema.scala @@ -1,7 +1,6 @@ package tech.mlsql.autosuggest.app import java.sql.{JDBCType, SQLException} - import com.alibaba.druid.sql.SQLUtils import com.alibaba.druid.sql.ast.SQLDataType import com.alibaba.druid.sql.ast.statement.{SQLColumnDefinition, SQLCreateTableStatement} @@ -9,6 +8,7 @@ import com.alibaba.druid.sql.repository.SchemaRepository import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.types.DecimalType.{MAX_PRECISION, MAX_SCALE} import org.apache.spark.sql.types._ +import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ import scala.math.min @@ -18,6 +18,8 @@ import scala.math.min */ class RDSchema(dbType: String) { + private val log: Logger = LoggerFactory.getLogger(this.getClass) + private val repository = new SchemaRepository(dbType) def createTable(sql: String) = { @@ -36,7 +38,11 @@ class RDSchema(dbType: String) { try { f.toString.toInt } catch { - case e: Exception => 0 + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("extractfieldSize Error: {}", e) + } + 0 } }.headOption diff --git a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/AutoSuggestContextTest.scala b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/AutoSuggestContextTest.scala index 592c3606b..107a2072f 100644 --- a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/AutoSuggestContextTest.scala +++ b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/AutoSuggestContextTest.scala @@ -5,13 +5,14 @@ import org.scalatest.BeforeAndAfterEach import tech.mlsql.autosuggest.meta.{MetaProvider, MetaTable, MetaTableColumn, MetaTableKey} import tech.mlsql.autosuggest.statement.{LexerUtils, SuggestItem} import tech.mlsql.autosuggest.{DataType, SpecialTableConst, TokenPos, TokenPosType} +import tech.mlsql.common.utils.log.Logging import scala.collection.JavaConverters._ /** * 2/6/2020 WilliamZhu(allwefantasy@gmail.com) */ -class AutoSuggestContextTest extends BaseTest with BeforeAndAfterEach { +class AutoSuggestContextTest extends BaseTest with BeforeAndAfterEach with Logging { override def afterEach(): Unit = { // context.statements.clear() } @@ -44,8 +45,9 @@ class AutoSuggestContextTest extends BaseTest with BeforeAndAfterEach { def printStatements(items: List[List[Token]]) = { items.foreach { item => - println(item.map(_.getText).mkString(" ")) - println() + if (log.isInfoEnabled()) { + log.info(item.map(_.getText).mkString(" ")) + } } } @@ -84,7 +86,7 @@ class AutoSuggestContextTest extends BaseTest with BeforeAndAfterEach { |SELECT CAST(25.65 AS int) from jack; |""".stripMargin).tokens.asScala.toList - wow.foreach(item => println(s"${item.getText} ${item.getType}")) + wow.foreach(item => log.info(item.getText + " " + item.getType)) } test("load/select 4/10 select ke[cursor] from") { @@ -150,7 +152,9 @@ class AutoSuggestContextTest extends BaseTest with BeforeAndAfterEach { | select from table3 |""".stripMargin).tokens.asScala.toList val items = context.build(wow).suggest(5, 8) - println(items) + if (log.isInfoEnabled()) { + log.info(items.toString()) + } } @@ -178,7 +182,9 @@ class 
AutoSuggestContextTest extends BaseTest with BeforeAndAfterEach { | select sum() from table3 |""".stripMargin val items = context.buildFromString(sql).suggest(5, 12) - println(items) + if (log.isInfoEnabled()) { + log.info(items.toString()) + } } test("table alias with temp table") { diff --git a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/LoadSuggesterTest.scala b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/LoadSuggesterTest.scala index c774ed674..99e7aab16 100644 --- a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/LoadSuggesterTest.scala +++ b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/LoadSuggesterTest.scala @@ -2,13 +2,14 @@ package com.intigua.antlr4.autosuggest import tech.mlsql.autosuggest.statement.LoadSuggester import tech.mlsql.autosuggest.{TokenPos, TokenPosType} +import tech.mlsql.common.utils.log.Logging import scala.collection.JavaConverters._ /** * 2/6/2020 WilliamZhu(allwefantasy@gmail.com) */ -class LoadSuggesterTest extends BaseTest { +class LoadSuggesterTest extends BaseTest with Logging { test("load hiv[cursor]") { val wow = context.lexer.tokenizeNonDefaultChannel( """ @@ -26,7 +27,13 @@ class LoadSuggesterTest extends BaseTest { | load |""".stripMargin).tokens.asScala.toList val loadSuggester = new LoadSuggester(context, wow, TokenPos(0, TokenPosType.NEXT, 0)).suggest() - println(loadSuggester) + if (log.isInfoEnabled()) { + var loadSuggesterToString = ""; + loadSuggester.foreach(i => + loadSuggesterToString += ("name: " + i.name + ",") + ) + log.info(loadSuggesterToString) + } assert(loadSuggester.size > 1) } @@ -37,7 +44,9 @@ class LoadSuggesterTest extends BaseTest { | load csv.`` where |""".stripMargin).tokens.asScala.toList val result = new LoadSuggester(context, wow, TokenPos(4, TokenPosType.NEXT, 0)).suggest() - println(result) + if (log.isInfoEnabled()) { + log.info(result.toString()) + } } diff --git a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/SelectSuggesterTest.scala b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/SelectSuggesterTest.scala index 2db9364d7..93ab64115 100644 --- a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/SelectSuggesterTest.scala +++ b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/SelectSuggesterTest.scala @@ -3,13 +3,14 @@ package com.intigua.antlr4.autosuggest import tech.mlsql.autosuggest.meta.{MetaProvider, MetaTable, MetaTableColumn, MetaTableKey} import tech.mlsql.autosuggest.statement.SelectSuggester import tech.mlsql.autosuggest.{DataType, MLSQLSQLFunction, TokenPos, TokenPosType} +import tech.mlsql.common.utils.log.Logging import scala.collection.JavaConverters._ /** * 2/6/2020 WilliamZhu(allwefantasy@gmail.com) */ -class SelectSuggesterTest extends BaseTest { +class SelectSuggesterTest extends BaseTest with Logging { def buildMetaProvider = { context.setUserDefinedMetaProvider(new MetaProvider { @@ -185,10 +186,14 @@ class SelectSuggesterTest extends BaseTest { |""".stripMargin).tokens.asScala.toList val suggester = new SelectSuggester(context, wow2, TokenPos(0, TokenPosType.NEXT, 0)) - println(suggester.sqlAST.printAsStr(suggester.tokens, 0)) + if (log.isInfoEnabled()) { + log.info(suggester.sqlAST.printAsStr(suggester.tokens, 0)) + } suggester.table_info.foreach { case (level, item) => - println(level + ":") - println(item.map(_._1).toList) + if (log.isInfoEnabled()) { + log.info(level + ":") + 
log.info(item.map(_._1).toList.toString()) + } } assert(suggester.suggest().map(_.name) == List(("b"), ("keywords"), ("search_num"), ("rank"))) @@ -203,8 +208,10 @@ class SelectSuggesterTest extends BaseTest { val tokens = getMLSQLTokens(sql) val suggester = new SelectSuggester(context, tokens, TokenPos(0, TokenPosType.NEXT, 0)) - println("=======") - println(suggester.suggest()) + if (log.isInfoEnabled()) { + log.info("=======") + log.info(suggester.suggest().toString()) + } assert(suggester.suggest().head.name=="b") } @@ -250,7 +257,9 @@ class SelectSuggesterTest extends BaseTest { |""".stripMargin).tokens.asScala.toList val suggester = new SelectSuggester(context, wow, TokenPos(1, TokenPosType.CURRENT, 3)) - println(suggester.suggest()) + if (log.isInfoEnabled()) { + log.info(suggester.suggest().toString()) + } assert(suggester.suggest().map(_.name) == List(("split"))) } diff --git a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/TableStructureTest.scala b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/TableStructureTest.scala index 25afe9ca1..0481d0716 100644 --- a/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/TableStructureTest.scala +++ b/external/mlsql-autosuggest/src/test/java/com/intigua/antlr4/autosuggest/TableStructureTest.scala @@ -23,7 +23,9 @@ class TableStructureTest extends BaseTest with Logging { val tokens = getMLSQLTokens(sql) val suggester = new SelectSuggester(context, tokens, TokenPos(0, TokenPosType.NEXT, 0)) - println(suggester.sqlAST) + if (log.isInfoEnabled()) { + log.info(suggester.sqlAST.toString) + } } test("s2") { @@ -82,7 +84,9 @@ class TableStructureTest extends BaseTest with Logging { val suggester = new SelectSuggester(context, wow, TokenPos(3, TokenPosType.CURRENT, 1)) val root = suggester.sqlAST root.visitDown(0) { case (ast, level) => - println(s"${ast.name(suggester.tokens)} ${ast.output(suggester.tokens)}") + if (log.isInfoEnabled()) { + log.info(ast.name(suggester.tokens) + " " + ast.output(suggester.tokens)) + } } assert(suggester.suggest().map(_.name) == List("keywords")) @@ -99,7 +103,9 @@ class TableStructureTest extends BaseTest with Logging { val suggester = new SelectSuggester(context, wow, TokenPos(3, TokenPosType.CURRENT, 1)) val root = suggester.sqlAST root.visitDown(0) { case (ast, level) => - println(s"${ast.name(suggester.tokens)} ${ast.output(suggester.tokens)}") + if (log.isInfoEnabled()) { + log.info(ast.name(suggester.tokens) + " " + ast.output(suggester.tokens)) + } } assert(suggester.suggest().map(_.name) == List("keywords")) diff --git a/external/mlsql-sql-profiler-30/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala b/external/mlsql-sql-profiler-30/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala index 0e2892e59..60fe1e2db 100644 --- a/external/mlsql-sql-profiler-30/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala +++ b/external/mlsql-sql-profiler-30/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala @@ -6,6 +6,7 @@ import streaming.dsl.ScriptSQLExec import streaming.dsl.auth._ import streaming.dsl.mmlib.SQLAlg import streaming.dsl.mmlib.algs.param.WowParams +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.serder.json.JSONTool import tech.mlsql.dsl.auth.ETAuth import tech.mlsql.dsl.auth.dsl.mmlib.ETMethod.ETMethod @@ -13,7 +14,7 @@ import tech.mlsql.dsl.auth.dsl.mmlib.ETMethod.ETMethod /** * 27/3/2020 WilliamZhu(allwefantasy@gmail.com) */ -class 
ProfilerCommand(override val uid: String) extends SQLAlg with ETAuth with WowParams { +class ProfilerCommand(override val uid: String) extends SQLAlg with ETAuth with WowParams with Logging { def this() = this(WowParams.randomUID()) override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = { @@ -48,7 +49,9 @@ class ProfilerCommand(override val uid: String) extends SQLAlg with ETAuth with val explain = MLSQLUtils.createExplainCommand(df.queryExecution.logical, extended = extended) val items = df.sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect(). map(_.getString(0)).mkString("\n") - println(items) + if (log.isInfoEnabled()) { + log.info(items) + } df.sparkSession.createDataset[Plan](Seq(Plan("doc", items))).toDF() } diff --git a/external/mlsql-sql-profiler/src/main/java/tech/mlsql/indexer/impl/RestIndexerMeta.scala b/external/mlsql-sql-profiler/src/main/java/tech/mlsql/indexer/impl/RestIndexerMeta.scala index 9279b14d2..bc28a1836 100644 --- a/external/mlsql-sql-profiler/src/main/java/tech/mlsql/indexer/impl/RestIndexerMeta.scala +++ b/external/mlsql-sql-profiler/src/main/java/tech/mlsql/indexer/impl/RestIndexerMeta.scala @@ -1,15 +1,15 @@ package tech.mlsql.indexer.impl import java.nio.charset.Charset - import org.apache.http.client.fluent.{Form, Request} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.serder.json.JSONTool import tech.mlsql.indexer.{MLSQLIndexerMeta, MlsqlIndexerItem, MlsqlOriTable} /** * 21/12/2020 WilliamZhu(allwefantasy@gmail.com) */ -class RestIndexerMeta(url: String, token: String,timeout:Int=2000) extends MLSQLIndexerMeta { +class RestIndexerMeta(url: String, token: String,timeout:Int=2000) extends MLSQLIndexerMeta with Logging { override def fetchIndexers(tableNames: List[MlsqlOriTable], options: Map[String, String]): Map[MlsqlOriTable, List[MlsqlIndexerItem]] = { val form = Form.form() form.add("data", JSONTool.toJsonStr(tableNames)) @@ -23,7 +23,9 @@ class RestIndexerMeta(url: String, token: String,timeout:Int=2000) extends MLSQL JSONTool.parseJson[Map[MlsqlOriTable, List[MlsqlIndexerItem]]](value) } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled) { + log.debug("fetchIndexers Error: {}", e) + } Map() } diff --git a/external/mlsql-sql-profiler/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala b/external/mlsql-sql-profiler/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala index 0e2892e59..60fe1e2db 100644 --- a/external/mlsql-sql-profiler/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala +++ b/external/mlsql-sql-profiler/src/main/java/tech/mlsql/plugins/sql/profiler/ProfilerCommand.scala @@ -6,6 +6,7 @@ import streaming.dsl.ScriptSQLExec import streaming.dsl.auth._ import streaming.dsl.mmlib.SQLAlg import streaming.dsl.mmlib.algs.param.WowParams +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.serder.json.JSONTool import tech.mlsql.dsl.auth.ETAuth import tech.mlsql.dsl.auth.dsl.mmlib.ETMethod.ETMethod @@ -13,7 +14,7 @@ import tech.mlsql.dsl.auth.dsl.mmlib.ETMethod.ETMethod /** * 27/3/2020 WilliamZhu(allwefantasy@gmail.com) */ -class ProfilerCommand(override val uid: String) extends SQLAlg with ETAuth with WowParams { +class ProfilerCommand(override val uid: String) extends SQLAlg with ETAuth with WowParams with Logging { def this() = this(WowParams.randomUID()) override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = { @@ -48,7 +49,9 @@ class 
ProfilerCommand(override val uid: String) extends SQLAlg with ETAuth with val explain = MLSQLUtils.createExplainCommand(df.queryExecution.logical, extended = extended) val items = df.sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect(). map(_.getString(0)).mkString("\n") - println(items) + if (log.isInfoEnabled()) { + log.info(items) + } df.sparkSession.createDataset[Plan](Seq(Plan("doc", items))).toDF() } diff --git a/external/mlsql-sql-profiler/src/test/java/tech/mlsql/test/tool/ByteUtilTest.scala b/external/mlsql-sql-profiler/src/test/java/tech/mlsql/test/tool/ByteUtilTest.scala index ca03aa63c..d467408e3 100644 --- a/external/mlsql-sql-profiler/src/test/java/tech/mlsql/test/tool/ByteUtilTest.scala +++ b/external/mlsql-sql-profiler/src/test/java/tech/mlsql/test/tool/ByteUtilTest.scala @@ -1,12 +1,13 @@ package tech.mlsql.test.tool import org.scalatest.{BeforeAndAfterAll, FunSuite} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.tool.ZOrderingBytesUtil /** * 31/12/2020 WilliamZhu(allwefantasy@gmail.com) */ -class ByteUtilTest extends FunSuite with BeforeAndAfterAll { +class ByteUtilTest extends FunSuite with BeforeAndAfterAll with Logging{ test("toString") { // Only positive numbers need to be supported val a = 3L @@ -18,7 +19,9 @@ class ByteUtilTest extends FunSuite with BeforeAndAfterAll { } test("func"){ - println(ZOrderingBytesUtil.toString(ZOrderingBytesUtil.toBytes(-1))) + if (log.isDebugEnabled()) { + log.debug(ZOrderingBytesUtil.toString(ZOrderingBytesUtil.toBytes(-1))) + } } test("intTo8Byte") { diff --git a/streamingpro-cluster/src/main/java/tech/mlsql/cluster/service/elastic_resource/AllocateService.scala b/streamingpro-cluster/src/main/java/tech/mlsql/cluster/service/elastic_resource/AllocateService.scala index f53e566bf..b00235c08 100644 --- a/streamingpro-cluster/src/main/java/tech/mlsql/cluster/service/elastic_resource/AllocateService.scala +++ b/streamingpro-cluster/src/main/java/tech/mlsql/cluster/service/elastic_resource/AllocateService.scala @@ -18,9 +18,9 @@ package tech.mlsql.cluster.service.elastic_resource -import java.util.concurrent.{Executors, TimeUnit} -import java.util.logging.Logger +import org.slf4j.LoggerFactory +import java.util.concurrent.{Executors, TimeUnit} import tech.mlsql.cluster.ProxyApplication import tech.mlsql.cluster.model.ElasticMonitor import tech.mlsql.common.utils.log.Logging @@ -31,7 +31,6 @@ import scala.collection.JavaConverters._ * 2018-12-05 WilliamZhu(allwefantasy@gmail.com) */ object AllocateService extends Logging { - val logger = Logger.getLogger("AllocateService") private[this] val _executor = Executors.newFixedThreadPool(100) private[this] val scheduler = Executors.newSingleThreadScheduledExecutor() @@ -65,7 +64,9 @@ object AllocateService extends Logging { } } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("run Error: {}", e) + } //catch all ,so the scheduler will not been stopped by exception } } diff --git a/streamingpro-cluster/src/test/java/tech/mlsql/cluster/test/DeploySpec.scala b/streamingpro-cluster/src/test/java/tech/mlsql/cluster/test/DeploySpec.scala index 6c16bd4e3..4dc63b66f 100644 --- a/streamingpro-cluster/src/test/java/tech/mlsql/cluster/test/DeploySpec.scala +++ b/streamingpro-cluster/src/test/java/tech/mlsql/cluster/test/DeploySpec.scala @@ -19,20 +19,20 @@ package tech.mlsql.cluster.test import java.util - import net.csdn.ServiceFramwork import net.csdn.bootstrap.Bootstrap import org.scalatest.{FlatSpec, Matchers} import tech.mlsql.cluster.model.{Backend, 
EcsResourcePool, ElasticMonitor} import tech.mlsql.cluster.service.elastic_resource.local.LocalDeployInstance import tech.mlsql.cluster.service.elastic_resource.{BaseResource, JobNumAwareAllocateStrategy, LocalResourceAllocation, LocalResourceDeAllocation} +import tech.mlsql.common.utils.log.Logging import scala.collection.JavaConverters._ /** * 2018-12-07 WilliamZhu(allwefantasy@gmail.com) */ -class DeploySpec extends FlatSpec with Matchers { +class DeploySpec extends FlatSpec with Matchers with Logging{ def mockServer = { try { @@ -117,7 +117,9 @@ class DeploySpec extends FlatSpec with Matchers { planRes = allocate.plan(monitor.getTag.split(",").toSeq, monitor) assume(planRes.isDefined) - print(planRes) + if (log.isInfoEnabled()) { + log.info(planRes.toString) + } planRes.get match { case LocalResourceAllocation(tags) => assume(tags == "jack") @@ -186,7 +188,9 @@ class DeploySpec extends FlatSpec with Matchers { planRes = allocate.plan(monitor.getTag.split(",").toSeq, monitor) assume(planRes.isDefined) - print(planRes) + if (log.isInfoEnabled()) { + log.info(planRes.toString) + } planRes.get match { case LocalResourceDeAllocation(tags) => assume(tags == "jack") diff --git a/streamingpro-commons/src/main/java/streaming/common/zk/Path.java b/streamingpro-commons/src/main/java/streaming/common/zk/Path.java index 1a9e75c26..3443d8d53 100644 --- a/streamingpro-commons/src/main/java/streaming/common/zk/Path.java +++ b/streamingpro-commons/src/main/java/streaming/common/zk/Path.java @@ -18,6 +18,8 @@ package streaming.common.zk; +import org.apache.log4j.Logger; + import java.util.ArrayList; import java.util.List; @@ -26,6 +28,8 @@ */ public class Path { + private static Logger logger = Logger.getLogger(Path.class); + private List detail ; private String split = "/"; @@ -116,9 +120,15 @@ public static void main(String[] args) { Path p = new Path("/video/_index/pid"); - System.out.println(p.getPathString()); - System.out.println(p.getParentPath().getPathString()); - System.out.println(p.getPathString()); + if (logger.isInfoEnabled()) { + logger.info( + String.format( + "Path: %s \n ParentPath: %s \n ", + p.getPathString(), + p.getParentPath().getPathString() + ) + ); + } } } \ No newline at end of file diff --git a/streamingpro-commons/src/main/java/streaming/common/zk/ZKClient.java b/streamingpro-commons/src/main/java/streaming/common/zk/ZKClient.java index f6d6fe9f2..560e2788c 100644 --- a/streamingpro-commons/src/main/java/streaming/common/zk/ZKClient.java +++ b/streamingpro-commons/src/main/java/streaming/common/zk/ZKClient.java @@ -129,8 +129,16 @@ public boolean addListenerAndInit(String confName, final ConfCallBack confCallBa confCallBack.setConf(zkConfUtil.getConf(data)); } catch (Exception e) { - - e.printStackTrace(); + if (logger.isDebugEnabled()) { + logger.debug( + String.format( + "addListenerAndInit Error!!!\n confName: %s \n confCallBack: %s \n exception: %s", + confName, + confCallBack, + e + ) + ); + } } @@ -170,8 +178,9 @@ public void main(String[] args) throws InterruptedException { @Override public void setConf(String conf) { - System.out.println(conf); - logger.info("test" + conf); + if (logger.isInfoEnabled()) { + logger.info(String.format("conf: %s", conf)); + } } }); diff --git a/streamingpro-commons/src/main/java/streaming/common/zk/ZKConfUtil.java b/streamingpro-commons/src/main/java/streaming/common/zk/ZKConfUtil.java index 540fed51a..cd5a86efa 100644 --- a/streamingpro-commons/src/main/java/streaming/common/zk/ZKConfUtil.java +++ 
b/streamingpro-commons/src/main/java/streaming/common/zk/ZKConfUtil.java @@ -122,7 +122,9 @@ public void checkAndAddPath(Path path) { } else { checkAndAddPath(path.getParentPath()); - System.out.println("add path " + p); + if (logger.isInfoEnabled()) { + logger.info("add path {}", p); + } client.createPersistent(p); } } @@ -220,8 +222,9 @@ public void main(String[] args) throws InterruptedException { String dir = "/lock/127.0.0.1"; if (!ifExist(dir)) { - System.out.println("create dir"); - + if (logger.isInfoEnabled()) { + logger.info("create dir"); + } if (!ifExist("/lock")) { @@ -232,11 +235,14 @@ public void main(String[] args) throws InterruptedException { } String path = client.createEphemeralSequential(dir, null); - System.out.println("path " + path); + if (logger.isInfoEnabled()) { + logger.info(String.format("path: %s", path)); + } for (String l : client.getChildren(dir)) { - - System.out.println(l); + if (logger.isInfoEnabled()) { + logger.info(l); + } } Thread.sleep(50000); diff --git a/streamingpro-commons/src/main/java/tech/mlsql/tool/ByzerConfigCLI.java b/streamingpro-commons/src/main/java/tech/mlsql/tool/ByzerConfigCLI.java index 7857d8b8f..f9ab10ae2 100644 --- a/streamingpro-commons/src/main/java/tech/mlsql/tool/ByzerConfigCLI.java +++ b/streamingpro-commons/src/main/java/tech/mlsql/tool/ByzerConfigCLI.java @@ -19,6 +19,7 @@ package tech.mlsql.tool; import com.google.common.collect.Maps; +import org.apache.log4j.Logger; import java.util.Map; import java.util.Objects; @@ -26,6 +27,7 @@ public class ByzerConfigCLI { + private static Logger log = Logger.getLogger(ByzerConfigCLI.class); private final static String SPARK_CONF_TEMP = "--conf %s=%s"; private final static String BYZER_CONF_TEMP = "-%s %s"; @@ -41,8 +43,9 @@ public static void execute(String[] args) { boolean needDec = false; if (args.length != 1) { if (args.length < 2 || !Objects.equals(EncryptUtil.DEC_FLAG, args[1])) { - System.out.println("Usage: ByzerConfigCLI conf_name"); - System.out.println("Example: ByzerConfigCLI byzer.server.mode"); + if (log.isInfoEnabled()) { + log.info("Usage: ByzerConfigCLI conf_name\nExample: ByzerConfigCLI byzer.server.mode"); + } Unsafe.systemExit(1); } else { needDec = true; @@ -58,7 +61,7 @@ public static void execute(String[] args) { String entryKey = (String) entry.getKey(); if (entryKey.startsWith("streaming") || entryKey.startsWith("spark.mlsql")) { String prop = String.format(BYZER_CONF_TEMP, entryKey, entry.getValue()); - System.out.println(prop); + log.info(prop); } } } else if (key.equals("-spark")) { @@ -67,7 +70,7 @@ public static void execute(String[] args) { String entryKey = (String) entry.getKey(); if (entryKey.startsWith("spark") && !entryKey.startsWith("spark.mlsql")) { String prop = String.format(SPARK_CONF_TEMP, entryKey, entry.getValue()); - System.out.println(prop); + log.info(prop); } } } else if ("-args".equals(key)) { @@ -77,7 +80,9 @@ public static void execute(String[] args) { String entryKey = (String) entry.getKey(); prop.append(String.format(ARGS_CONF_TEMP, entryKey, entry.getValue())); } - System.out.println(prop); + if (log.isInfoEnabled()) { + log.info(prop); + } } else if (!key.endsWith(".")) { String value = config.getProperty(key); @@ -85,14 +90,16 @@ else if (!key.endsWith(".")) { value = ""; } if (needDec && EncryptUtil.isEncrypted(value)) { - System.out.println(EncryptUtil.decryptPassInKylin(value)); + log.info(EncryptUtil.decryptPassInKylin(value)); } else { - System.out.println(value.trim()); + log.info(value.trim()); } } else { Map props = 
getPropertiesByPrefix(config, key); for (Map.Entry prop : props.entrySet()) { - System.out.println(prop.getKey() + "=" + prop.getValue().trim()); + if (log.isInfoEnabled()) { + log.info(prop.getKey() + "=" + prop.getValue().trim()); + } } } } diff --git a/streamingpro-commons/src/main/java/tech/mlsql/tool/EncryptUtil.java b/streamingpro-commons/src/main/java/tech/mlsql/tool/EncryptUtil.java index 9ce885054..096b03271 100644 --- a/streamingpro-commons/src/main/java/tech/mlsql/tool/EncryptUtil.java +++ b/streamingpro-commons/src/main/java/tech/mlsql/tool/EncryptUtil.java @@ -20,6 +20,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; import javax.crypto.Cipher; import javax.crypto.spec.SecretKeySpec; @@ -32,6 +33,8 @@ public class EncryptUtil { private static final byte[] key = { 0x74, 0x68, 0x69, 0x73, 0x49, 0x73, 0x41, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x4b, 0x65, 0x79 }; + private static Logger log = Logger.getLogger(EncryptUtil.class); + public static final String ENC_PREFIX = "ENC('"; public static final String ENC_SUBFIX = "')"; @@ -72,7 +75,9 @@ public static String decryptPassInKylin(String value) { } private static void printUsage() { - System.out.println("Usage: java org.apache.kylin.common.util "); + if (log.isInfoEnabled()) { + log.info("Usage: java org.apache.kylin.common.util "); + } } public static String getDecryptedValue(String value) { @@ -90,6 +95,8 @@ public static void main(String[] args) { String passwordTxt = args[0]; // for encrypt password like LDAP password - System.out.println(EncryptUtil.encrypt(passwordTxt)); + if (log.isInfoEnabled()) { + log.info(EncryptUtil.encrypt(passwordTxt)); + } } } diff --git a/streamingpro-core/src/main/java/org/apache/spark/ps/cluster/PSExecutorBackend.scala b/streamingpro-core/src/main/java/org/apache/spark/ps/cluster/PSExecutorBackend.scala index 4ea6fa023..4962a9df3 100644 --- a/streamingpro-core/src/main/java/org/apache/spark/ps/cluster/PSExecutorBackend.scala +++ b/streamingpro-core/src/main/java/org/apache/spark/ps/cluster/PSExecutorBackend.scala @@ -10,6 +10,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.ThreadUtils +import org.slf4j.{Logger, LoggerFactory} import tech.mlsql.common.utils.exception.ExceptionTool import tech.mlsql.nativelib.runtime.MLSQLNativeRuntime import tech.mlsql.python.BasicCondaEnvManager @@ -101,6 +102,8 @@ class PSExecutorBackend(env: SparkEnv, override val rpcEnv: RpcEnv, psDriverUrl: object PSExecutorBackend { + private val log: Logger = LoggerFactory.getLogger(PSExecutorBackend.getClass) + def isLocalMaster(conf: SparkConf): Boolean = { // val master = MLSQLConf.MLSQL_MASTER.readFrom(configReader).getOrElse("") val master = conf.get("spark.master", "") @@ -161,12 +164,16 @@ object PSExecutorBackend { userClassPath += new URL(value) argv = tail case item::value::tail if item.startsWith("--")=> - System.out.println(s"ignore options: ${item} ${value}--") + if (log.isInfoEnabled()) { + log.info(s"ignore options: ${item} ${value}--") + } argv = tail case Nil => case tail => - System.err.println(s"Unrecognized options: ${tail.mkString(" ")}") + if (log.isInfoEnabled()) { + log.info(s"Unrecognized options: ${tail.mkString(" ")}") + } } } if (psDriverUrl.contains("@")) { diff --git a/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala 
b/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala index 35207c6a7..f33d55c70 100644 --- a/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala +++ b/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala @@ -241,7 +241,9 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo } } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("registerUDF Error: {}", e) + } } } } diff --git a/streamingpro-core/src/main/java/tech/mlsql/app/App.scala b/streamingpro-core/src/main/java/tech/mlsql/app/App.scala index 46d195fcd..2274e1820 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/app/App.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/app/App.scala @@ -1,6 +1,7 @@ package tech.mlsql.app import org.apache.spark.sql.DataFrame +import tech.mlsql.common.utils.log.Logging trait App { @@ -11,20 +12,20 @@ trait CustomController { def run(params: Map[String, String]): String } -trait RequestCleaner { +trait RequestCleaner extends Logging { def run(): Unit final def call() = { try { run() } catch { - case e: Exception => e.printStackTrace() + case e: Exception => log.debug("RequestCleaner Error: {}", e) } } } -trait ExceptionRender { +trait ExceptionRender extends Logging { def format(e: Exception): String def is_match(e: Exception): Boolean @@ -38,7 +39,8 @@ trait ExceptionRender { } } catch { - case e1: Exception => e1.printStackTrace() + case e1: Exception => + log.debug("ExceptionRender Error: {}", e1) ExceptionResult(e, None) } diff --git a/streamingpro-core/src/main/java/tech/mlsql/core/version/MLSQLVersion.scala b/streamingpro-core/src/main/java/tech/mlsql/core/version/MLSQLVersion.scala index 394f8b18f..ac4ea7d09 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/core/version/MLSQLVersion.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/core/version/MLSQLVersion.scala @@ -1,5 +1,7 @@ package tech.mlsql.core.version +import org.slf4j.{Logger, LoggerFactory} + import java.io.{IOException, InputStream} import java.util.Properties @@ -8,6 +10,8 @@ import java.util.Properties */ object MLSQLVersion { + private val log: Logger = LoggerFactory.getLogger(MLSQLVersion.getClass) + private val versionFile: String = "mlsql-version-info.properties" private val info = load() @@ -44,7 +48,9 @@ object MLSQLVersion { is.close() catch { case e: IOException => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("load Error: {}", e) + } } } info diff --git a/streamingpro-core/src/main/java/tech/mlsql/crawler/HttpClientCrawler.scala b/streamingpro-core/src/main/java/tech/mlsql/crawler/HttpClientCrawler.scala index 473b4fc23..a94b39d66 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/crawler/HttpClientCrawler.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/crawler/HttpClientCrawler.scala @@ -3,7 +3,6 @@ package tech.mlsql.crawler import java.nio.charset.Charset import java.security.cert.X509Certificate import java.util - import org.apache.http.client.entity.UrlEncodedFormEntity import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost} import org.apache.http.client.utils.URIBuilder @@ -15,6 +14,7 @@ import org.apache.http.protocol.HttpContext import org.apache.http.ssl.{SSLContextBuilder, TrustStrategy} import org.apache.http.util.EntityUtils import org.apache.http.{HttpHost, HttpRequest} +import org.slf4j.{Logger, LoggerFactory} import tech.mlsql.crawler.beans.WebPage /** @@ -22,6 +22,8 
@@ import tech.mlsql.crawler.beans.WebPage */ object HttpClientCrawler { + private val log: Logger = LoggerFactory.getLogger(HttpClientCrawler.getClass) + private def client(useProxy: Boolean) = { val routePlanner = new HttpRoutePlanner() { @@ -70,7 +72,15 @@ object HttpClientCrawler { } else null } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug( + String.format( + "request Error: %s\n url: %s", + e, + url + ) + ) + } null } finally { if (response != null) { @@ -107,7 +117,21 @@ object HttpClientCrawler { } catch { case e: Exception => - e.printStackTrace() + var paramToString = "[" + params.keys.foreach(i => paramToString += String.format("{ %s, %s }", i, params(i))) + paramToString += "]" + + if (log.isDebugEnabled()) { + log.debug( + String.format( + "requestByMethod Error: %s\n url: %s\n method: %s\n params: %s", + e, + url, + method, + paramToString + ) + ) + } null } finally { if (response != null) { @@ -131,7 +155,15 @@ object HttpClientCrawler { } else null } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug( + String.format( + "requestImage Error: %s\n url: %s", + e, + url + ) + ) + } null } finally { if (response != null) { @@ -144,6 +176,8 @@ object HttpClientCrawler { def main(args: Array[String]): Unit = { //println(request("https://www.baidu.com")) - println(request("http://www.javaroots.com/2017/02/how-to-use-apache-httpclient-45-https.html")) + if (log.isInfoEnabled()) { + log.info(request("http://www.javaroots.com/2017/02/how-to-use-apache-httpclient-45-https.html").toString) + } } } diff --git a/streamingpro-core/src/main/java/tech/mlsql/python/BasicCondaEnvManager.scala b/streamingpro-core/src/main/java/tech/mlsql/python/BasicCondaEnvManager.scala index 3b8babc05..9186e04bf 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/python/BasicCondaEnvManager.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/python/BasicCondaEnvManager.scala @@ -76,7 +76,9 @@ class BasicCondaEnvManager(user: String, groupId: String, executorHostAndPort: S try { FileUtils.write(new File(tempFile), getCondaYamlContent(condaEnvPath), Charset.forName("utf-8")) val cr = ShellCommand.execCmdV2WithProcessing((s) => { - println(s"Creating conda env:${projectEnvName}: " + s) + if (log.isInfoEnabled()) { + log.info(s"Creating conda env:${projectEnvName}: " + s) + } WriteLog.write(List(s"Creating conda env in ${executorHostAndPort}: ${s}").iterator, Map("PY_EXECUTE_USER" -> user, "groupId" -> groupId)) }, condaPath, "env", "create", "-n", projectEnvName, "--file", tempFile) diff --git a/streamingpro-core/src/main/java/tech/mlsql/runtime/PluginUtils.scala b/streamingpro-core/src/main/java/tech/mlsql/runtime/PluginUtils.scala index 898ca13ee..dc298b69a 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/runtime/PluginUtils.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/runtime/PluginUtils.scala @@ -170,14 +170,18 @@ object PluginUtils extends Logging with WowLog { copyBytes(inputStream, dos, 4 * 1024 * 1024) } catch { case ex: Exception => - println("file save exception") + if (log.isInfoEnabled()) { + log.info("file save exception") + } } finally { if (null != dos) { try { dos.close() } catch { case ex: Exception => - println("close exception") + if (log.isInfoEnabled()) { + log.info("close exception") + } } dos.close() } diff --git a/streamingpro-core/src/main/java/tech/mlsql/runtime/plugins/exception_render/DefaultExceptionRender.scala 
b/streamingpro-core/src/main/java/tech/mlsql/runtime/plugins/exception_render/DefaultExceptionRender.scala index 6c3ac7ad5..46a6fac3b 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/runtime/plugins/exception_render/DefaultExceptionRender.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/runtime/plugins/exception_render/DefaultExceptionRender.scala @@ -2,15 +2,18 @@ package tech.mlsql.runtime.plugins.exception_render import streaming.log.WowLog import tech.mlsql.app.ExceptionRender +import tech.mlsql.common.utils.log.Logging import scala.collection.mutable.ArrayBuffer -class DefaultExceptionRender extends ExceptionRender with WowLog { +class DefaultExceptionRender extends ExceptionRender with WowLog with Logging { override def format(e: Exception): String = { - e.printStackTrace() val msgBuffer = ArrayBuffer[String]() format_full_exception(msgBuffer, e) + if (log.isDebugEnabled()) { + log.debug(e.getMessage + "\n" + msgBuffer.mkString("\n")) + } e.getMessage + "\n" + msgBuffer.mkString("\n") } diff --git a/streamingpro-core/src/main/java/tech/mlsql/scheduler/client/SchedulerTaskStore.scala b/streamingpro-core/src/main/java/tech/mlsql/scheduler/client/SchedulerTaskStore.scala index f2bf44c6e..28b9021e1 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/scheduler/client/SchedulerTaskStore.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/scheduler/client/SchedulerTaskStore.scala @@ -111,7 +111,9 @@ class SchedulerTaskStore(spark: SparkSession, _consoleUrl: String, _consoleToken } catch { case e: Exception => //design how to handle the exception - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("execute Error: {}", e) + } } } }) diff --git a/streamingpro-core/src/main/java/tech/mlsql/store/DeltaLakeDBStore.scala b/streamingpro-core/src/main/java/tech/mlsql/store/DeltaLakeDBStore.scala index f63551172..24bba59b8 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/store/DeltaLakeDBStore.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/store/DeltaLakeDBStore.scala @@ -1,6 +1,7 @@ package tech.mlsql.store import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.datalake.DataLake import tech.mlsql.scheduler.client.SchedulerUtils.DELTA_FORMAT import tech.mlsql.store.DictType.DictType @@ -8,7 +9,7 @@ import tech.mlsql.store.DictType.DictType /** * 16/3/2020 WilliamZhu(allwefantasy@gmail.com) */ -class DeltaLakeDBStore extends DBStore { +class DeltaLakeDBStore extends DBStore with Logging { private val configTableName = "__mlsql__.config" override def saveConfig(spark: SparkSession, appPrefix: String, name: String, value: String, dictType: DictType): Unit = this.synchronized { @@ -47,7 +48,9 @@ class DeltaLakeDBStore extends DBStore { writer.mode(SaveMode.Append).save(finalPath) } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("saveTable Error: {}", e) + } } } diff --git a/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala b/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala index 988aa62ba..2d1f44768 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/tool/HDFSOperatorV2.scala @@ -5,6 +5,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.io.IOUtils import org.apache.spark.MLSQLSparkUtils +import org.slf4j.{Logger, LoggerFactory} import tech.mlsql.common.utils.Md5 import 
tech.mlsql.common.utils.path.PathFun @@ -16,6 +17,7 @@ import scala.collection.mutable.ArrayBuffer */ object HDFSOperatorV2 { + private val log: Logger = LoggerFactory.getLogger(HDFSOperatorV2.getClass) def hadoopConfiguration: Configuration = { if (MLSQLSparkUtils.sparkHadoopUtil != null) { MLSQLSparkUtils.sparkHadoopUtil.conf @@ -100,14 +102,18 @@ object HDFSOperatorV2 { dos.write(bytes) } catch { case ex: Exception => - println("file save exception") + if (log.isInfoEnabled()) { + log.info("file save exception") + } } finally { if (null != dos) { try { dos.close() } catch { case ex: Exception => - println("close exception") + if (log.isInfoEnabled()) { + log.info("close exception") + } } dos.close() } @@ -128,15 +134,18 @@ object HDFSOperatorV2 { IOUtils.copyBytes(inputStream, dos, 4 * 1024 * 1024) } catch { case ex: Exception => - ex.printStackTrace() - println("file save exception") + if (log.isDebugEnabled()) { + log.debug("file save exception: {}", ex) + } } finally { if (null != dos) { try { dos.close() } catch { case ex: Exception => - println("close exception") + if (log.isDebugEnabled()) { + log.debug("close exception: {}", ex) + } } dos.close() } @@ -165,14 +174,18 @@ object HDFSOperatorV2 { } } catch { case ex: Exception => - println("file save exception") + if (log.isInfoEnabled()) { + log.info("file save exception") + } } finally { if (null != dos) { try { dos.close() } catch { case ex: Exception => - println("close exception") + if (log.isInfoEnabled()) { + log.info("close exception") + } } dos.close() } diff --git a/streamingpro-core/src/test/scala/tech/mlsql/runtime/FunctionsTest.scala b/streamingpro-core/src/test/scala/tech/mlsql/runtime/FunctionsTest.scala index eb1f9368e..4d936a091 100644 --- a/streamingpro-core/src/test/scala/tech/mlsql/runtime/FunctionsTest.scala +++ b/streamingpro-core/src/test/scala/tech/mlsql/runtime/FunctionsTest.scala @@ -30,6 +30,7 @@ import org.mockito.Mockito.{mock, mockStatic, when} import org.mockito.{ArgumentMatchers, MockedStatic} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should +import tech.mlsql.common.utils.log.Logging import tech.mlsql.crawler.RestUtils import tech.mlsql.tool.CipherUtils @@ -40,7 +41,7 @@ import scala.language.reflectiveCalls * 22/01/2022 hellozepp(lisheng.zhanglin@163.com) */ -class FunctionsTest extends AnyFlatSpec with should.Matchers { +class FunctionsTest extends AnyFlatSpec with should.Matchers with Logging { /** * This function mocks a Request. For any request url, get & post, bodyForm & bodyString, @@ -86,8 +87,9 @@ class FunctionsTest extends AnyFlatSpec with should.Matchers { reqStatic => { val (status, content) = RestUtils.rest_request_string("http://www.byzer.org/home", "get", params = Map("foo" -> "bar", "foo1" -> "bar"), headers = Map("Content-Type" -> "application/x-www-form-urlencoded"), Map()) - - println(s"status:$status, content:$content") + if (log.isInfoEnabled()) { + log.info(s"status:$status, content:$content") + } assertEquals(200, status) assertEquals("{\"code\":\"200\",\"content\":\"ok\"}", content) // verify url concat is legal. @@ -102,7 +104,10 @@ class FunctionsTest extends AnyFlatSpec with should.Matchers { params = Map("foo" -> "bar", "foo1" -> "bar"), Map(), config = Map("socket-timeout" -> "a")) throw new MLSQLException("The configuration is illegal, but no exception is displayed!") } catch { - case e: Exception => println("success! e:" + e.getMessage) + case e: Exception => + if (log.isInfoEnabled()) { + log.info("success! 
e:" + e.getMessage) + } } // verify config set illegal of connect-timeout. try { @@ -110,7 +115,10 @@ class FunctionsTest extends AnyFlatSpec with should.Matchers { params = Map("foo" -> "bar", "foo1" -> "bar"), Map(), config = Map("connect-timeout" -> "a")) throw new MLSQLException("The configuration is illegal, but no exception is displayed!") } catch { - case e: Exception => println("success! e:" + e.getMessage) + case e: Exception => + if (log.isInfoEnabled()) { + log.info("success! e:" + e.getMessage) + } } } } @@ -124,7 +132,9 @@ class FunctionsTest extends AnyFlatSpec with should.Matchers { val (status, content) = RestUtils.rest_request_string("http://www.byzer.org/home", "post", params = Map("body" -> "{\"a\":1,\"b\":2}"), headers = Map("Content-Type" -> "application/json"), Map()) - println(s"status:$status, content:$content") + if (log.isInfoEnabled()) { + log.info(s"status:$status, content:$content") + } assertEquals(200, status ) assertEquals(content, "{\"code\":\"200\",\"content\":\"ok\"}") // verify url concat is legal. @@ -137,7 +147,9 @@ class FunctionsTest extends AnyFlatSpec with should.Matchers { { val (status, content) = RestUtils.rest_request_string("http://www.byzer.org/home", "post", params = Map("foo" -> "bar", "foo1" -> "bar"), headers = Map("Content-Type" -> "application/x-www-form-urlencoded"), Map()) - println(s"status:$status, content:$content") + if (log.isInfoEnabled()) { + log.info(s"status:$status, content:$content") + } } } diff --git a/streamingpro-core/src/test/scala/tech/mlsql/runtime/ScannerAndParserTest.scala b/streamingpro-core/src/test/scala/tech/mlsql/runtime/ScannerAndParserTest.scala index a20157781..4953716f2 100644 --- a/streamingpro-core/src/test/scala/tech/mlsql/runtime/ScannerAndParserTest.scala +++ b/streamingpro-core/src/test/scala/tech/mlsql/runtime/ScannerAndParserTest.scala @@ -2,6 +2,7 @@ package tech.mlsql.runtime import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should +import tech.mlsql.common.utils.log.Logging import tech.mlsql.lang.cmd.compile.internal.gc._ import scala.collection.mutable.ArrayBuffer @@ -9,7 +10,7 @@ import scala.collection.mutable.ArrayBuffer /** * 2/10/2020 WilliamZhu(allwefantasy@gmail.com) */ -class ScannerAndParserTest extends AnyFlatSpec with should.Matchers { +class ScannerAndParserTest extends AnyFlatSpec with should.Matchers with Logging { def want(items: List[Token], index: Int, t: Scanner.TokenType, str: String) = { assert(items(index).t == t && items(index).text == str) @@ -223,7 +224,9 @@ class ScannerAndParserTest extends AnyFlatSpec with should.Matchers { val str = "select :{:jack} as :name as b;" val textTemplate = new TextTemplate(Map("jack" -> "wow"), str) val tokens = textTemplate.parse - println(tokens.map(_.chars.mkString(""))) + if (log.isInfoEnabled()) { + log.info(tokens.map(_.chars.mkString("")).toString()) + } } diff --git a/streamingpro-it/src/test/scala/tech/mlsql/it/ByzerScriptTestSuite.scala b/streamingpro-it/src/test/scala/tech/mlsql/it/ByzerScriptTestSuite.scala index ec4efa09c..118503754 100644 --- a/streamingpro-it/src/test/scala/tech/mlsql/it/ByzerScriptTestSuite.scala +++ b/streamingpro-it/src/test/scala/tech/mlsql/it/ByzerScriptTestSuite.scala @@ -29,7 +29,9 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging { } override def afterAll(): Unit = { - println("The integration test is complete, and a graceful shutdown is performed...") + if (log.isInfoEnabled()) { + log.info("The integration test is complete, and a graceful shutdown is 
performed...") + } if (cluster != null) { cluster.stop() cluster = null @@ -37,7 +39,9 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging { } override def beforeAll(): Unit = { - println("Initialize configuration before integration test execution...") + if (log.isInfoEnabled()) { + log.info("Initialize configuration before integration test execution...") + } setupWorkingDirectory() setupRunParams() copyDataToUserHome(user) @@ -98,7 +102,9 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging { if ("3.0".equals(version)) { before() - println("Current spark version is 3.0, step to javaContainer test...") + if (log.isInfoEnabled()) { + log.info("Current spark version is 3.0, step to javaContainer test...") + } val cluster: ByzerCluster = setupCluster() val hadoopContainer = cluster.hadoopContainer val byzerLangContainer = cluster.byzerLangContainer @@ -106,17 +112,19 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging { url = s"http://${javaContainer.getHost}:${javaContainer.getMappedPort(9003)}/run/script" test("javaContainer") { - // 9870, 8088, 19888, 10002, 8042 - println("Current hadoop ui port(8088) is :" + hadoopContainer.container.getMappedPort(8088)) - println("Current containerlogs ui port(8042) is :" + hadoopContainer.container.getMappedPort(8042)) - println("Current hdfs ui port is :" + hadoopContainer.container.getMappedPort(9870)) - println("Current jobhistory ui port is :" + hadoopContainer.container.getMappedPort(19888)) - println("Current spark xdebug(10002) port is :" + hadoopContainer.container.getMappedPort(10002)) - // 9003, 4040, 8265, 10002 - println("Current byzer ui port is :" + javaContainer.getMappedPort(9003)) - println("Current spark ui port is :" + javaContainer.getMappedPort(4040)) - println("Current ray dashboard port is :" + javaContainer.getMappedPort(8265)) - println("Current ray head port is :" + javaContainer.getMappedPort(10002)) + if (log.isInfoEnabled()) { + // 9870, 8088, 19888, 10002, 8042 + log.info("Current hadoop ui port(8088) is :" + hadoopContainer.container.getMappedPort(8088)) + log.info("Current containerlogs ui port(8042) is :" + hadoopContainer.container.getMappedPort(8042)) + log.info("Current hdfs ui port is :" + hadoopContainer.container.getMappedPort(9870)) + log.info("Current jobhistory ui port is :" + hadoopContainer.container.getMappedPort(19888)) + log.info("Current spark xdebug(10002) port is :" + hadoopContainer.container.getMappedPort(10002)) + // 9003, 4040, 8265, 10002 + log.info("Current byzer ui port is :" + javaContainer.getMappedPort(9003)) + log.info("Current spark ui port is :" + javaContainer.getMappedPort(4040)) + log.info("Current ray dashboard port is :" + javaContainer.getMappedPort(8265)) + log.info("Current ray head port is :" + javaContainer.getMappedPort(10002)) + } runScript(url, user, "select 1 as a,'jack' as b as bbc;") } diff --git a/streamingpro-it/src/test/scala/tech/mlsql/it/Comparator.scala b/streamingpro-it/src/test/scala/tech/mlsql/it/Comparator.scala index 39ee727af..3440e9b43 100644 --- a/streamingpro-it/src/test/scala/tech/mlsql/it/Comparator.scala +++ b/streamingpro-it/src/test/scala/tech/mlsql/it/Comparator.scala @@ -1,9 +1,9 @@ package tech.mlsql.it import java.io.{FileReader, PrintWriter, StringWriter} - import breeze.io.CSVReader import org.apache.commons.io.FileUtils +import tech.mlsql.common.utils.log.Logging import scala.compat.Platform.EOL @@ -23,12 +23,14 @@ trait Comparator { } -class DefaultComparator extends Comparator { +class 
DefaultComparator extends Comparator with Logging{ def getExceptionStackAsString(exception: Exception): String = { val sw = new StringWriter() val pw = new PrintWriter(sw) exception.printStackTrace(pw) + if (log.isDebugEnabled()) { + log.debug(String.format("Error: %s\n Stack: %s", exception, sw.toString)) + } sw.toString } diff --git a/streamingpro-it/src/test/scala/tech/mlsql/it/LocalBaseTestSuite.scala b/streamingpro-it/src/test/scala/tech/mlsql/it/LocalBaseTestSuite.scala index 0ea63ad3f..869e36b52 100644 --- a/streamingpro-it/src/test/scala/tech/mlsql/it/LocalBaseTestSuite.scala +++ b/streamingpro-it/src/test/scala/tech/mlsql/it/LocalBaseTestSuite.scala @@ -1,17 +1,17 @@ package tech.mlsql.it import java.io.File - import org.apache.commons.io.FileUtils import org.apache.spark.streaming.SparkOperationUtil import org.scalatest.{BeforeAndAfterAll, FunSuite} import serviceframework.dispatcher.StrategyDispatcher import streaming.core.StreamingApp import streaming.core.strategy.platform.{PlatformManager, SparkRuntime} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.job.JobManager -trait LocalBaseTestSuite extends FunSuite with SparkOperationUtil with BeforeAndAfterAll { +trait LocalBaseTestSuite extends FunSuite with SparkOperationUtil with BeforeAndAfterAll with Logging { var runtime: SparkRuntime = _ var runParams: Array[String] = Array() @@ -91,7 +91,9 @@ trait LocalBaseTestSuite extends FunSuite with SparkOperationUtil with BeforeAnd } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("afterAll Error: {}", e) + } } } diff --git a/streamingpro-it/src/test/scala/tech/mlsql/it/TestManager.scala b/streamingpro-it/src/test/scala/tech/mlsql/it/TestManager.scala index 8399e7581..fa8edd830 100644 --- a/streamingpro-it/src/test/scala/tech/mlsql/it/TestManager.scala +++ b/streamingpro-it/src/test/scala/tech/mlsql/it/TestManager.scala @@ -62,7 +62,9 @@ object TestManager extends Logging { def recordError(testCase: TestCase, t: Throwable): Unit = { recordError(testCase, ExceptionUtils.getRootCause(t)) - t.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("recordError Error: {}", t) + } } def accept(testCase: TestCase, result: Seq[Seq[String]], exception: Exception): Unit = { diff --git a/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowCount.java b/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowCount.java index 4c80b6f44..165bcf587 100644 --- a/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowCount.java +++ b/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowCount.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.PrintStream; @@ -30,6 +31,8 @@ */ public class WowCount extends WowFsCommand { + + private Logger log = Logger.getLogger(WowCount.class); public WowCount(Configuration conf, String basePath, PrintStream out, PrintStream error) { super(conf, basePath, out, error); } @@ -75,7 +78,9 @@ protected void processOptions(LinkedList args) { @Override protected void processPath(PathData src) throws IOException { ContentSummary summary = src.fs.getContentSummary(src.path); - out.println(summary.toString(showQuotas, isHumanReadable()) + cleanPath(src.toString())); + if (log.isInfoEnabled()) { + log.info(summary.toString(showQuotas, isHumanReadable()) + cleanPath(src.toString())); 
+ } } /** diff --git a/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowDelete.java b/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowDelete.java index 6d1a9babf..21c134cce 100644 --- a/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowDelete.java +++ b/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowDelete.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; +import org.apache.log4j.Logger; import java.io.FileNotFoundException; import java.io.IOException; @@ -52,6 +53,8 @@ public static class Rm extends WowFsCommand { private boolean deleteDirs = false; private boolean ignoreFNF = false; + private Logger log = Logger.getLogger(this.getClass()); + public Rm(Configuration conf, String basePath, PrintStream out, PrintStream error) { super(conf, basePath, out, error); } @@ -102,7 +105,9 @@ protected void processPath(PathData item) throws IOException { if (!item.fs.delete(item.path, deleteDirs)) { throw new PathIOException(item.toString()); } - out.println("Deleted " + cleanPath(item.toString())); + if (log.isInfoEnabled()) { + log.info("Deleted " + cleanPath(item.toString())); + } } private boolean moveToTrash(PathData item) throws IOException { diff --git a/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowLs.java b/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowLs.java index 14b343b14..70624ffd6 100644 --- a/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowLs.java +++ b/streamingpro-mlsql/src/main/java/org/apache/hadoop/fs/shell/WowLs.java @@ -4,6 +4,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.PrintStream; @@ -45,6 +46,8 @@ public class WowLs extends WowFsCommand { protected boolean format = false; + private Logger log = Logger.getLogger(WowLs.class); + public WowLs(Configuration conf, String basePath, PrintStream out, PrintStream error) { super(conf, basePath, out, error); } @@ -82,8 +85,8 @@ protected void processPathArgument(PathData item) throws IOException { @Override protected void processPaths(PathData parent, PathData... 
items) throws IOException { - if (!format && parent != null && !isRecursive() && items.length != 0) { - out.println("Found " + items.length + " items"); + if (!format && parent != null && !isRecursive() && items.length != 0 && log.isInfoEnabled()) { + log.info("Found " + items.length + " items"); } adjustColumnWidths(items); super.processPaths(parent, items); @@ -120,7 +123,9 @@ protected void processPath(PathData item) throws IOException { lineMap.put("name", itemName); line = new Gson().toJson(lineMap); } - out.println(line); + if (log.isInfoEnabled()) { + log.info(line); + } } /** diff --git a/streamingpro-mlsql/src/main/java/org/apache/spark/streaming/SparkOperationUtil.scala b/streamingpro-mlsql/src/main/java/org/apache/spark/streaming/SparkOperationUtil.scala index 78fb8907b..c5d98d2ac 100644 --- a/streamingpro-mlsql/src/main/java/org/apache/spark/streaming/SparkOperationUtil.scala +++ b/streamingpro-mlsql/src/main/java/org/apache/spark/streaming/SparkOperationUtil.scala @@ -9,12 +9,13 @@ import org.apache.spark.sql.{DataFrame, Row} import serviceframework.dispatcher.{Compositor, StrategyDispatcher} import streaming.core.strategy.platform.{PlatformManager, SparkRuntime} import streaming.dsl.{MLSQLExecuteContext, ScriptSQLExec, ScriptSQLExecListener} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.shell.command.ParamsUtil import tech.mlsql.ets.ScriptRunner import tech.mlsql.job.{JobManager, MLSQLJobInfo, MLSQLJobProgress, MLSQLJobType} import tech.mlsql.runtime.MLSQLPlatformLifecycle -trait SparkOperationUtil { +trait SparkOperationUtil extends Logging { def waitJobStarted(groupId: String, timeoutSec: Long = 10) = { var count = timeoutSec @@ -178,7 +179,9 @@ trait SparkOperationUtil { } } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("withContext Error: {}", e) + } } } } @@ -197,7 +200,9 @@ trait SparkOperationUtil { } } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("withBatchContext Error: {}", e) + } } } } @@ -244,7 +249,9 @@ trait SparkOperationUtil { FileUtils.deleteDirectory(new File("./metastore_db")) } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("appWithBatchContext Error: {}", e) + } } } } diff --git a/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/output/AlgorithmOutputCompositor.scala b/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/output/AlgorithmOutputCompositor.scala index 1a2174e7a..2e7013cf1 100644 --- a/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/output/AlgorithmOutputCompositor.scala +++ b/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/output/AlgorithmOutputCompositor.scala @@ -19,20 +19,20 @@ package streaming.core.compositor.spark.output import java.util - import org.apache.log4j.Logger import org.apache.spark.ml.BaseAlgorithmEstimator import org.apache.spark.ml.tuning.TrainValidationSplitModel import org.apache.spark.sql.DataFrame import serviceframework.dispatcher.{Processor, Strategy} import streaming.core.compositor.spark.transformation.{BaseAlgorithmCompositor, SQLCompositor} +import tech.mlsql.common.utils.log.Logging import scala.collection.JavaConversions._ /** * 7/27/16 WilliamZhu(allwefantasy@gmail.com) */ -class AlgorithmOutputCompositor[T] extends BaseAlgorithmCompositor[T] { +class AlgorithmOutputCompositor[T] extends BaseAlgorithmCompositor[T] with Logging { val logger = 
Logger.getLogger(classOf[SQLCompositor[T]].getName) @@ -72,7 +72,10 @@ class AlgorithmOutputCompositor[T] extends BaseAlgorithmCompositor[T] { } catch { - case e: Exception => e.printStackTrace() + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("result Error: {}", e) + } } @@ -90,7 +93,10 @@ class AlgorithmOutputCompositor[T] extends BaseAlgorithmCompositor[T] { val model = bae.fit model.getClass.getMethod("save", classOf[String]).invoke(model, path) } catch { - case e: Exception => e.printStackTrace() + case e: Exception => + if(log.isDebugEnabled()) { + log.debug("result Error: {}", e) + } } diff --git a/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/ss/output/MultiSQLOutputCompositor.scala b/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/ss/output/MultiSQLOutputCompositor.scala index fc3681d51..3b1088bfd 100644 --- a/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/ss/output/MultiSQLOutputCompositor.scala +++ b/streamingpro-mlsql/src/main/java/streaming/core/compositor/spark/ss/output/MultiSQLOutputCompositor.scala @@ -20,19 +20,19 @@ package streaming.core.compositor.spark.ss.output import java.util import java.util.concurrent.TimeUnit - import org.apache.log4j.Logger import org.apache.spark.sql.streaming.Trigger import serviceframework.dispatcher.{Compositor, Processor, Strategy} import streaming.core.CompositorHelper import streaming.core.strategy.ParamsValidator +import tech.mlsql.common.utils.log.Logging import scala.collection.JavaConversions._ /** * 5/11/16 WilliamZhu(allwefantasy@gmail.com) */ -class MultiSQLOutputCompositor[T] extends Compositor[T] with CompositorHelper with ParamsValidator { +class MultiSQLOutputCompositor[T] extends Compositor[T] with CompositorHelper with ParamsValidator with Logging { private var _configParams: util.List[util.Map[Any, Any]] = _ val logger = Logger.getLogger(classOf[MultiSQLOutputCompositor[T]].getName) @@ -98,7 +98,10 @@ class MultiSQLOutputCompositor[T] extends Compositor[T] with CompositorHelper wi query.trigger(Trigger.ProcessingTime(_cfg.getOrElse("duration", "10").toInt, TimeUnit.SECONDS)).start() } catch { - case e: Exception => e.printStackTrace() + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("result Error: {}", e) + } } } diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLDownloadExt.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLDownloadExt.scala index bb850ab52..46c0a2874 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLDownloadExt.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLDownloadExt.scala @@ -135,7 +135,16 @@ class SQLDownloadExt(override val uid: String) extends SQLAlg with DslTool with logInfo(s"Downloaded to ${to}") } catch { - case e: Exception => e.printStackTrace() + case e: Exception => + if (log.isDebugEnabled()) { + log.debug( + String.format( + "urlencode Error: %s\n getUrl: %s", + e, + getUrl + ) + ) + } } finally { if( tarIS != null ) tarIS.close() diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLJDBCUpdatExt.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLJDBCUpdatExt.scala index 6114075b9..1f97b92d5 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLJDBCUpdatExt.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLJDBCUpdatExt.scala @@ -19,7 +19,6 @@ package streaming.dsl.mmlib.algs import java.sql.{Connection, SQLException} - import 
org.apache.spark.ml.param.Param import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.mlsql.session.MLSQLException @@ -28,9 +27,10 @@ import streaming.dsl.{ConnectMeta, DBMappingKey, ScriptSQLExec} import org.apache.spark.sql.types._ import streaming.dsl.mmlib.SQLAlg import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.job.JobManager -class SQLJDBCUpdatExt(override val uid: String) extends SQLAlg with WowParams{ +class SQLJDBCUpdatExt(override val uid: String) extends SQLAlg with WowParams with Logging { def this() = this(BaseParams.randomUID()) override def train(df: DataFrame, path: String, params: Map[String, String]):DataFrame={ @@ -139,7 +139,9 @@ class SQLJDBCUpdatExt(override val uid: String) extends SQLAlg with WowParams{ }catch{ case ex: SQLException => { connection.rollback() - ex.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("Error: {}", ex) + } throw new SQLException("Update Exception") } } finally { diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLPythonAlgBatchPrediction.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLPythonAlgBatchPrediction.scala index 92c1fab98..b8d435cdc 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLPythonAlgBatchPrediction.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLPythonAlgBatchPrediction.scala @@ -23,7 +23,6 @@ import java.nio.charset.Charset import java.nio.file.{Files, Paths} import java.util import java.util.UUID - import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -34,6 +33,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.util.ExternalCommandRunner import streaming.dsl.mmlib.SQLAlg import streaming.dsl.mmlib.algs.SQLPythonFunc._ +import tech.mlsql.common.utils.log.Logging import tech.mlsql.tool.HDFSOperatorV2 import scala.collection.JavaConverters._ @@ -42,7 +42,7 @@ import scala.collection.JavaConverters._ * Created by allwefantasy on 5/2/2018. 
* This Module support training or predicting with user-defined python script */ -class SQLPythonAlgBatchPrediction extends SQLAlg with Functions { +class SQLPythonAlgBatchPrediction extends SQLAlg with Functions with Logging { override def train(df: DataFrame, wowPath: String, params: Map[String, String]): DataFrame = { val kafkaParam = mapParams("kafkaParam", params) @@ -151,7 +151,9 @@ class SQLPythonAlgBatchPrediction extends SQLAlg with Functions { score = recordUserLog(algIndex, pythonScript, kafkaParam, res) } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } trainFailFlag = true } @@ -162,7 +164,9 @@ class SQLPythonAlgBatchPrediction extends SQLAlg with Functions { new Path(resultHDFSPath)) } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } trainFailFlag = true } finally { // delete local model diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLTokenAnalysis.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLTokenAnalysis.scala index 00f74387e..60db186ae 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLTokenAnalysis.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/SQLTokenAnalysis.scala @@ -22,9 +22,9 @@ import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.{functions => F} import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} import streaming.dsl.mmlib.SQLAlg - import scala.collection.JavaConversions._ import scala.collection.mutable @@ -88,6 +88,8 @@ class SQLTokenAnalysis extends SQLAlg with Functions { } object SQLTokenAnalysis { + private val log: Logger = LoggerFactory.getLogger(SQLTokenAnalysis.getClass) + def parseStr(parser: Any, content: String, params: Map[String, String]) = { val ignoreNature = params.getOrElse("ignoreNature", "true").toBoolean @@ -98,7 +100,9 @@ object SQLTokenAnalysis { parser.getClass.getMethod("parseStr", classOf[String]).invoke(parser, content) } catch { case e: Exception => - println(s"parser invoke error:${content}") + if (log.isDebugEnabled()) { + log.debug(s"parser invoke error:${content}\n exception: ${e}") + } throw e } diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/DoubleFeature.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/DoubleFeature.scala index b25c372fa..1549f2eaa 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/DoubleFeature.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/DoubleFeature.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, MLSQLUtils, Row, SaveMode, SparkSession, functions => F} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.types.{ArrayType, DoubleType} +import org.slf4j.{Logger, LoggerFactory} import streaming.dsl.mmlib.algs.MetaConst._ import streaming.dsl.mmlib.algs.meta.{MinMaxValueMeta, OutlierValueMeta, StandardScalerValueMeta} @@ -38,6 +39,8 @@ import streaming.dsl.mmlib.algs.meta.{MinMaxValueMeta, OutlierValueMeta, Standar */ object DoubleFeature extends BaseFeatureFunctions { + private val log: Logger = LoggerFactory.getLogger(DoubleFeature.getClass) + def killOutlierValue(df: DataFrame, metaPath: String, fields: Seq[String]): 
DataFrame = { var newDF = df val metas = new ArrayBuffer[OutlierValueMeta]() @@ -97,7 +100,9 @@ object DoubleFeature extends BaseFeatureFunctions { val min = trainParams.getOrElse("min", "0").toDouble val max = trainParams.getOrElse("max", "1").toDouble val scaleRange = max - min - println(s"predict: ${originalRange.mkString(",")} ${minArray.mkString(",")} ${scaleRange} $min") + if (log.isInfoEnabled()) { + log.info(s"predict: ${originalRange.mkString(",")} ${minArray.mkString(",")} ${scaleRange} $min") + } minMaxFunc(originalRange, minArray, scaleRange, min) } @@ -180,7 +185,9 @@ object DoubleFeature extends BaseFeatureFunctions { val max = params.getOrElse("max", "1").toDouble val scaleRange = max - min - println(s"train: ${originalRange.mkString(",")} ${minArray.mkString(",")} ${scaleRange} $min") + if (log.isInfoEnabled()) { + log.info(s"train: ${originalRange.mkString(",")} ${minArray.mkString(",")} ${scaleRange} $min") + } minMaxFunc(originalRange, minArray, scaleRange, min) case "log2" => diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/StringFeature.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/StringFeature.scala index 49956cb11..82da0b486 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/StringFeature.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/feature/StringFeature.scala @@ -338,7 +338,9 @@ object StringFeature extends BaseFeatureFunctions with Logging with WowLog { if (outputWordAndIndex) { val wordToIndex = HSQLStringIndex.wordToIndex(df.sparkSession, siModel) val res = wordToIndex.toSeq.sortBy(f => f._2).map(f => s"${f._1}:${f._2}").mkString("\n") - println(res) + if (log.isInfoEnabled()) { + log.info(res) + } } val funcMap = si.internal_predict(df.sparkSession, siModel, "wow") val predictSingleWordFunc = funcMap("wow").asInstanceOf[(String) => Int] diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/python/PythonTrain.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/python/PythonTrain.scala index d409604a8..6212c3c4f 100644 --- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/python/PythonTrain.scala +++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/python/PythonTrain.scala @@ -21,7 +21,6 @@ package streaming.dsl.mmlib.algs.python import java.io.{File, FileWriter} import java.util import java.util.UUID - import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -34,13 +33,14 @@ import streaming.dsl.mmlib.algs.SQLPythonFunc._ import streaming.dsl.mmlib.algs.{Functions, SQLPythonAlg, SQLPythonFunc} import tech.mlsql.common.utils.env.python.BasicCondaEnvManager import tech.mlsql.common.utils.lang.sc.ScalaMethodMacros._ +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.network.NetUtils import tech.mlsql.common.utils.path.PathFun import tech.mlsql.tool.HDFSOperatorV2 import scala.collection.JavaConverters._ -class PythonTrain extends Functions with Serializable { +class PythonTrain extends Functions with Serializable with Logging{ def train_per_partition(df: DataFrame, path: String, params: Map[String, String]): DataFrame = { val keepVersion = params.getOrElse("keepVersion", "false").toBoolean val keepLocalDirectory = params.getOrElse("keepLocalDirectory", "false").toBoolean @@ -267,7 +267,9 @@ class PythonTrain extends Functions with Serializable { } } catch { case e: Exception => - e.printStackTrace() + if 
(log.isDebugEnabled()) { + log.debug("Error: {}", e) + } trainFailFlag = true } finally { // delete resource @@ -516,7 +518,9 @@ class PythonTrain extends Functions with Serializable { } } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("copy file exception: {}", e) + } trainFailFlag = true } finally { // delete local model diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/PythonCommand.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/PythonCommand.scala index 6bb8e862a..3138b4a4a 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/PythonCommand.scala +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/PythonCommand.scala @@ -4,7 +4,6 @@ import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.util import java.util.concurrent.atomic.AtomicReference - import org.apache.spark.ml.param.Param import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.mlsql.session.MLSQLException @@ -21,6 +20,7 @@ import tech.mlsql.arrow.python.ispark.SparkContextImp import tech.mlsql.arrow.python.runner.{ArrowPythonRunner, ChainedPythonFunctions, PythonConf, PythonFunction} import tech.mlsql.common.utils.distribute.socket.server.{ReportHostAndPort, SocketServerInExecutor, SocketServerSerDer, TempSocketServerInDriver} import tech.mlsql.common.utils.lang.sc.ScalaMethodMacros +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.net.NetTool import tech.mlsql.common.utils.serder.json.JSONTool import tech.mlsql.ets.python._ @@ -33,7 +33,7 @@ import scala.collection.mutable.ArrayBuffer /** * 2019-08-16 WilliamZhu(allwefantasy@gmail.com) */ -class PythonCommand(override val uid: String) extends SQLAlg with Functions with WowParams { +class PythonCommand(override val uid: String) extends SQLAlg with Functions with WowParams with Logging { def this() = this(BaseParams.randomUID()) final val inputTable: Param[String] = new Param[String](this, "inputTable", " ") @@ -112,7 +112,10 @@ class PythonCommand(override val uid: String) extends SQLAlg with Functions with try { tempServer._server.close() } catch { - case e: Exception => e.printStackTrace() + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("launchPythonServer Error: {}", e) + } } } diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/hdfs/WowFsShell.java b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/hdfs/WowFsShell.java index 7de0ab0f8..58715dee9 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/hdfs/WowFsShell.java +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/hdfs/WowFsShell.java @@ -29,6 +29,7 @@ import org.apache.hadoop.tools.TableListing; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -56,6 +57,8 @@ public class WowFsShell extends Configured implements Tool { public final PrintStream out = new PrintStream(outS); public final PrintStream error = new PrintStream(errorS); + private Logger log = Logger.getLogger(this.getClass()); + public String getOut() { return _out(outS); } @@ -66,7 +69,9 @@ public String _out(ByteArrayOutputStream wow) { wow.reset(); return temp; } catch (Exception e) { - e.printStackTrace(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("_out Error!!! 
\n exception: %s",e)); + } } return null; } @@ -224,35 +229,39 @@ private void printInfo(PrintStream out, String cmd, boolean showHelp) { } } else { // display help or usage for all commands - out.println(usagePrefix); + if (log.isInfoEnabled()) { + log.info(usagePrefix); + } // display list of short usages ArrayList instances = new ArrayList(); for (String name : commandFactory.getNames()) { Command instance = commandFactory.getInstance(name); if (!instance.isDeprecated()) { - out.println("\t[" + instance.getUsage() + "]"); + log.info("\t[" + instance.getUsage() + "]"); instances.add(instance); } } // display long descriptions for each command if (showHelp) { for (Command instance : instances) { - out.println(); printInstanceHelp(out, instance); } } - out.println(); ToolRunner.printGenericCommandUsage(out); } } private void printInstanceUsage(PrintStream out, Command instance) { - out.println(usagePrefix + " " + instance.getUsage()); + if (log.isInfoEnabled()) { + log.info(usagePrefix + " " + instance.getUsage()); + } } private void printInstanceHelp(PrintStream out, Command instance) { - out.println(instance.getUsage() + " :"); + if (log.isInfoEnabled()) { + log.info(instance.getUsage() + " :"); + } TableListing listing = null; final String prefix = " "; for (String line : instance.getDescription().split("\n")) { @@ -270,20 +279,26 @@ private void printInstanceHelp(PrintStream out, Command instance) { // Normal literal description. if (listing != null) { for (String listingLine : listing.toString().split("\n")) { - out.println(prefix + listingLine); + if (log.isInfoEnabled()) { + log.info(prefix + listingLine); + } } listing = null; } for (String descLine : WordUtils.wrap( line, MAX_LINE_WIDTH, "\n", true).split("\n")) { - out.println(prefix + descLine); + if (log.isInfoEnabled()) { + log.info(prefix + descLine); + } } } if (listing != null) { for (String listingLine : listing.toString().split("\n")) { - out.println(prefix + listingLine); + if (log.isInfoEnabled()) { + log.info(prefix + listingLine); + } } } } @@ -316,15 +331,18 @@ public int run(String argv[]) throws Exception { } exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length)); } catch (IllegalArgumentException e) { - displayError(cmd, e.getLocalizedMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("run Error: \n cmd: %s \n exception: %s", cmd, e)); + } if (instance != null) { printInstanceUsage(error, instance); } } catch (Exception e) { // instance.run catches IOE, so something is REALLY wrong if here - LOG.debug("Error", e); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("run Error: \n cmd: %s \n exception: %s", cmd, e)); + } displayError(cmd, "Fatal internal error"); - e.printStackTrace(error); } } return exitCode; @@ -332,12 +350,14 @@ public int run(String argv[]) throws Exception { private void displayError(String cmd, String message) { for (String line : message.split("\n")) { - error.println(cmd + ": " + line); + if (log.isDebugEnabled()) { + log.debug(cmd + ": " + line); + } if (cmd.charAt(0) != '-') { Command instance = null; instance = commandFactory.getInstance("-" + cmd); if (instance != null) { - error.println("Did you mean -" + cmd + "? This command " + + log.debug("Did you mean -" + cmd + "? 
This command " + "begins with a dash."); } } diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/ray/CollectServerInDriver.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/ray/CollectServerInDriver.scala index 8b63af914..4d2570edb 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/ray/CollectServerInDriver.scala +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/ray/CollectServerInDriver.scala @@ -45,7 +45,9 @@ class CollectServerInDriver[T](context: AtomicReference[ArrayBuffer[ReportHostAn val dOut = new DataOutputStream(socket.getOutputStream) client.readRequest(dIn) match { case rha: ReportHostAndPort => - println(rha) + if (log.isInfoEnabled()) ( + log.info(rha.toString) + ) context.get() += rha client.sendRequest(dOut, rha) } diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/tensorflow/DistributedTensorflow.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/tensorflow/DistributedTensorflow.scala index 043b51e30..be418e9b5 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/tensorflow/DistributedTensorflow.scala +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/tensorflow/DistributedTensorflow.scala @@ -2,7 +2,6 @@ package tech.mlsql.ets.tensorflow import java.io.File import java.util.concurrent.atomic.AtomicReference - import org.apache.commons.io.FileUtils import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.mlsql.session.MLSQLException @@ -20,12 +19,13 @@ import tech.mlsql.arrow.python.PythonWorkerFactory import tech.mlsql.arrow.python.runner.{PythonConf, PythonProjectRunner} import tech.mlsql.common.utils.cluster.ml._ import tech.mlsql.common.utils.lang.sc.ScalaMethodMacros +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.network.NetUtils import tech.mlsql.ets.ml.cluster._ import tech.mlsql.log.WriteLog import tech.mlsql.tool.HDFSOperatorV2 -class DistributedTensorflow(override val uid: String) extends SQLAlg with SQLPythonAlgParams with Functions { +class DistributedTensorflow(override val uid: String) extends SQLAlg with SQLPythonAlgParams with Functions with Logging { def this() = this(BaseParams.randomUID()) override def train(df: DataFrame, path: String, _params: Map[String, String]): DataFrame = { @@ -258,7 +258,7 @@ class DistributedTensorflow(override val uid: String) extends SQLAlg with SQLPyt val modelHDFSPath = SQLPythonFunc.getAlgModelPath(path, keepVersion) + "/" + algIndex try { - //模型保存到hdfs上 + // 模型保存到hdfs上 if (!keepVersion) { HDFSOperatorV2.deleteDir(modelHDFSPath) } @@ -267,7 +267,9 @@ class DistributedTensorflow(override val uid: String) extends SQLAlg with SQLPyt } } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } finally { // delete local model FileUtils.deleteDirectory(new File(tempModelLocalPath)) diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/tool/TarfileUtil.java b/streamingpro-mlsql/src/main/java/tech/mlsql/tool/TarfileUtil.java index 8ee19dab2..4d04c2207 100644 --- a/streamingpro-mlsql/src/main/java/tech/mlsql/tool/TarfileUtil.java +++ b/streamingpro-mlsql/src/main/java/tech/mlsql/tool/TarfileUtil.java @@ -87,7 +87,15 @@ public static int createTarFileStream(OutputStream output, String pathStr) throw } else return 400; } catch (Exception e) { - e.printStackTrace(); + if (logger.isDebugEnabled()) { + logger.debug( + String.format( + "createTarFileStream Error!!!\n pathStr: %s \n exception: %s", + pathStr, + e + ) + ); + } return 500; } diff --git 
a/streamingpro-mlsql/src/test/scala/org/apache/spark/sql/jdbc/UpsertBuilderTest.scala b/streamingpro-mlsql/src/test/scala/org/apache/spark/sql/jdbc/UpsertBuilderTest.scala index 1ce32a108..85c5815c3 100644 --- a/streamingpro-mlsql/src/test/scala/org/apache/spark/sql/jdbc/UpsertBuilderTest.scala +++ b/streamingpro-mlsql/src/test/scala/org/apache/spark/sql/jdbc/UpsertBuilderTest.scala @@ -2,15 +2,18 @@ package org.apache.spark.sql.jdbc import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.scalatest.FunSuite +import tech.mlsql.common.utils.log.Logging -class UpsertBuilderTest extends FunSuite { +class UpsertBuilderTest extends FunSuite with Logging { val idField = Seq(StructField("c1", StringType, nullable = false)) val schema = StructType( Seq( StructField("c1", StringType, nullable = false), StructField("c2", IntegerType, nullable = true)) ) test("generating oracle merge into statement and schema") { val (stmt, upsertSchema) = OracleUpsertBuilder.generateStatement( "test_table", idField, schema) - println( stmt) + if (log.isInfoEnabled()) { + log.info(stmt) + } assert( upsertSchema.fields.length == 4, "There should be 4 fields in schema") assert( stmt.startsWith("MERGE INTO test_table")) } @@ -18,7 +21,9 @@ class UpsertBuilderTest extends FunSuite { test("generating mysql insert on duplicate statement and schema") { val dialect = JdbcDialects.get("jdbc:mysql://127.0.0.1:3306") val (stmt, upsertSchema) = MysqlUpsertBuilder.generateStatement("table_1", dialect, idField, schema) - println(stmt) + if (log.isInfoEnabled()) { + log.info(stmt) + } assert( stmt.startsWith("insert into table_1")) assert( upsertSchema.length == 3 ) } diff --git a/streamingpro-mlsql/src/test/scala/streaming/core/SpecFunctions.scala b/streamingpro-mlsql/src/test/scala/streaming/core/SpecFunctions.scala index da7c25320..5134514be 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/core/SpecFunctions.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/core/SpecFunctions.scala @@ -20,7 +20,6 @@ package streaming.core import java.io.{File, FileNotFoundException} import java.sql.{DriverManager, Statement} - import net.csdn.ServiceFramwork import net.csdn.bootstrap.Bootstrap import org.apache.commons.io.FileUtils @@ -29,12 +28,13 @@ import org.apache.http.client.fluent.{Form, Request} import org.apache.http.util.EntityUtils import org.apache.spark.sql.SparkSession import streaming.dsl.{MLSQLExecuteContext, ScriptSQLExec, ScriptSQLExecListener} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.job.{JobManager, MLSQLJobInfo, MLSQLJobProgress, MLSQLJobType} /** * Created by allwefantasy on 28/4/2018. 
*/ -trait SpecFunctions { +trait SpecFunctions extends Logging{ def password = "mlsql" @@ -164,7 +164,7 @@ trait SpecFunctions { FileUtils.forceDelete(new File(file)) } catch { - case ex: FileNotFoundException => println(ex) + case ex: FileNotFoundException => log.debug("delDir Error: {}", ex) } } diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/Test.scala b/streamingpro-mlsql/src/test/scala/streaming/test/Test.scala index e5f0a7c72..d639a9d61 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/test/Test.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/test/Test.scala @@ -3,9 +3,9 @@ package streaming.test import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, Future} - import org.apache.http.client.fluent.{Form, Request} import org.apache.spark.internal.Logging +import org.slf4j.{Logger, LoggerFactory} import streaming.dsl.DBMappingKey import tech.mlsql.common.utils.lang.sc.ScalaReflect @@ -107,7 +107,9 @@ object ProfileUtils extends Logging { }) }) val t = b.map(task => task.get()) - println("meanTime " + mean(removeOuterlier(t))) + if (log.isInfoEnabled()) { + log.info("meanTime " + mean(removeOuterlier(t))) + } executorService.shutdownNow() } @@ -135,8 +137,12 @@ object ProfileUtils extends Logging { } object Wow { + + private val log: Logger = LoggerFactory.getLogger(Wow.getClass) def main(args: Array[String]): Unit = { val dbmapping = ScalaReflect.fromObjectStr("streaming.dsl.ConnectMeta").field("dbMapping").invoke().asInstanceOf[ConcurrentHashMap[DBMappingKey, Map[String, String]]] - println(dbmapping) + if (log.isInfoEnabled()) { + log.info(dbmapping.toString) + } } } \ No newline at end of file diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextModuleSpec.scala b/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextModuleSpec.scala index cbfae9ab2..912cf212b 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextModuleSpec.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextModuleSpec.scala @@ -66,7 +66,9 @@ class TextModuleSpec extends BasicSparkOperation with SpecFunctions with BasicML sq = createSSEL ScriptSQLExec.parse(loadSQLScriptStr("token-extract"), sq) val res = spark.sql("select * from tb").toJSON.collect().mkString("\n") - println(res) + if (log.isInfoEnabled()) { + log.info(res) + } import scala.collection.JavaConversions._ assume(JSONObject.fromObject(res).getJSONArray("keywords"). filter(f => f.asInstanceOf[String]. 
diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextSpec.scala b/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextSpec.scala index b9b519dc6..b7a660592 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextSpec.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/test/processing/TextSpec.scala @@ -92,7 +92,9 @@ class TextSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon v(3) != 0 } assume(res2.size == 2) - println(newDF.toJSON.collect().mkString("\n")) + if (log.isInfoEnabled()) { + log.info(newDF.toJSON.collect().mkString("\n")) + } newDF = StringFeature.tfidf(df, "/tmp/tfidf/mapping", "", "content", "", null, 100000.0, Seq(), null, true) res = newDF.collect() @@ -174,7 +176,9 @@ class TextSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon val df = spark.createDataFrame(dataRDD, StructType(Seq(StructField("content", StringType)))) val newDF = StringFeature.word2vec(df, "/tmp/word2vec/mapping", "", "", "content", null, "", null) - println(newDF.toJSON.collect().mkString("\n")) + if (log.isInfoEnabled()) { + log.info(newDF.toJSON.collect().mkString("\n")) + } }) @@ -280,7 +284,7 @@ class TextSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon // we should make sure train vector and predict vector the same val trainVector = newSession.sql("select * from parquet.`/tmp/william/tmp/tfidfinplace/data`").toJSON.collect() trainVector.foreach { - f => println(f) + f => log.info(f) } val predictVector = newSession.sql("select jack(content) as content from orginal_text_corpus").toJSON.collect() predictVector.foreach { f => @@ -463,8 +467,8 @@ class TextSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon // we should make sure train vector and predict vector the same val trainVector = newSession.sql("select * from parquet.`/tmp/william/tmp/scaler/data`").toJSON.collect() val predictVector = newSession.sql("select jack(array(a,b))[0] a,jack(array(a,b))[1] b, c from orginal_text_corpus").toJSON.collect() - predictVector.foreach(println(_)) - trainVector.foreach(println(_)) + predictVector.foreach(log.info(_)) + trainVector.foreach(log.info(_)) predictVector.foreach { f => assume(trainVector.contains(f)) } @@ -495,8 +499,8 @@ class TextSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon def validate(spark:SparkSession) = { val trainVector = spark.sql("select * from parquet.`/tmp/william/tmp/scaler2/data`").toJSON.collect() val predictVector = spark.sql("select jack(array(a,b))[0] a,jack(array(a,b))[1] b, c from orginal_text_corpus").toJSON.collect() - predictVector.foreach(println(_)) - trainVector.foreach(println(_)) + predictVector.foreach(log.info(_)) + trainVector.foreach(log.info(_)) predictVector.foreach { f => assume(trainVector.contains(f)) } diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/stream/Stream2Spec.scala b/streamingpro-mlsql/src/test/scala/streaming/test/stream/Stream2Spec.scala index be8a323f4..2a2e6ddbe 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/test/stream/Stream2Spec.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/test/stream/Stream2Spec.scala @@ -23,9 +23,10 @@ import org.scalatest.BeforeAndAfterAll import streaming.core.strategy.platform.SparkRuntime import streaming.core.{BasicMLSQLConfig, SpecFunctions} import streaming.dsl.ScriptSQLExec +import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.shell.ShellCommand -class Stream2Spec extends BasicSparkOperation with 
SpecFunctions with BasicMLSQLConfig with BeforeAndAfterAll { +class Stream2Spec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig with BeforeAndAfterAll with Logging { val topic_name = "test_cool" @@ -48,7 +49,9 @@ class Stream2Spec extends BasicSparkOperation with SpecFunctions with BasicMLSQL """.stripMargin, ssel) } catch { case e: Exception => - print(e.getMessage) + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) count -= 1 @@ -112,7 +115,9 @@ class Stream2Spec extends BasicSparkOperation with SpecFunctions with BasicMLSQL """.stripMargin, ssel) } catch { case e: Exception => - print(e.getMessage) + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) count -= 1 diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamKafka1_xSpec.scala b/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamKafka1_xSpec.scala index 265f22707..7b75729e2 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamKafka1_xSpec.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamKafka1_xSpec.scala @@ -56,7 +56,10 @@ class StreamKafka1_xSpec extends BasicSparkOperation with SpecFunctions with Bas |as kafka.`${topic_name}` where metadata.broker.list="127.0.0.1:9092"; """.stripMargin, ssel) } catch { - case e: Exception => print(e.getMessage) + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) @@ -105,7 +108,10 @@ class StreamKafka1_xSpec extends BasicSparkOperation with SpecFunctions with Bas |as kafka.`${topic_name}` where metadata.broker.list="127.0.0.1:9092"; """.stripMargin, ssel) } catch { - case e: Exception => print(e.getMessage) + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) @@ -159,7 +165,10 @@ class StreamKafka1_xSpec extends BasicSparkOperation with SpecFunctions with Bas |as kafka.`${topic_name}` where metadata.broker.list="127.0.0.1:9092"; """.stripMargin) } catch { - case e: Exception => print(e.getMessage) + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamSpec.scala b/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamSpec.scala index 1c7f82fc3..f5d2886ba 100644 --- a/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamSpec.scala +++ b/streamingpro-mlsql/src/test/scala/streaming/test/stream/StreamSpec.scala @@ -50,7 +50,10 @@ class StreamSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLC |as kafka8.`${topic_name}` where metadata.broker.list="127.0.0.1:9092"; """.stripMargin, ssel) } catch { - case e: Exception => print(e.getMessage) + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) @@ -100,7 +103,10 @@ class StreamSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLC |as kafka8.`${topic_name}` where metadata.broker.list="127.0.0.1:9092"; """.stripMargin, ssel) } catch { - case e: Exception => print(e.getMessage) + case e: Exception => + if (log.isDebugEnabled()) { + log.debug("Error: {}", e) + } } Thread.sleep(1000) diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/NativeTest.scala b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/NativeTest.scala index bfbe35131..dada2c5ca 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/NativeTest.scala +++ 
b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/NativeTest.scala @@ -1,18 +1,23 @@ package tech.mlsql.test import org.scalatest.{BeforeAndAfterAll, FunSuite} +import tech.mlsql.common.utils.log.Logging import tech.mlsql.nativelib.runtime.MLSQLNativeRuntime /** * 20/10/2020 WilliamZhu(allwefantasy@gmail.com) */ -class NativeTest extends FunSuite with BeforeAndAfterAll { +class NativeTest extends FunSuite with BeforeAndAfterAll with Logging { test("nativeFuncLower") { - println(MLSQLNativeRuntime.funcLower("Dj")) + if (log.isInfoEnabled()) { + log.info(MLSQLNativeRuntime.funcLower("Dj")) + } assert("dj".equals(MLSQLNativeRuntime.funcLower("Dj"))) } test("nativeGetCPULoad"){ - println(MLSQLNativeRuntime.getCPULoad()) + if (log.isInfoEnabled()) { + log.info(MLSQLNativeRuntime.getCPULoad.toString) + } } } diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/Test2.scala b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/Test2.scala index ae7771a13..795f755ec 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/Test2.scala +++ b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/Test2.scala @@ -3,11 +3,12 @@ package tech.mlsql.test import org.apache.spark.streaming.BasicSparkOperation import streaming.core.strategy.platform.SparkRuntime import streaming.core.{BasicMLSQLConfig, SpecFunctions} +import tech.mlsql.common.utils.log.Logging /** * 10/11/2020 WilliamZhu(allwefantasy@gmail.com) */ -class Test2 extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig { +class Test2 extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig with Logging{ "data-drive" should "framework" in { @@ -17,7 +18,7 @@ class Test2 extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig session.sparkContext. parallelize(Seq(10), 2). mapPartitions { iter => - iter.foreach(item => println(item)) + iter.foreach(item => log.info(item.toString)) iter }. 
collect() diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/ds/MLSQLRestTest.scala b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/ds/MLSQLRestTest.scala index 03be0e415..23fd37d78 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/ds/MLSQLRestTest.scala +++ b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/ds/MLSQLRestTest.scala @@ -271,7 +271,7 @@ class MLSQLRestTest extends FunSuite with SparkOperationUtil with BasicMLSQLConf ), Option(session.emptyDataFrame) )) val rows = res.collect() - rows.foreach(println(_)) + rows.foreach(i => log.info(i.toString())) // the last one will also been append in the result with empty value assertEquals(3, rows.length) } @@ -321,7 +321,7 @@ class MLSQLRestTest extends FunSuite with SparkOperationUtil with BasicMLSQLConf ), Option(session.emptyDataFrame) )) val rows = res.collect() - rows.foreach(println(_)) + rows.foreach(i => log.info(i.toString())) // the last one will also been append in the result with empty value assertEquals(3, rows.length) } diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/DslSpec.scala b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/DslSpec.scala index dcacb2fa1..cfec0e806 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/DslSpec.scala +++ b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/DslSpec.scala @@ -665,7 +665,9 @@ class DslSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConf | |${command}; """.stripMargin, ssel) - println(ssel.preProcessListener.get.toScript) + if (log.isInfoEnabled()) { + log.info(ssel.preProcessListener.get.toScript) + } assert(ssel.preProcessListener.get.toScript == targetStr) } @@ -697,7 +699,9 @@ class DslSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConf def compareDSL(command: String, targetStr: String) = { val ssel = createSSEL executeScript(command, ssel) - println(ssel.preProcessListener.get.toScript) + if (log.isInfoEnabled()) { + log.info(ssel.preProcessListener.get.toScript) + } assert(ssel.preProcessListener.get.toScript contains targetStr) } diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/TemplateSpec.scala b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/TemplateSpec.scala index 879ce7669..db20ba59f 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/TemplateSpec.scala +++ b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/dsl/TemplateSpec.scala @@ -3,14 +3,17 @@ package tech.mlsql.test.dsl import org.apache.spark.streaming.BasicSparkOperation import streaming.core.{BasicMLSQLConfig, SpecFunctions} import tech.mlsql.common.utils.base.Templates +import tech.mlsql.common.utils.log.Logging /** * 25/8/2020 WilliamZhu(allwefantasy@gmail.com) */ -class TemplateSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig { +class TemplateSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig with Logging { "named template" should "work" in { - println(Templates.evaluate(" hello {} ",Seq("jack"))) - println(Templates.evaluate(" hello {0} {1} {0}",Seq("jack","wow"))) - println(Templates.evaluate(" hello {0} {1} {2:uuid()}",Seq("jack","wow"))) + if (log.isInfoEnabled()) { + log.info(Templates.evaluate(" hello {} ",Seq("jack"))) + log.info(Templates.evaluate(" hello {0} {1} {0}",Seq("jack","wow"))) + log.info(Templates.evaluate(" hello {0} {1} {2:uuid()}",Seq("jack","wow"))) + } } } diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/tf/TFSpec.scala 
b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/tf/TFSpec.scala index 737522d44..026012a29 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/tf/TFSpec.scala +++ b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/tf/TFSpec.scala @@ -28,7 +28,6 @@ class TFSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfi val source = Source.fromFile(new File(getExampleProject + "/tf_demo.py")).getLines().mkString("\n") writeStringToFile("/tmp/william/tmp/tf/tf_demo.py", source) executeCode(runtime, TFExmaple.code("/tmp/tf/tf_demo/model", "/tmp/william/tmp/tf/tf_demo.py")) - println() } } diff --git a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/udf/UDFSuite.scala b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/udf/UDFSuite.scala index 6ab4fea03..3b291286e 100644 --- a/streamingpro-mlsql/src/test/scala/tech/mlsql/test/udf/UDFSuite.scala +++ b/streamingpro-mlsql/src/test/scala/tech/mlsql/test/udf/UDFSuite.scala @@ -174,7 +174,7 @@ class UDFSuite extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon assert(result.size == 1) - result.foreach(println) + result.foreach(i => log.info(i.toString())) assert(result.head.getAs[Map[String, WrappedArray[Int]]]("res")("a")(0) == 1) } } diff --git a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala index 945a255c0..a9296ece7 100644 --- a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala +++ b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala @@ -123,7 +123,9 @@ class JDBCSink(_options: Map[String, String]) extends Sink with Logging { connection.commit() } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("close Error: {}", e) + } connection.rollback() } finally { sqlArray.map(_.close()) diff --git a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala index e1f5ec3af..65358a8e3 100644 --- a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala +++ b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import org.apache.spark.util.random.XORShiftRandom +import org.slf4j.{Logger, LoggerFactory} /** * Created by allwefantasy on 5/2/2018. 
@@ -33,10 +34,15 @@ class WowXORShiftRandom { } object WowXORShiftRandom { + + private val log: Logger = LoggerFactory.getLogger(WowXORShiftRandom.getClass) def main(args: Array[String]): Unit = { val random = new WowXORShiftRandom() (0 until 1000).foreach { f => - println(random.nextDouble) + if (log.isInfoEnabled()) { + log.info(random.nextDouble.toString) + } + } } } diff --git a/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala b/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala index 61044cc94..1789e683d 100644 --- a/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala +++ b/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/sql/execution/streaming/jdbc.scala @@ -123,7 +123,9 @@ class JDBCSink(_options: Map[String, String]) extends Sink with Logging { connection.commit() } catch { case e: Exception => - e.printStackTrace() + if (log.isDebugEnabled()) { + log.debug("close Error: {}", e) + } connection.rollback() } finally { sqlArray.map(_.close()) diff --git a/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala b/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala index e1f5ec3af..94dc18117 100644 --- a/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala +++ b/streamingpro-spark-3.0.0-adaptor/src/main/java/org/apache/spark/util/WowXORShiftRandom.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import org.apache.spark.util.random.XORShiftRandom +import org.slf4j.{Logger, LoggerFactory} /** * Created by allwefantasy on 5/2/2018. @@ -33,10 +34,14 @@ class WowXORShiftRandom { } object WowXORShiftRandom { + + private val log: Logger = LoggerFactory.getLogger(WowXORShiftRandom.getClass) def main(args: Array[String]): Unit = { val random = new WowXORShiftRandom() (0 until 1000).foreach { f => - println(random.nextDouble) + if (log.isInfoEnabled()) { + log.info(random.nextDouble.toString) + } } } } diff --git a/streamingpro-spark-common/src/main/java/streaming/core/DownloadRunner.java b/streamingpro-spark-common/src/main/java/streaming/core/DownloadRunner.java index e5729f2b9..bdc61360b 100644 --- a/streamingpro-spark-common/src/main/java/streaming/core/DownloadRunner.java +++ b/streamingpro-spark-common/src/main/java/streaming/core/DownloadRunner.java @@ -89,7 +89,9 @@ public static int createTarFileStream(OutputStream output, String pathStr) { } else return 400; } catch (Exception e) { - e.printStackTrace(); + if (logger.isDebugEnabled()) { + logger.debug(String.format("createTarFileStream Error!!!\n pathStr: %s \n exception: %s", pathStr, e)); + } return 500; } @@ -99,7 +101,9 @@ public static int getTarFileByPath(HttpServletResponse res, String pathStr) { try { return createTarFileStream(res.getOutputStream(), pathStr); } catch (IOException e) { - e.printStackTrace(); + if (logger.isDebugEnabled()) { + logger.debug(String.format("getTarFileByPath Error!!!\n res: %s \n pathStr: %s \n exception: %s", res, pathStr, e)); + } return 500; } } @@ -178,8 +182,9 @@ public static int getRawFileByPath(HttpServletResponse res, String path, long po } catch (Exception e) { - e.printStackTrace(); - + if (logger.isDebugEnabled()) { + logger.debug(String.format("getRawFileByPath Error!!!\n res: %s \n path: %s \n position: %s \n exception: %s", res, path, position, e)); + } } return 500;
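
Note: every hunk in this patch applies the same transformation: a `println` or `e.printStackTrace()` becomes a logger call guarded by `isInfoEnabled()`/`isDebugEnabled()`. The sketch below is a minimal, self-contained illustration of that pattern; the `Logging` trait and the `doWork` method here are stand-ins defined only for this example (the real `tech.mlsql.common.utils.log.Logging` trait may expose a different API), and where the stack trace should be preserved it uses the two-argument `debug(String, Throwable)` overload of the SLF4J logger rather than formatting the exception into the message.

```scala
import org.slf4j.{Logger, LoggerFactory}

// Stand-in for tech.mlsql.common.utils.log.Logging; the real trait may differ.
trait Logging {
  protected lazy val log: Logger = LoggerFactory.getLogger(this.getClass)
}

class Worker extends Logging {
  // Hypothetical method, used only to illustrate the pattern applied in this patch.
  def doWork(item: String): Unit = {
    try {
      // Guard eagerly-built messages so string concatenation is skipped
      // when the level is disabled.
      if (log.isInfoEnabled()) {
        log.info("processing " + item)
      }
    } catch {
      case e: Exception =>
        // Passing the exception as the dedicated Throwable argument keeps
        // the full stack trace, unlike the removed printStackTrace() calls.
        if (log.isDebugEnabled()) {
          log.debug(s"doWork failed for item $item", e)
        }
    }
  }
}
```

With parameterized SLF4J calls the level check inside the logger already short-circuits, so the explicit `isInfoEnabled()`/`isDebugEnabled()` guards mainly pay off where the message is concatenated or formatted eagerly, which is the case for most of the messages in this patch.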