Elasticsearch Scala HTTP client based on Akka HTTP. It integrates with the Play JSON library for JSON (de)serialisation.
Works with Elasticsearch 5, 6 and 7.
Dependencies :
libraryDependencies += "com.adelegue" %% "elastic-scala-http" % "X.X.X"
resolvers += Resolver.bintrayRepo("larousso", "maven")
Import :
For ES 5 / 6 :
import elastic.es6.api._
import elastic.es6.client._
// Play json support
For ES 7 :
import elastic.es7.api._
import elastic.es7.client._
// Play json support
Client creation :
// Implicit actor system made available to the client created below.
// Implicit definitions are given an explicit type so that implicit resolution stays predictable.
implicit val actorSystem: ActorSystem = ActorSystem()
// Client that exchanges JSON as Play `JsValue`, targeting localhost:9200.
val client = ElasticClient[JsValue](host = "localhost", port = 9200)
Or
// Alternative: build the same client step by step with the builder.
val client: ElasticClient[JsValue] = ElasticClientBuilder()
.withHost("localhost")
.withPort(9200)
// Optional: a default actor system is provided when this call is omitted
.withActorSystem(actorSystem)
.build()
For ES 5 / 6 :
// Index definition: one shard and a single mapping type ("type1").
// "keyword" is the exact-value (not analyzed) string type; the legacy
// "string" / "not_analyzed" mapping is rejected by Elasticsearch 6.
val jsonSettings =
"""
{
"settings" : { "number_of_shards" : 1 },
"mappings" : {
"type1" : {
"properties" : {
"field1" : { "type" : "keyword" }
}
}
}
}
"""
// Create the index "test" from the JSON definition above
val indexCreation: Future[IndexOps] = client.createIndex("test", Json.parse(jsonSettings))
// True
val index: Future[Boolean] = client.verifyIndex("test")
// JSON index definition
val indexDefinition: Future[JsValue] = client.getIndex("test")
// JSON mapping of one type of one index
val mapping: Future[JsValue] = client.getMapping("test", "type1")
// JSON mappings of several indices and types
val mappings: Future[JsValue] = client.getMappings(Seq("test1", "test2"), Seq("type1", "type2"))
// Alias actions: add the alias "alias" to the index "test"
val aliases = """
{
"actions" : [
{ "add" : { "index" : "test", "alias" : "alias" } }
]
} """
val aliasesCreation: Future[IndexOps] = client.createAliases(Json.parse(aliases))
val deletion: Future[IndexOps] = client.deleteIndex("test")
// False
val indexExists: Future[Boolean] = client.verifyIndex("test")
// The index doesn't exist anymore: the future fails with an EsException.
// `failed.foreach` replaces `onFailure`, which is deprecated since Scala 2.12 and removed in 2.13.
client.getIndex("test").failed.foreach {
  case e: EsException[JsValue] =>
    val httpCode: Int = e.httpCode // 404
  case _ => () // other failures are ignored, as with the original partial function
}
For ES 7 :
// Index definition: one shard and a typeless mapping (ES 7 has no mapping types).
// "keyword" is the exact-value (not analyzed) string type; the legacy
// "string" / "not_analyzed" mapping is not accepted by Elasticsearch 7.
val jsonSettings =
"""
{
"settings" : { "number_of_shards" : 1 },
"mappings" : {
"properties" : {
"field1" : { "type" : "keyword" }
}
}
}
"""
// Create the index "test" from the JSON definition above
val indexCreation: Future[IndexOps] = client.createIndex("test", Json.parse(jsonSettings))
// True
val index: Future[Boolean] = client.verifyIndex("test")
// JSON index definition
val indexDefinition: Future[JsValue] = client.getIndex("test")
// JSON mapping of one index
val mapping: Future[JsValue] = client.getMapping("test")
// JSON mappings of several indices
val mappings: Future[JsValue] = client.getMappings(Seq("test1", "test2"))
// Alias actions: add the alias "alias" to the index "test"
val aliases = """
{
"actions" : [
{ "add" : { "index" : "test", "alias" : "alias" } }
]
} """
val aliasesCreation: Future[IndexOps] = client.createAliases(Json.parse(aliases))
val deletion: Future[IndexOps] = client.deleteIndex("test")
// False
val indexExists: Future[Boolean] = client.verifyIndex("test")
// The index doesn't exist anymore: the future fails with an EsException.
// `failed.foreach` replaces `onFailure`, which is deprecated since Scala 2.12 and removed in 2.13.
client.getIndex("test").failed.foreach {
  case e: EsException[JsValue] =>
    val httpCode: Int = e.httpCode // 404
  case _ => () // other failures are ignored, as with the original partial function
}
// Example document and its Play JSON format
case class MonDocument(name: String)
implicit val docFormat = Json.format[MonDocument]
// Index reference (ES 5 / 6)
//val indexTest: Index[JsValue] = client.index("test", Some("type"))
// Index reference (ES 7)
val indexTest: Index[JsValue] = client.index("test")
// Index a document, letting the id be generated
val create: Future[IndexResponse[JsValue]] = indexTest.index(MonDocument("name"))
// Index a document with an explicit id
val createWithId: Future[IndexResponse[JsValue]] = indexTest.index(MonDocument("name"), Some("id1"))
// Update: index again under the same id
val update: Future[IndexResponse[JsValue]] = indexTest.index(MonDocument("name bis"), Some("id1"))
// Create on an existing document: the future fails with an EsException.
// `failed.foreach` replaces `onFailure`, which is deprecated since Scala 2.12 and removed in 2.13.
indexTest.create(MonDocument("name bis"), Some("id1")).failed.foreach {
  case e: EsException[JsValue] =>
    e.httpCode // 409
    e.json // / ("error") / ("type" -> "document_already_exists_exception")
  case _ => () // other failures are ignored, as with the original partial function
}
// Read a document by id and decode it
val doc: Future[MonDocument] = indexTest.get("id1").map(_.as[MonDocument])
// Delete a document by id
val delete: Future[IndexResponse[JsValue]] = indexTest.delete("id1")
// Indexing options
indexTest.index(MonDocument("test"),
  id = Some("id"),
  version = Some(1),
  versionType = Some(INTERNAL),
  create = false,
  routing = Some("routing"),
  parent = Some("parent id"),
  refresh = false,
  timeout = Some("1m"),
  consistency = Some(QUORUM),
  detectNoop = false
)
// Searching: term query on the "name" field
val search: Future[SearchResponse[JsValue]] = indexTest.search(Json.obj("query" -> Json.obj("term" -> Json.obj("name" -> "nom"))))
// Decode every hit as a MonDocument
val docs: Future[Seq[MonDocument]] = search.map(_.hitsAs[MonDocument])
// Parent/child example: two mapping types in one index, where "child" declares "parent" as its _parent type.
// NOTE(review): `_parent` and several mapping types per index look like ES 5-only features — confirm before using against ES 6+.
case class Parent(name: String)
case class Child(name: String)
implicit val parentFormat = Json.format[Parent]
implicit val childFormat = Json.format[Child]
val jsonSettings =
"""
{
"mappings" : {
"parent" : {},
"child": {
"_parent": {"type": "parent"}
}
}
}
"""
// Create the index holding both mapping types
client.createIndex("parentchild", Json.parse(jsonSettings))
// One index reference per mapping type
val parent: Index[JsValue] = client.index("parentchild", Some("parent"))
val child: Index[JsValue] = client.index("parentchild", Some("child"))
// Index the parent document under id "id1"
val parentObj = Parent("I'am the parent")
private val parentIndexing: Future[IndexResponse[JsValue]] = parent.index(parentObj, Some("id1"))
// Index the child document under id "id2", linked to the parent through `parent = Some("id1")`
val childObj = Child("I'am the child")
val childIndexing: Future[IndexResponse[JsValue]] = child.index(childObj, Some("id2"), parent = Some("id1"))
// Fetching the child requires the parent id as the routing option
val gettingChild: Future[GetResponse[JsValue]] = child.get("id2", routing = Some("id1"))
Reactive streams support for bulk and scroll search
Bulks (ES 5 / 6) :
// Publisher of 105 bulk "index" operations, one MonDocument per id from "1" to "105".
// Presumably BulkOpDetail takes (index name, type name, document id) — confirm against the API.
val publisher: Publisher[Bulk[MonDocument]] = Source((1 to 105).map(_.toString))
.map(i => Bulk[MonDocument](BulkOpType(index = Some(BulkOpDetail(Some("index"), Some("type"), Some(i)))), Some(MonDocument(s"Nom $i"))) )
.runWith(Sink.asPublisher(fanout = false))
// The bulk operations are sent through the client, which gives back a publisher of responses.
// batchSize is the number of elements sent to ES in each request.
val respPublisher: Publisher[BulkResponse[JsValue]] = client.bulk[MonDocument](publisher = publisher, batchSize = 10)
Or
// Same bulk indexing expressed as a stream: the operations go through `client.bulkFlow`
// in batches of 10 and every BulkResponse is collected into the resulting Future.
val ids = (1 to 105).map(i => i.toString).toList
val res: Future[Seq[BulkResponse[JsValue]]] = Source(ids)
.map(i => Bulk[MonDocument](BulkOpType(index = Some(BulkOpDetail(Some("index"), Some("type"), Some(i)))), Some(MonDocument(s"Nom $i"))))
.via(client.bulkFlow[MonDocument](batchSize = 10))
.runWith(Sink.seq)
Bulks (ES 7) :
// Publisher of 105 bulk "index" operations, one MonDocument per id from "1" to "105".
// Presumably BulkOpDetail takes (index name, document id) here, with no type name — confirm against the API.
val publisher: Publisher[Bulk[MonDocument]] = Source((1 to 105).map(_.toString))
.map(i => Bulk[MonDocument](BulkOpType(index = Some(BulkOpDetail(Some("index"), Some(i)))), Some(MonDocument(s"Nom $i"))) )
.runWith(Sink.asPublisher(fanout = false))
// The bulk operations are sent through the client, which gives back a publisher of responses.
// batchSize is the number of elements sent to ES in each request.
val respPublisher: Publisher[BulkResponse[JsValue]] = client.bulk[MonDocument](publisher = publisher, batchSize = 10)
Or
// Same bulk indexing expressed as a stream: the operations go through `client.bulkFlow`
// in batches of 10 and every BulkResponse is collected into the resulting Future.
val ids = (1 to 105).map(i => i.toString).toList
val res: Future[Seq[BulkResponse[JsValue]]] = Source(ids)
.map(i => Bulk[MonDocument](BulkOpType(index = Some(BulkOpDetail(Some("index"), Some(i)))), Some(MonDocument(s"Nom $i"))))
.via(client.bulkFlow[MonDocument](batchSize = 10))
.runWith(Sink.seq)
Scroll search (ES 5 / 6)
// Query matching every document
val matchAllQuery = Json.obj("query" -> Json.obj("match_all" -> Json.obj()))
// Scroll through index "index" / type "type" via a reactive-streams publisher,
// decode the hits of each response as MonDocument and print them.
Source.fromPublisher(client.scrollPublisher(Seq("index"), Seq("type"), matchAllQuery, size = Some(10)))
  .mapConcat[MonDocument](r => r.hitsAs[MonDocument].toList)
  .runForeach(println)
// Or call `client.scroll` and chain the stream operators on its result directly
// (the call previously had an unbalanced closing parenthesis).
client.scroll(index = Seq("index"), query = matchAllQuery, size = Some(10))
  .mapConcat[MonDocument](r => r.hitsAs[MonDocument].toList)
  .runForeach(println)
Scroll search (ES 7)
// Query matching every document
val matchAllQuery = Json.obj("query" -> Json.obj("match_all" -> Json.obj()))
// Scroll through index "index" via a reactive-streams publisher,
// decode the hits of each response as MonDocument and print them.
Source.fromPublisher(client.scrollPublisher(Seq("index"), matchAllQuery, size = Some(10)))
  .mapConcat[MonDocument](r => r.hitsAs[MonDocument].toList)
  .runForeach(println)
// Or call `client.scroll` and chain the stream operators on its result directly
// (the call previously had an unbalanced closing parenthesis).
client.scroll(index = Seq("index"), query = matchAllQuery, size = Some(10))
  .mapConcat[MonDocument](r => r.hitsAs[MonDocument].toList)
  .runForeach(println)
#TODO
msearch
sbt "release with-defaults"