This repository was archived by the owner on Mar 30, 2021. It is now read-only.

Commit: initial transition
Harish Butani committed Oct 24, 2016
1 parent b582a7b commit 095c18b
Showing 40 changed files with 825 additions and 506 deletions.
69 changes: 48 additions & 21 deletions build.sbt
@@ -1,9 +1,9 @@

import SparkShim._
// import SparkShim._

scalaVersion := "2.10.5"
scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.10.5", "2.11.6")
crossScalaVersions := Seq("2.10.5", "2.11.8")

parallelExecution in Test := false

@@ -15,14 +15,43 @@ val sparkdateTimeVersion = "0.0.2"
val scoptVersion = "3.3.0"
val druidVersion = "0.9.1"


val sparkVersion = sys.props.getOrElse("sparkVersion", default = "1.6.2")
val sparkNamExt = if (sparkVersion == "1.6.1") "-onesixone" else ""
val sparkVersion_161 = "1.6.1"
val sparkVersion_162 = "2.0.0"
val guava_version = "16.0.1"
val derbyVersion = "10.11.1.1"

val spark161Dependencies = Seq(
"com.google.guava" % "guava" % guava_version % "provided" force(),
"org.apache.derby" % "derby" % derbyVersion force(),
"org.apache.spark" %% "spark-core" % sparkVersion_161 % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion_161 % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion_161 % "provided",
"org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion_161 % "provided"
)

val spark162Dependencies = Seq(
"com.google.guava" % "guava" % guava_version % "provided" force(),
"org.apache.derby" % "derby" % derbyVersion force(),
"org.apache.spark" %% "spark-core" % sparkVersion_162 % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion_162 % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion_162 % "provided",
"org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion_162 % "provided"
)

val sparkDependencies =
if (sparkVersion == sparkVersion_161 ) spark161Dependencies else spark162Dependencies

val coreDependencies = Seq(
"com.github.nscala-time" %% "nscala-time" % nscalaVersion,
"org.apache.httpcomponents" % "httpclient" % httpclientVersion,
// "org.json4s" %% "json4s-native" % json4sVersion,
"org.json4s" %% "json4s-ext" % json4sVersion,
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-smile" % "2.4.6",
"com.fasterxml.jackson.jaxrs" % "jackson-jaxrs-smile-provider" % "2.4.6",
"com.sparklinedata" %% "spark-datetime" % sparkdateTimeVersion,
// "com.sparklinedata" %% "spark-datetime" % sparkdateTimeVersion,
"com.github.scopt" %% "scopt" % scoptVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % "test"
)
@@ -31,6 +60,7 @@ val coreTestDependencies = Seq(
"org.scalatest" %% "scalatest" % scalatestVersion % "test",
"com.databricks" %% "spark-csv" % "1.1.0" % "test"
)

val druidTestEnvDependencies = Seq(
"com.google.guava" % "guava" % guava_version force(),
"org.apache.derby" % "derby" % derbyVersion force(),
@@ -118,18 +148,18 @@ lazy val commonSettings = Seq(
ReleaseKeys.publishArtifactsAction := PgpKeys.publishSigned.value
)

lazy val spark1_6_1 = project.in(file("shims/spark-1.6.1"))
.settings(
libraryDependencies ++= spark161Dependencies
)

lazy val spark1_6_2 = project.in(file("shims/spark-1.6.2"))
.settings(
libraryDependencies ++= spark162Dependencies
)

lazy val sparkShimProject =
if (sparkVersion == sparkVersion_161 ) spark1_6_1 else spark1_6_2
//lazy val spark1_6_1 = project.in(file("shims/spark-1.6.1"))
// .settings(
// libraryDependencies ++= spark161Dependencies
// )
//
//lazy val spark1_6_2 = project.in(file("shims/spark-1.6.2"))
// .settings(
// libraryDependencies ++= spark162Dependencies
// )
//
//lazy val sparkShimProject =
// if (sparkVersion == sparkVersion_161 ) spark1_6_1 else spark1_6_2

lazy val druidTestEnv = project.in(file("druidTestEnv"))
.settings(
@@ -140,15 +170,12 @@ lazy val root = project.in(file("."))
.settings(commonSettings: _*)
.settings(
name := s"spl-accel$sparkNamExt",
libraryDependencies ++= (sparkDependencies ++ coreDependencies ++ coreTestDependencies),
libraryDependencies ++= ( coreDependencies ++ sparkDependencies ++ coreTestDependencies),
assemblyOption in assembly :=
(assemblyOption in assembly).value.copy(includeScala = false),
publishArtifact in (Compile, packageBin) := false,
publishArtifact in Test := true
)
.settings(addArtifact(artifact in (Compile, assembly), assembly).settings: _*)
.dependsOn(
sparkShimProject
).
dependsOn(druidTestEnv % "test->test")
// .dependsOn(druidTestEnv % "test->test")
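
A note on the version switch above (this note and the sketch are not part of the commit): the build picks its Spark dependency set from the sparkVersion JVM system property, and any value other than "1.6.1" falls through to spark162Dependencies, which after this change resolves to Spark 2.0.0 even though the val is still named sparkVersion_162 and the property still defaults to "1.6.2". A minimal Scala sketch of that selection, reusing the names from the hunk above; the sbt flag shown in the comment is an assumption about how the property is normally passed:

// Sketch only: mirrors the selection logic in build.sbt.
val sparkVersion = sys.props.getOrElse("sparkVersion", "1.6.2") // e.g. sbt -DsparkVersion=1.6.1
val sparkNamExt  = if (sparkVersion == "1.6.1") "-onesixone" else ""
val usesSpark20  = sparkVersion != "1.6.1" // true under the default, so spl-accel builds against Spark 2.0.0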

@@ -91,7 +91,7 @@ trait DruidCluster {
}

object DruidCluster {
lazy val instance : DruidCluster = if (true) {
lazy val instance : DruidCluster = if (false) {
DruidTestCluster
} else {
LocalCluster
2 changes: 1 addition & 1 deletion project/SparkShim.scala
@@ -10,7 +10,7 @@ object SparkShim {
val sparkVersion = sys.props.getOrElse("sparkVersion", default = "1.6.2")
val sparkNamExt = if (sparkVersion == "1.6.1") "-onesixone" else ""
val sparkVersion_161 = "1.6.1"
val sparkVersion_162 = "1.6.2"
val sparkVersion_162 = "2.0.0"
val guava_version = "16.0.1"
val derbyVersion = "10.11.1.1"

2 changes: 1 addition & 1 deletion project/build.properties
@@ -1,2 +1,2 @@
// This file should only contain the version of sbt to use.
sbt.version=0.13.9
sbt.version=0.13.11

@@ -17,10 +17,12 @@

package org.apache.spark.sql.sparklinedata.shim

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution.SparkOptimizer


trait ShimStrategyToRuleStrategy {
@@ -36,11 +38,11 @@ trait ShimStrategyToRuleStrategy {
case class ExtendedLogicalOptimizer private[shim](conf : SQLConf,
extraRules :
Seq[(String, SparkShim.RuleStrategy, Rule[LogicalPlan])]
) extends Optimizer(conf)
) extends Optimizer(null, conf)
with ShimStrategyToRuleStrategy {

override val batches =
DefaultOptimizer(conf).batches.asInstanceOf[List[Batch]] ++
new SparkOptimizer(null, conf, null).batches.asInstanceOf[List[Batch]] ++
extraRules.map(b => Batch(b._1, b._2, b._3)).toList
}
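
For context (not part of the commit): each entry in extraRules pairs a batch name and a SparkShim.RuleStrategy with a catalyst Rule[LogicalPlan], and ExtendedLogicalOptimizer appends these entries as extra batches after the stock SparkOptimizer batches. A minimal, hypothetical example of the kind of rule such an entry would carry:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: a placeholder rule that leaves the plan unchanged. A real entry
// would pair a batch name and a SparkShim.RuleStrategy with a rule that actually
// rewrites the plan (for example, pushing work down to Druid).
object NoOpRewrite extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}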

4 changes: 2 additions & 2 deletions src/main/scala/org/apache/spark/sql/CachedTablePattern.scala
@@ -38,7 +38,7 @@ import scala.collection.mutable.{Map => mMap}
*/
class CachedTablePattern(val sqlContext : SQLContext) extends PredicateHelper {

def cacheManager = sqlContext.cacheManager
def cacheManager = sqlContext.sparkSession.sharedState.cacheManager

@transient
private val cacheLock = new ReentrantReadWriteLock
@@ -73,7 +73,7 @@ class CachedTablePattern(val sqlContext : SQLContext) extends PredicateHelper {
}

def tablesToCheck : Array[String] = {
val l = sqlContext.getConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK)
val l = sqlContext.conf.getConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK)
l match {
case l if l.isEmpty => Array()
case l if l.size == 1 && l(0).trim == "" => Array()
65 changes: 65 additions & 0 deletions src/main/scala/org/apache/spark/sql/SPLLogging.scala
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.internal.Logging


trait SPLLogging extends Logging {

override def logInfo(msg: => String) {
super.logInfo(msg)
}

override def logDebug(msg: => String) {
super.logDebug(msg)
}

override def logTrace(msg: => String) {
super.logTrace(msg)
}

override def logWarning(msg: => String) {
super.logWarning(msg)
}

override def logError(msg: => String) {
super.logError(msg)
}

override def logInfo(msg: => String, throwable: Throwable) {
super.logInfo(msg, throwable)
}

override def logDebug(msg: => String, throwable: Throwable) {
super.logDebug(msg, throwable)
}

override def logTrace(msg: => String, throwable: Throwable) {
super.logTrace(msg, throwable)
}

override def logWarning(msg: => String, throwable: Throwable) {
super.logWarning(msg, throwable)
}

override def logError(msg: => String, throwable: Throwable) {
super.logError(msg, throwable)
}

}
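
SPLLogging appears to exist because Spark 2.0 moved its Logging trait to org.apache.spark.internal and made it private[spark]; declaring this trait inside an org.apache.spark.sql package and re-declaring the log methods as public overrides re-exposes them to the rest of the codebase. A minimal usage sketch (the class name and the message are illustrative only):

import org.apache.spark.sql.SPLLogging

// Sketch only: mix the trait in and log through Spark's logging machinery.
class QueryTracer extends SPLLogging {
  def trace(sql: String): Unit = logInfo(s"planning query: $sql")
}
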
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.sparklinedata

import scala.language.implicitConversions
import scala.util.parsing.combinator.lexical.StdLexical
import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.input.CharArrayReader.EofCh
import org.apache.spark.sql.catalyst.plans.logical._

private[sql] abstract class AbstractSparkSQLParser
extends StandardTokenParsers with PackratParsers {

def parse(input: String): LogicalPlan = synchronized {
// Initialize the Keywords.
initLexical
phrase(start)(new lexical.Scanner(input)) match {
case Success(plan, _) => plan
case failureOrError => sys.error(failureOrError.toString)
}
}
/* One-time initialization of lexical. This avoids reinitializing lexical in the parse method. */
protected lazy val initLexical: Unit = lexical.initialize(reservedWords)

protected case class Keyword(str: String) {
def normalize: String = lexical.normalizeKeyword(str)
def parser: Parser[String] = normalize
}

protected implicit def asParser(k: Keyword): Parser[String] = k.parser

// By default, use reflection to find the reserved words defined in the subclass.
// NOTE: since the Keyword properties are defined by the subclass, this cannot be
// evaluated during parent class instantiation, because the subclass instance has
// not been created yet.
protected lazy val reservedWords: Seq[String] =
this
.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
.map(_.invoke(this).asInstanceOf[Keyword].normalize)

// The lexer starts with no reserved keywords; they are registered later by initLexical.
override val lexical = new SqlLexical

protected def start: Parser[LogicalPlan]

// Returns the whole input string
protected lazy val wholeInput: Parser[String] = new Parser[String] {
def apply(in: Input): ParseResult[String] =
Success(in.source.toString, in.drop(in.source.length()))
}

// Returns the rest of the input string that has not been parsed yet
protected lazy val restInput: Parser[String] = new Parser[String] {
def apply(in: Input): ParseResult[String] =
Success(
in.source.subSequence(in.offset, in.source.length()).toString,
in.drop(in.source.length()))
}
}

class SqlLexical extends StdLexical {
case class DecimalLit(chars: String) extends Token {
override def toString: String = chars
}

/* This is a workaround to support the lazy setting */
def initialize(keywords: Seq[String]): Unit = {
reserved.clear()
reserved ++= keywords
}

/* Normalize the keyword string */
def normalizeKeyword(str: String): String = str.toLowerCase

delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
)

protected override def processIdent(name: String) = {
val token = normalizeKeyword(name)
if (reserved contains token) Keyword(token) else Identifier(name)
}

override lazy val token: Parser[Token] =
( rep1(digit) ~ scientificNotation ^^ { case i ~ s => DecimalLit(i.mkString + s) }
| '.' ~> (rep1(digit) ~ scientificNotation) ^^
{ case i ~ s => DecimalLit("0." + i.mkString + s) }
| rep1(digit) ~ ('.' ~> digit.*) ~ scientificNotation ^^
{ case i1 ~ i2 ~ s => DecimalLit(i1.mkString + "." + i2.mkString + s) }
| digit.* ~ identChar ~ (identChar | digit).* ^^
{ case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) }
| rep1(digit) ~ ('.' ~> digit.*).? ^^ {
case i ~ None => NumericLit(i.mkString)
case i ~ Some(d) => DecimalLit(i.mkString + "." + d.mkString)
}
| '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
{ case chars => StringLit(chars mkString "") }
| '"' ~> chrExcept('"', '\n', EofCh).* <~ '"' ^^
{ case chars => StringLit(chars mkString "") }
| '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^
{ case chars => Identifier(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar: Parser[Elem] = letter | elem('_')

private lazy val scientificNotation: Parser[String] =
(elem('e') | elem('E')) ~> (elem('+') | elem('-')).? ~ rep1(digit) ^^ {
case s ~ rest => "e" + s.mkString + rest.mkString
}

override def whitespace: Parser[Any] =
( whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ chrExcept(EofCh, '\n').*
| '#' ~ chrExcept(EofCh, '\n').*
| '-' ~ '-' ~ chrExcept(EofCh, '\n').*
| '/' ~ '*' ~ failure("unclosed comment")
).*
}
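
To illustrate how this parser infrastructure is used (an illustrative sketch, not code from this commit): a concrete parser extends AbstractSparkSQLParser, declares its keywords as Keyword vals so that the reflective reservedWords scan registers them with the lexer, and supplies the start production. The command and the returned plan below are placeholders:

package org.apache.spark.sql.hive.sparklinedata

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}

// Sketch only: a toy parser that recognizes "SHOW TABLES" and returns a placeholder plan.
class ExampleCommandParser extends AbstractSparkSQLParser {
  protected val SHOW   = Keyword("SHOW")   // registered via the reflective reservedWords scan
  protected val TABLES = Keyword("TABLES")

  protected lazy val start: Parser[LogicalPlan] =
    SHOW ~ TABLES ^^^ OneRowRelation       // a real parser would build a proper command plan
}

// Usage: new ExampleCommandParser().parse("show tables") returns the placeholder plan.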

(The remaining changed files in this commit are not shown here.)
