Skip to content

Commit

Permalink
Minor refactoring in JettyClientHttpConnector
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Oct 2, 2020
1 parent 86f2ebe commit e44b08f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,9 @@
import java.util.function.Function;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -81,7 +83,8 @@ public JettyClientHttpConnector(HttpClient httpClient, @Nullable JettyResourceFa
* Constructor with an {@link JettyResourceFactory} that will manage shared resources.
* @param resourceFactory the {@link JettyResourceFactory} to use
* @param customizer the lambda used to customize the {@link HttpClient}
* @deprecated as of 5.2, in favor of {@link JettyClientHttpConnector#JettyClientHttpConnector(HttpClient, JettyResourceFactory)}
* @deprecated as of 5.2, in favor of
* {@link JettyClientHttpConnector#JettyClientHttpConnector(HttpClient, JettyResourceFactory)}
*/
@Deprecated
public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) {
Expand Down Expand Up @@ -114,14 +117,14 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
}
}

JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
Request request = this.httpClient.newRequest(uri).method(method.toString());

return requestCallback.apply(clientHttpRequest).then(Mono.from(
clientHttpRequest.getReactiveRequest().response((response, chunks) -> {
Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(response, content));
})));
return requestCallback.apply(new JettyClientHttpRequest(request, this.bufferFactory))
.then(Mono.from(ReactiveRequest.newBuilder(request).build()
.response((reactiveResponse, chunkPublisher) -> {
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
})));
}

private DataBuffer toDataBuffer(ContentChunk chunk) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.reactive.client.internal.PublisherContentProvider;
import org.eclipse.jetty.util.Callback;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
Expand All @@ -37,7 +38,6 @@
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
Expand All @@ -53,9 +53,6 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {

private final DataBufferFactory bufferFactory;

@Nullable
private ReactiveRequest reactiveRequest;


public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) {
this.jettyRequest = jettyRequest;
Expand Down Expand Up @@ -87,20 +84,21 @@ public DataBufferFactory bufferFactory() {

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<ContentChunk> chunks = Flux.from(body).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
ReactiveRequest.Content content = Flux.from(body)
.map(this::toContentChunk)
.as(chunks -> ReactiveRequest.Content.fromPublisher(chunks, getContentType()));
this.jettyRequest.content(new PublisherContentProvider(content));
return doCommit(this::completes);
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
ReactiveRequest.Content content = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
.map(this::toContentChunk)
.as(chunks -> ReactiveRequest.Content.fromPublisher(chunks, getContentType()));
this.jettyRequest.content(new PublisherContentProvider(content));
return doCommit(this::completes);
}

Expand Down Expand Up @@ -145,11 +143,4 @@ protected void applyHeaders() {
}
}

ReactiveRequest getReactiveRequest() {
if (this.reactiveRequest == null) {
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).build();
}
return this.reactiveRequest;
}

}

0 comments on commit e44b08f

Please sign in to comment.