-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathReferenceSpec.scala
145 lines (114 loc) · 4.16 KB
/
ReferenceSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.connectors.reference._
import pekko.stream.connectors.reference.scaladsl.Reference
import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import pekko.testkit.TestKit
import pekko.util.ByteString
import pekko.{ Done, NotUsed }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scala.collection.immutable
import scala.concurrent.Future
/**
 * Exercises the Scala DSL of the reference connector: settings construction,
 * the source, the flow, and the resource-aware flow.
 *
 * Naming convention: append "Spec" to every Scala test suite.
 */
class ReferenceSpec extends AnyWordSpec with BeforeAndAfterAll with ScalaFutures with Matchers with LogCapturing {

  // One actor system shared by every test in this suite; terminated in `afterAll`.
  implicit val system: ActorSystem = ActorSystem("ReferenceSpec")

  // Client id handed to every `SourceSettings` built below.
  final val ClientId = "test-client-id"

  "reference connector" should {

    /**
     * Type annotations are not generally needed on local variables.
     * However, they let the compiler check that the types are really what we want.
     */
    "compile settings" in {
      // The verifier ignores its argument, so the parameter is left unnamed.
      val providedAuth: Authentication.Provided =
        Authentication.Provided().withVerifier(_ => true)

      val noAuth: Authentication.None =
        Authentication.None

      val settings: SourceSettings = SourceSettings(ClientId)

      // Compile-only check: both authentication variants must be accepted.
      settings.withAuthentication(providedAuth)
      settings.withAuthentication(noAuth)
    }

    "compile source" in {
      // The `#source` markers delimit a snippet; keep them and the code between them intact.
      // #source
      val settings: SourceSettings = SourceSettings(ClientId)

      val source: Source[ReferenceReadResult, Future[Done]] =
        Reference.source(settings)
      // #source

      source
    }

    "compile flow" in {
      // The `#flow` markers delimit a snippet; keep them and the code between them intact.
      // #flow
      val flow: Flow[ReferenceWriteMessage, ReferenceWriteResult, NotUsed] =
        Reference.flow()
      // #flow

      flow
    }

    "run source" in {
      val source = Reference.source(SourceSettings(ClientId))

      // Only the first emitted element is inspected.
      val msg = source.runWith(Sink.head).futureValue

      msg.data should contain theSameElementsAs Seq(ByteString("one"))
    }

    "run flow" in {
      val flow = Reference.flow()

      // Three write messages; only the first one carries metrics.
      val source = Source(
        immutable.Seq(
          ReferenceWriteMessage()
            .withData(immutable.Seq(ByteString("one")))
            .withMetrics(Map("rps" -> 20L, "rpm" -> 30L)),
          ReferenceWriteMessage().withData(
            immutable.Seq(
              ByteString("two"),
              ByteString("three"),
              ByteString("four"))),
          ReferenceWriteMessage().withData(
            immutable.Seq(
              ByteString("five"),
              ByteString("six"),
              ByteString("seven")))))

      val result = source.via(flow).runWith(Sink.seq).futureValue

      // Every data element of every message must come out of the flow.
      result.flatMap(_.message.data) should contain theSameElementsAs Seq(
        "one",
        "two",
        "three",
        "four",
        "five",
        "six",
        "seven").map(ByteString.apply)

      // NOTE(review): 50 presumably is the sum of the "rps" and "rpm" metrics above - confirm
      // against the flow implementation.
      result.head.metrics.get("total") should contain(50L)
    }

    "resolve resource from application config" in {
      // No resource attribute is set on the flow in this test.
      val result = Source
        .single(ReferenceWriteMessage().withData(immutable.Seq(ByteString("one"))))
        .via(Reference.flowWithResource())
        .runWith(Sink.seq)

      result.futureValue.flatMap(_.message.data).map(_.utf8String) shouldBe Seq("one default msg")
    }

    "use resource from attributes" in {
      // The resource is passed explicitly through stream attributes.
      val resource = Resource(ResourceSettings("attributes msg"))

      val result = Source
        .single(ReferenceWriteMessage().withData(immutable.Seq(ByteString("one"))))
        .via(Reference.flowWithResource().withAttributes(ReferenceAttributes.resource(resource)))
        .runWith(Sink.seq)

      result.futureValue.flatMap(_.message.data).map(_.utf8String) shouldBe Seq("one attributes msg")
    }
  }

  /**
   * Terminates the suite's actor system once all tests have run.
   *
   * `super.afterAll()` is invoked in a `finally` so that teardown defined by any
   * mixed-in trait that also overrides `afterAll` still runs, even when the
   * actor system shutdown throws.
   */
  override def afterAll(): Unit =
    try TestKit.shutdownActorSystem(system)
    finally super.afterAll()
}