Skip to content

Commit

Permalink
fix: incorrect implementation of request and response timing
Browse files Browse the repository at this point in the history
  • Loading branch information
kenluluuuluuuuu authored and JulioDeLeon committed May 20, 2024
1 parent 81c406b commit 31f8963
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2023 Expedia Inc.
Copyright (C) 2013-2024 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,6 @@

import static com.hotels.styx.api.HttpHeaderNames.HOST;
import static com.hotels.styx.client.connectionpool.LatencyTiming.finishRequestTiming;
import static com.hotels.styx.client.connectionpool.LatencyTiming.startResponseTiming;
import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -130,11 +129,11 @@ public Flux<LiveHttpResponse> execute(NettyConnection nettyConnection, HttpInter
executeCount.incrementAndGet();

Flux<LiveHttpResponse> responseFlux = Flux.create(sink -> {
finishRequestTiming(context);
if (nettyConnection.isConnected()) {
RequestBodyChunkSubscriber bodyChunkSubscriber = new RequestBodyChunkSubscriber(request, nettyConnection);
requestRequestBodyChunkSubscriber.set(bodyChunkSubscriber);
addProxyBridgeHandlers(nettyConnection, sink);
finishRequestTiming(context);
new WriteRequestToOrigin(sink, nettyConnection, request, bodyChunkSubscriber)
.write();
if (requestLoggingEnabled) {
Expand All @@ -145,11 +144,6 @@ public Flux<LiveHttpResponse> execute(NettyConnection nettyConnection, HttpInter
}
});

responseFlux = responseFlux.map(response -> {
startResponseTiming(metrics, context);
return response;
});

if (requestLoggingEnabled) {
responseFlux = responseFlux
.doOnNext(response -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2023 Expedia Inc.
Copyright (C) 2013-2024 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@ import com.hotels.styx.metrics.TimerPurpose.RESPONSE_PROCESSING
/**
* Convenience methods for request and response processing latency measurement.
*/
object LatencyTiming {
object LatencyTiming {
/**
* Stops the request processing timer, if one is present in the context.
* Note that this only has an effect if the context is [TimeMeasurable].
Expand All @@ -46,9 +46,25 @@ object LatencyTiming {
* @param context context that can have timers set in it
*/
@JvmStatic
fun startResponseTiming(metrics: CentralisedMetrics?, context: HttpInterceptor.Context?) {
fun startResponseTiming(
metrics: CentralisedMetrics?,
context: HttpInterceptor.Context?,
) {
if (metrics != null && context is TimeMeasurable) {
context.timers?.startTiming(metrics, RESPONSE_PROCESSING)
}
}

/**
* Stops the response processing timer, if one is present in the context.
* Note that this only has an effect if the context is [TimeMeasurable].
*
* @param context context that may contain timers
*/
@JvmStatic
fun finishResponseTiming(context: HttpInterceptor.Context?) {
if (context is TimeMeasurable) {
context.timers?.stopTiming(RESPONSE_PROCESSING)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.slf4j.Logger;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
Expand All @@ -60,8 +59,9 @@
import static com.hotels.styx.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static com.hotels.styx.api.HttpVersion.HTTP_1_1;
import static com.hotels.styx.api.LiveHttpResponse.response;
import static com.hotels.styx.client.connectionpool.LatencyTiming.startResponseTiming;
import static com.hotels.styx.client.connectionpool.LatencyTiming.finishResponseTiming;
import static com.hotels.styx.metrics.TimerPurpose.REQUEST_PROCESSING;
import static com.hotels.styx.metrics.TimerPurpose.RESPONSE_PROCESSING;
import static com.hotels.styx.server.HttpErrorStatusListener.IGNORE_ERROR_STATUS;
import static com.hotels.styx.server.RequestProgressListener.IGNORE_REQUEST_PROGRESS;
import static com.hotels.styx.server.netty.connectors.HttpPipelineHandler.State.ACCEPTING_REQUESTS;
Expand Down Expand Up @@ -129,7 +129,7 @@ private StateMachine<State> createStateMachine() {
.transition(ACCEPTING_REQUESTS, ChannelExceptionEvent.class, event -> onChannelExceptionWhenAcceptingRequests(event.ctx, event.cause))
.transition(ACCEPTING_REQUESTS, ResponseObservableCompletedEvent.class, event -> ACCEPTING_REQUESTS)

.transition(WAITING_FOR_RESPONSE, ResponseReceivedEvent.class, event -> onResponseReceived(event.response, event.ctx))
.transition(WAITING_FOR_RESPONSE, ResponseReceivedEvent.class, event -> onResponseReceived(event.response, event.ctx, event.interceptorContext))
.transition(WAITING_FOR_RESPONSE, RequestReceivedEvent.class, event -> onSpuriousRequest(event.request))
.transition(WAITING_FOR_RESPONSE, ChannelInactiveEvent.class, event -> onChannelInactive())
.transition(WAITING_FOR_RESPONSE, ChannelExceptionEvent.class, event -> onChannelExceptionWhenWaitingForResponse(event.ctx, event.cause))
Expand Down Expand Up @@ -243,13 +243,14 @@ private State onLegitimateRequest(LiveHttpRequest request, ChannelHandlerContext
// generates a response.
try {
ContextualTimers timers = new ContextualTimers();
timers.startTiming(metrics, REQUEST_PROCESSING);

HttpInterceptorContext context = new HttpInterceptorContext(secure, remoteAddress(ctx), ctx.executor(), timers);
Eventual<LiveHttpResponse> responseEventual = httpPipeline.handle(v11Request, context);
responseEventual.subscribe(new BaseSubscriber<>() {
@Override
public void hookOnSubscribe(Subscription s) {
timers.startTiming(metrics, REQUEST_PROCESSING);

subscription = s;
s.request(1);
}
Expand All @@ -266,21 +267,13 @@ public void hookOnError(Throwable cause) {

@Override
public void hookOnNext(LiveHttpResponse response) {
eventProcessor.submit(new ResponseReceivedEvent(response, ctx));
eventProcessor.submit(new ResponseReceivedEvent(response, ctx, context));
}

@Override
protected void hookOnCancel() {
eventProcessor.submit(new ResponseObservableCancelledEvent(ctx, new ResponseCancelledException(), request.id()));
}

@Override
protected void hookFinally(SignalType type) {
timers.stopTiming(RESPONSE_PROCESSING);
timers.stopTiming(REQUEST_PROCESSING);

context.clear();
}
});

return WAITING_FOR_RESPONSE;
Expand All @@ -296,7 +289,9 @@ protected void hookFinally(SignalType type) {
}
}

private State onResponseReceived(LiveHttpResponse response, ChannelHandlerContext ctx) {
private State onResponseReceived(LiveHttpResponse response, ChannelHandlerContext ctx, HttpInterceptorContext interceptorContext) {
startResponseTiming(metrics, interceptorContext);

ongoingResponse = response;
HttpResponseWriter httpResponseWriter = responseWriterFactory.create(ctx);

Expand All @@ -307,6 +302,7 @@ private State onResponseReceived(LiveHttpResponse response, ChannelHandlerContex
} else {
eventProcessor.submit(new ResponseSentEvent(ctx));
}
finishResponseTiming(interceptorContext);
return null;
});

Expand Down Expand Up @@ -558,10 +554,12 @@ private static class RequestReceivedEvent {
private static class ResponseReceivedEvent {
private final LiveHttpResponse response;
private final ChannelHandlerContext ctx;
private final HttpInterceptorContext interceptorContext;

ResponseReceivedEvent(LiveHttpResponse response, ChannelHandlerContext ctx) {
ResponseReceivedEvent(LiveHttpResponse response, ChannelHandlerContext ctx, HttpInterceptorContext interceptorContext) {
this.response = response;
this.ctx = ctx;
this.interceptorContext = interceptorContext;
}
}

Expand Down

0 comments on commit 31f8963

Please sign in to comment.