Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service integration #7

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ public void create() {
});
}
this.createdPod.set(createdPod);
Runtime.getRuntime().addShutdownHook(new Thread(() -> removePod(podName, coreV1Api)));

if (this.waitStrategy != null) {
try (Watch<V1Pod> watch = Watch.createWatch(
coreV1Api.getApiClient(),
Expand All @@ -367,7 +369,6 @@ public void create() {
throw new RuntimeException(e);
}
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> removePod(podName, coreV1Api)));

filesToMountOnceStarted.forEach(fileToMountOnceStarted -> copyFileToPodContainer(fileToMountOnceStarted.getContainerName(), fileToMountOnceStarted.getSrcPath(), fileToMountOnceStarted.getDestPath()));
onKubernetesObjectReady();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.github.jeanbaptistewatenberg.junit5kubernetes.core;

import com.github.jeanbaptistewatenberg.junit5kubernetes.core.wait.WaitStrategy;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class Service extends KubernetesGenericObject<Service> {
protected static final String DEBUG = System.getProperty("junitKubernetesDebug");
protected static final String DISABLE_HTTP2 = System.getProperty("junitKubernetesDisableHttp2");
protected static final String SYSTEM_NAMESPACE = System.getProperty("kubernetesNamespace");
protected static final String NAMESPACE = SYSTEM_NAMESPACE != null && !SYSTEM_NAMESPACE.trim().equals("") ? SYSTEM_NAMESPACE : "default";
private static final Logger LOGGER = Logger.getLogger(Service.class.getName());

protected final V1Service serviceToCreate;
protected final CoreV1Api coreV1Api;
protected WaitStrategy<V1Service> waitStrategy;

protected final ThreadLocal<V1Service> createdService = new ThreadLocal<>();

public Service(V1Service serviceToCreate) {
this.serviceToCreate = serviceToCreate;
this.coreV1Api = initiateCoreV1Api();
}

private static CoreV1Api initiateCoreV1Api() {
CoreV1Api coreV1Api;
try {
ApiClient client = Config.defaultClient();
if (DEBUG != null && DEBUG.equalsIgnoreCase("true")) {
client.setDebugging(true);
}
// infinite timeout
OkHttpClient.Builder builder = client.getHttpClient().newBuilder()
.readTimeout(0, TimeUnit.SECONDS);

if (DISABLE_HTTP2 != null && DISABLE_HTTP2.equalsIgnoreCase("true")) {
builder.protocols(Collections.singletonList(Protocol.HTTP_1_1));
}

client.setVerifyingSsl(false);

OkHttpClient httpClient = builder.build();
client.setHttpClient(httpClient);
Configuration.setDefaultApiClient(client);

coreV1Api = new CoreV1Api(client);

} catch (IOException e) {
throw new RuntimeException(e);
}
return coreV1Api;
}
Comment on lines +39 to +66

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to factorize this client initiation in kubernetesGenericObject to avoid duplicating it from Pod.java.


@Override
public String getObjectName() {
V1Service v1Service = this.createdService.get();
if (v1Service != null) {
return v1Service.getMetadata().getName();
} else {
throw new RuntimeException("Can't get name of a non running object.");
}
}

@Override
public String getObjectHostIp() {
V1Service v1Service = this.createdService.get();
if (v1Service != null) {
try {
V1Service retrievedService = coreV1Api.readNamespacedService(v1Service.getMetadata().getName(), NAMESPACE, null, null, null);
if (retrievedService.getStatus() == null) {
throw new RuntimeException("Can't get ip of a non running object.");
}
//Anhand der Umgebungsvariable entscheiden, ob der Test im Cluster oder lokal ausgeführt wird

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//Anhand der Umgebungsvariable entscheiden, ob der Test im Cluster oder lokal ausgeführt wird
//Use the environment variable to decide whether to run the test in the cluster or locally

if(System.getenv("KUBERNETES_PORT") != null) {
return retrievedService.getSpec().getClusterIP();
}else{
return retrievedService.getSpec().getExternalIPs().get(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some clusters a service may not have any externalIps and under those circumstances I think we should raise an exception with an explicit message.

}
} catch (ApiException e) {
throw logAndThrowApiException(e);
}
} else {
throw new RuntimeException("Can't get ip of a non running object.");
}
}

@Override
public void create() {
try{
createdService.set(coreV1Api.createNamespacedService(NAMESPACE, serviceToCreate, null, null, null));
final String serviceName = createdService.get().getMetadata().getName();

if (this.waitStrategy != null) {
try (Watch<V1Service> watch = Watch.createWatch(
coreV1Api.getApiClient(),
coreV1Api.listNamespacedServiceCall(NAMESPACE, null, null, null,
null, null, null, null, null, true, null),
new TypeToken<Watch.Response<V1Service>>() {
}.getType())) {

this.waitStrategy.apply(watch, createdService.get());
} catch (IOException | ApiException e) {
if (e instanceof ApiException) {
throw logAndThrowApiException((ApiException) e);
}
throw new RuntimeException(e);
}
}

Runtime.getRuntime().addShutdownHook(new Thread(() -> removeService(serviceName, coreV1Api)));
onKubernetesObjectReady();
} catch (ApiException e) {
throw logAndThrowApiException(e);
}
}

@Override
public void remove() {
V1Service v1Service = this.createdService.get();
if (v1Service != null) {
String serviceName = v1Service.getMetadata().getName();
removeService(serviceName, coreV1Api);
}
}

private static void removeService(String serviceName, CoreV1Api coreV1Api) {
try {
coreV1Api.deleteNamespacedService(serviceName, NAMESPACE, null, null, null, null, null, null);
} catch (ApiException e) {
throw logAndThrowApiException(e);
} catch (JsonSyntaxException e) {
if (e.getCause() instanceof IllegalStateException) {
IllegalStateException ise = (IllegalStateException) e.getCause();
if (ise.getMessage() != null && ise.getMessage().contains("Expected a string but was BEGIN_OBJECT")) {
// Catching exception because of issue https://github.com/kubernetes-client/java/issues/86
} else throw e;
} else throw e;
}
}

@Override
public Service withWaitStrategy(WaitStrategy waitStrategy) {
this.waitStrategy = waitStrategy;
return this;
}

protected static RuntimeException logAndThrowApiException(ApiException e) {
LOGGER.severe("Kubernetes API replied with " + e.getCode() + " status code and body " + e.getResponseBody());
System.out.println("Kubernetes API replied with " + e.getCode() + " status code and body " + e.getResponseBody());
return new RuntimeException(e.getResponseBody(), e);
}
Comment on lines +161 to +165
Copy link
Owner

@JeanBaptisteWATENBERG JeanBaptisteWATENBERG Mar 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should factorize this as well in the abstract class

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @JeanBaptisteWATENBERG,

thank you for your quick response! Hopefully this week I will find some time to look closer at your Points.

I needed to create a more specific service for the pods. Instead of extending the service config over a pod, I thougth making an explictit service would be clearer.

Best regards
Lars

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.github.jeanbaptistewatenberg.junit5kubernetes.core.impl;

import com.github.jeanbaptistewatenberg.junit5kubernetes.core.Service;
import com.github.jeanbaptistewatenberg.junit5kubernetes.core.wait.WaitStrategy;
import io.kubernetes.client.fluent.VisitableBuilder;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceBuilder;
import io.kubernetes.client.openapi.models.V1ServiceFluent;
import io.kubernetes.client.openapi.models.V1ServiceFluentImpl;

public class GenericServiceBuilder extends V1ServiceFluentImpl<GenericServiceBuilder> implements VisitableBuilder<Service, GenericServiceBuilder> {

private WaitStrategy waitStrategy;
V1ServiceFluent<?> fluent;

public GenericServiceBuilder() {
this(new V1Service());
}

public GenericServiceBuilder(V1ServiceFluent<?> fluent, V1Service instance) {

this.fluent = fluent;
fluent.withApiVersion(instance.getApiVersion());
fluent.withKind(instance.getKind());
fluent.withMetadata(instance.getMetadata());
fluent.withSpec(instance.getSpec());
fluent.withStatus(instance.getStatus());
}

public GenericServiceBuilder(V1Service instance) {

this.fluent = this;
this.withApiVersion(instance.getApiVersion());
this.withKind(instance.getKind());
this.withMetadata(instance.getMetadata());
this.withSpec(instance.getSpec());
this.withStatus(instance.getStatus());
}

@Override
public Service build() {

V1Service serviceToCreate = new V1ServiceBuilder()
.withApiVersion(fluent.getApiVersion())
.withKind(fluent.getKind())
.withStatus(fluent.buildStatus())
.withMetadata(fluent.buildMetadata())
.withSpec(fluent.buildSpec())
.build();

Service buildable = new Service(serviceToCreate);
return buildable.withWaitStrategy(waitStrategy);
}

public GenericServiceBuilder withWaitStrategy(WaitStrategy<V1Service> waitStrategy) {
this.waitStrategy = waitStrategy;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class PodWaitLogStrategy extends WaitStrategy<V1Pod> {
private String text;
Expand Down Expand Up @@ -68,27 +73,37 @@ public void apply(Watch<V1Pod> resourceWatch, V1Pod createdResource) throws ApiE
}
}

//Read pods logs
readPodsLogs(logs, createdResource);
}

private void readPodsLogs(PodLogs logs, V1Pod createdResource) {

try (InputStream is = logs.streamNamespacedPodLog(createdResource)) {
Scanner sc = new Scanner(is);
int conditionMetTimes = 0;
String textOrRegex = this.getText();
AtomicInteger conditionMetTimes = new AtomicInteger();
int howManyTimesShouldConditionMet = this.getTimes();
while (sc.hasNextLine() && LocalDateTime.now().isBefore(startTime.plus(this.getTimeout()))) {
String input = sc.nextLine();
//Check if log line matches the expected text or regex
if (input.matches(textOrRegex) || input.contains(textOrRegex)) {
conditionMetTimes++;
if (conditionMetTimes == howManyTimesShouldConditionMet) {
break;

CompletableFuture.runAsync(() -> {
Scanner sc = new Scanner(is);
while (sc.hasNextLine()) {
String input = sc.nextLine();
//Check if log line matches the expected text or regex
if (input.matches(textOrRegex) || input.contains(textOrRegex)) {
conditionMetTimes.getAndIncrement();
if (conditionMetTimes.get() == howManyTimesShouldConditionMet) {
break;
}
}
}
}
}).get(this.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

if (conditionMetTimes != howManyTimesShouldConditionMet) {
throw new RuntimeException("Failed to find (x" + howManyTimesShouldConditionMet + ") " + textOrRegex + " in log of resource " + createdResource + " before timeout " + this.getTimeout());
if (conditionMetTimes.get() != howManyTimesShouldConditionMet) {
throw new RuntimeException(
"Failed to find (x" + howManyTimesShouldConditionMet + ") " + textOrRegex + " in log of resource " + createdResource + " before timeout "
+ this.getTimeout());
}
} catch (IOException e) {

} catch (InterruptedException | ExecutionException | TimeoutException | ApiException | IOException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.github.jeanbaptistewatenberg.junit5kubernetes.core.wait.impl.service;

import com.github.jeanbaptistewatenberg.junit5kubernetes.core.wait.WaitStrategy;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.util.Watch;

import java.time.LocalDateTime;

public class ServiceWaitReadyStrategy extends WaitStrategy<V1Service> {

private final CoreV1Api coreClient;

public ServiceWaitReadyStrategy() {
this.coreClient = new CoreV1Api(Configuration.getDefaultApiClient());
}

@Override
public void apply(Watch<V1Service> resourceWatch, V1Service createdResource) throws ApiException {

boolean serviceSuccessfullyStarted = false;
LocalDateTime startTime = LocalDateTime.now();
for (Watch.Response<V1Service> item : resourceWatch) {
String name = item.object.getMetadata().getName();
if (name.equals(createdResource.getMetadata().getName())) {

while (LocalDateTime.now().isBefore(startTime.plus(this.getTimeout()))) {

V1Service retrievedService = coreClient.readNamespacedService(createdResource.getMetadata().getName(),
createdResource.getMetadata().getNamespace(), null, null, null);
if (retrievedService.getSpec().getExternalIPs() != null && retrievedService.getSpec().getExternalIPs().size() > 0) {
serviceSuccessfullyStarted = true;
break;
}
}
break;
}
}

if (!serviceSuccessfullyStarted) {
throw new RuntimeException("Failed to run pod " + this + " before timeout " + this.getTimeout());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new RuntimeException("Failed to run pod " + this + " before timeout " + this.getTimeout());
throw new RuntimeException("Failed to run service " + this + " before timeout " + this.getTimeout());

}
}
}