-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
HttpClientWithConsulVerticle.java
95 lines (79 loc) · 3.27 KB
/
HttpClientWithConsulVerticle.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package io.vertx.example.serviceresolver.consul;
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
import io.vertx.core.Vertx;
import io.vertx.core.http.*;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.srv.SrvResolver;
import io.vertx.serviceresolver.srv.SrvResolverOptions;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.InternetProtocol;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import java.util.ArrayList;
import java.util.List;
/**
 * Example showing an HTTP client that discovers backend servers through Consul DNS SRV records
 * and spreads its requests over them with a client-side load balancer.
 *
 * <p>{@link #main(String[])} starts a Consul container, starts {@value #NUM_SERVERS} local HTTP
 * servers, registers each of them in Consul under the service name {@code svc} and finally
 * deploys this verticle. On start, the verticle sends {@value #NUM_QUERIES} requests to
 * {@code svc.service.consul} and prints each response body, which is the id of the server that
 * answered.
 */
public class HttpClientWithConsulVerticle extends VerticleBase {

  private static final int NUM_SERVERS = 3;
  private static final int NUM_QUERIES = 10;

  /** Port of the Consul agent HTTP API, used to register the services. */
  private static final int CONSUL_HTTP_PORT = 8500;
  /** Port of the Consul DNS interface (UDP), queried by the SRV resolver. */
  private static final int CONSUL_DNS_PORT = 8600;
  /** Port of the first demo server; server {@code i} listens on {@code FIRST_SERVICE_PORT + i}. */
  private static final int FIRST_SERVICE_PORT = 8080;
  /** Host the demo servers bind to and are registered with in Consul. */
  private static final String SERVICE_HOST = "localhost";

  /**
   * Boots the whole example: Consul container, demo servers, service registration and the
   * client verticle.
   *
   * @param args ignored
   * @throws Exception if the container or the demo services cannot be set up
   */
  public static void main(String[] args) throws Exception {
    // Start a consul container, need to expose UDP
    GenericContainer<?> container = new FixedHostPortGenericContainer<>("consul:1.9")
      .withFixedExposedPort(CONSUL_HTTP_PORT, CONSUL_HTTP_PORT)
      .withFixedExposedPort(CONSUL_DNS_PORT, CONSUL_DNS_PORT, InternetProtocol.UDP);
    container.setWaitStrategy(new HostPortWaitStrategy().forPorts(CONSUL_HTTP_PORT));
    container.start();

    Vertx vertx = Vertx.vertx();
    boolean servicesReady = false;
    try {
      startAndRegisterServices(vertx);
      servicesReady = true;
    } finally {
      if (!servicesReady) {
        // Setup failed: do not leave a running Vert.x instance and container behind,
        // otherwise the JVM would keep running with nothing useful to do.
        vertx.close();
        container.stop();
      }
    }

    // start() fails as soon as one query fails; report the cause instead of dropping the future.
    vertx.deployVerticle(new HttpClientWithConsulVerticle())
      .onFailure(err -> System.err.println("Verticle deployment failed: " + err));
  }

  /**
   * Starts {@link #NUM_SERVERS} HTTP servers that answer every request with their own service
   * id, and registers each one in Consul under the service name {@code svc}. Every step is
   * awaited, so all services are registered when this method returns.
   *
   * @param vertx the Vert.x instance hosting the demo servers
   */
  private static void startAndRegisterServices(Vertx vertx) {
    // This client is only needed to talk to the Consul agent during registration
    HttpClientAgent consulClient = vertx.createHttpClient();
    try {
      for (int i = 0; i < NUM_SERVERS; i++) {
        String serviceId = "app" + i;
        int servicePort = FIRST_SERVICE_PORT + i;

        vertx.createHttpServer()
          .requestHandler(req -> req.response().end(serviceId))
          .listen(servicePort, SERVICE_HOST)
          .await();

        JsonObject registration = new JsonObject()
          .put("ID", serviceId)
          .put("Name", "svc")
          .put("Address", SERVICE_HOST)
          .put("Port", servicePort);

        consulClient
          .request(HttpMethod.PUT, CONSUL_HTTP_PORT, "localhost", "/v1/agent/service/register")
          .compose(req -> req
            .send(registration.encode())
            .expecting(HttpResponseExpectation.SC_OK)
            .compose(HttpClientResponse::end))
          .await();
      }
    } finally {
      consulClient.close();
    }
  }

  private HttpClientAgent client;

  /**
   * Builds an HTTP client that resolves {@code svc.service.consul} through the Consul DNS
   * interface, then sends {@link #NUM_QUERIES} requests and prints every outcome.
   *
   * @return a future that succeeds once all queries have succeeded and fails if any of them fails
   */
  @Override
  public Future<?> start() {
    // Default load balancer is round-robin, you can configure another one,
    // e.g. LoadBalancer.POWER_OF_TWO_CHOICES
    LoadBalancer loadBalancer = LoadBalancer.ROUND_ROBIN;

    client = vertx.httpClientBuilder()
      .withLoadBalancer(loadBalancer)
      .withAddressResolver(SrvResolver.create(new SrvResolverOptions()
        .setServer(SocketAddress.inetSocketAddress(CONSUL_DNS_PORT, "127.0.0.1"))
        .setMinTTL(5)))
      .build();

    // Every query targets the same logical service, so the address is built once
    ServiceAddress addr = ServiceAddress.of("svc.service.consul");

    List<Future<?>> futs = new ArrayList<>(NUM_QUERIES);
    for (int i = 0; i < NUM_QUERIES; i++) {
      int idx = i;
      Future<?> fut = client
        .request(new RequestOptions().setServer(addr))
        .compose(req -> req
          .send()
          .expecting(HttpResponseExpectation.SC_OK)
          .compose(HttpClientResponse::body))
        .andThen(ar -> {
          if (ar.succeeded()) {
            System.out.println(idx + " -> " + ar.result());
          } else {
            System.err.println(idx + " failed: " + ar.cause());
          }
        });
      futs.add(fut);
    }
    return Future.all(futs);
  }
}