Skip to content

Commit

Permalink
Migrates from WebClient.exchange() to retrieve()
Browse files Browse the repository at this point in the history
retrieve() should clean up resources automatically while exchange() requires explicit handling that wasn't being done.

Fixes spring-cloudgh-3862
  • Loading branch information
spencergibb committed Mar 3, 2023
1 parent ded713e commit 147d50f
Showing 1 changed file with 40 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.springframework.cloud.netflix.eureka.http;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.netflix.appinfo.InstanceInfo;
Expand All @@ -28,12 +26,12 @@
import com.netflix.discovery.shared.transport.EurekaHttpResponse;
import com.netflix.discovery.shared.transport.EurekaHttpResponse.EurekaHttpResponseBuilder;
import com.netflix.discovery.util.StringUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
Expand All @@ -46,8 +44,6 @@
*/
public class WebClientEurekaHttpClient implements EurekaHttpClient {

protected final Log logger = LogFactory.getLog(getClass());

private WebClient webClient;

public WebClientEurekaHttpClient(WebClient webClient) {
Expand All @@ -56,16 +52,18 @@ public WebClientEurekaHttpClient(WebClient webClient) {

@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
return webClient.post().uri("apps/" + info.getAppName(), Void.class).body(BodyInserters.fromValue(info))
return webClient.post().uri("apps/" + info.getAppName()).body(BodyInserters.fromValue(info))
.header(HttpHeaders.ACCEPT_ENCODING, "gzip")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).exchange()
.map(response -> eurekaHttpResponse(response)).block();
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).retrieve()
.onStatus(HttpStatus::isError, this::ignoreError).toBodilessEntity().map(this::eurekaHttpResponse)
.block();
}

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
return webClient.delete().uri("apps/" + appName + '/' + id, Void.class).exchange()
.map(response -> eurekaHttpResponse(response)).block();
return webClient.delete().uri("apps/" + appName + '/' + id).retrieve()
.onStatus(HttpStatus::isError, this::ignoreError).toBodilessEntity().map(this::eurekaHttpResponse)
.block();
}

@Override
Expand All @@ -75,14 +73,15 @@ public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id,
+ "&lastDirtyTimestamp=" + info.getLastDirtyTimestamp().toString()
+ (overriddenStatus != null ? "&overriddenstatus=" + overriddenStatus.name() : "");

ClientResponse response = webClient.put().uri(urlPath, InstanceInfo.class)
ResponseEntity<InstanceInfo> response = webClient.put().uri(urlPath)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).exchange().block();
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).retrieve()
.onStatus(HttpStatus::isError, this::ignoreError).toEntity(InstanceInfo.class).block();

EurekaHttpResponseBuilder<InstanceInfo> builder = anEurekaHttpResponse(statusCodeValueOf(response),
InstanceInfo.class).headers(headersOf(response));

InstanceInfo entity = response.toEntity(InstanceInfo.class).block().getBody();
InstanceInfo entity = response.getBody();

if (entity != null) {
builder.entity(entity);
Expand All @@ -98,19 +97,19 @@ public EurekaHttpResponse<Void> statusUpdate(String appName, String id, Instance
String urlPath = "apps/" + appName + '/' + id + "/status?value=" + newStatus.name() + "&lastDirtyTimestamp="
+ info.getLastDirtyTimestamp().toString();

return webClient.put().uri(urlPath, Void.class)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).exchange()
.map(response -> eurekaHttpResponse(response)).block();
return webClient.put().uri(urlPath).header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.retrieve().onStatus(HttpStatus::isError, this::ignoreError).toBodilessEntity()
.map(this::eurekaHttpResponse).block();
}

@Override
public EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info) {
String urlPath = "apps/" + appName + '/' + id + "/status?lastDirtyTimestamp="
+ info.getLastDirtyTimestamp().toString();

return webClient.delete().uri(urlPath, Void.class)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).exchange()
.map(response -> eurekaHttpResponse(response)).block();
return webClient.delete().uri(urlPath).header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.retrieve().onStatus(HttpStatus::isError, this::ignoreError).toBodilessEntity()
.map(this::eurekaHttpResponse).block();
}

@Override
Expand All @@ -125,13 +124,14 @@ private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath,
url = url + (urlPath.contains("?") ? "&" : "?") + "regions=" + StringUtil.join(regions);
}

ClientResponse response = webClient.get().uri(url, Applications.class)
ResponseEntity<Applications> response = webClient.get().uri(url)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).exchange().block();
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).retrieve()
.onStatus(HttpStatus::isError, this::ignoreError).toEntity(Applications.class).block();

int statusCode = statusCodeValueOf(response);

Applications body = response.toEntity(Applications.class).block().getBody();
Applications body = response.getBody();

return anEurekaHttpResponse(statusCode, statusCode == HttpStatus.OK.value() && body != null ? body : null)
.headers(headersOf(response)).build();
Expand All @@ -155,11 +155,12 @@ public EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, St
@Override
public EurekaHttpResponse<Application> getApplication(String appName) {

ClientResponse response = webClient.get().uri("apps/" + appName, Application.class)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).exchange().block();
ResponseEntity<Application> response = webClient.get().uri("apps/" + appName)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).retrieve()
.onStatus(HttpStatus::isError, this::ignoreError).toEntity(Application.class).block();

int statusCode = statusCodeValueOf(response);
Application body = response.toEntity(Application.class).block().getBody();
Application body = response.getBody();

Application application = statusCode == HttpStatus.OK.value() && body != null ? body : null;

Expand All @@ -177,11 +178,12 @@ public EurekaHttpResponse<InstanceInfo> getInstance(String id) {
}

private EurekaHttpResponse<InstanceInfo> getInstanceInternal(String urlPath) {
ClientResponse response = webClient.get().uri(urlPath, InstanceInfo.class)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).exchange().block();
ResponseEntity<InstanceInfo> response = webClient.get().uri(urlPath)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE).retrieve()
.onStatus(HttpStatus::isError, this::ignoreError).toEntity(InstanceInfo.class).block();

int statusCode = statusCodeValueOf(response);
InstanceInfo body = response.toEntity(InstanceInfo.class).block().getBody();
InstanceInfo body = response.getBody();

return anEurekaHttpResponse(statusCode, statusCode == HttpStatus.OK.value() && body != null ? body : null)
.headers(headersOf(response)).build();
Expand All @@ -196,26 +198,19 @@ public WebClient getWebClient() {
return this.webClient;
}

private static Map<String, String> headersOf(ClientResponse response) {
ClientResponse.Headers httpHeaders = response.headers();
if (httpHeaders == null) {
return Collections.emptyMap();
}
HttpHeaders asHeaders = httpHeaders.asHttpHeaders();
if (asHeaders == null) {
return Collections.emptyMap();
}
Map<String, String> headers = new HashMap<>();
asHeaders.entrySet().stream()
.forEach(entry -> entry.getValue().stream().forEach(v -> headers.put(entry.getKey(), v)));
return headers;
private Mono<? extends Throwable> ignoreError(ClientResponse response) {
return Mono.empty();
}

private static Map<String, String> headersOf(ResponseEntity<?> response) {
return response.getHeaders().toSingleValueMap();
}

private int statusCodeValueOf(ClientResponse response) {
return response.statusCode().value();
private int statusCodeValueOf(ResponseEntity<?> response) {
return response.getStatusCode().value();
}

private EurekaHttpResponse<Void> eurekaHttpResponse(ClientResponse response) {
private EurekaHttpResponse<Void> eurekaHttpResponse(ResponseEntity<?> response) {
return anEurekaHttpResponse(statusCodeValueOf(response)).headers(headersOf(response)).build();
}

Expand Down

0 comments on commit 147d50f

Please sign in to comment.