diff --git a/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_request/RPCRequestBenchmark.scala b/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_request/RPCRequestBenchmark.scala index 885efee725..b502df4f6d 100644 --- a/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_request/RPCRequestBenchmark.scala +++ b/airframe-benchmark/src/main/scala/wvlet/airframe/benchmark/rpc_request/RPCRequestBenchmark.scala @@ -37,6 +37,8 @@ class RPCRequestBenchmark extends LogSupport { new HttpChannel { private val responseCodec = MessageCodec.of[GreeterResponse] + override val destination: ServerAddress = ServerAddress("localhost:8080") + override def send(req: HttpMessage.Request, channelConfig: HttpChannelConfig): HttpMessage.Response = { val ret = emptyServer.hello(req.message.toContentString) Http.response(HttpStatus.Ok_200).withJson(responseCodec.toJson(ret)) diff --git a/airframe-http-okhttp/src/main/scala/wvlet/airframe/http/okhttp/OkHttpChannel.scala b/airframe-http-okhttp/src/main/scala/wvlet/airframe/http/okhttp/OkHttpChannel.scala index f700f11d75..7da4a1eca7 100644 --- a/airframe-http-okhttp/src/main/scala/wvlet/airframe/http/okhttp/OkHttpChannel.scala +++ b/airframe-http-okhttp/src/main/scala/wvlet/airframe/http/okhttp/OkHttpChannel.scala @@ -22,7 +22,7 @@ import wvlet.log.LogSupport import java.io.IOException import java.util.concurrent.TimeUnit -class OkHttpChannel(serverAddress: ServerAddress, config: HttpClientConfig) extends HttpChannel with LogSupport { +class OkHttpChannel(val destination: ServerAddress, config: HttpClientConfig) extends HttpChannel with LogSupport { private[this] val client = { var builder = new okhttp3.OkHttpClient.Builder() .readTimeout(config.readTimeout.toMillis, TimeUnit.MILLISECONDS) @@ -56,7 +56,10 @@ class OkHttpChannel(serverAddress: ServerAddress, config: HttpClientConfig) exte newClient } - override def send(req: HttpMessage.Request, channelConfig: HttpChannelConfig): HttpMessage.Response = { + override def send( + req: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): HttpMessage.Response = { val request: okhttp3.Request = convertRequest(req) val newClient = prepareClient(channelConfig) @@ -64,7 +67,10 @@ class OkHttpChannel(serverAddress: ServerAddress, config: HttpClientConfig) exte response.toHttpResponse } - override def sendAsync(req: HttpMessage.Request, channelConfig: HttpChannelConfig): Rx[HttpMessage.Response] = { + override def sendAsync( + req: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): Rx[HttpMessage.Response] = { val request: okhttp3.Request = convertRequest(req) val newClient = prepareClient(channelConfig) val v = Rx.variable[Option[HttpMessage.Response]](None) @@ -93,7 +99,8 @@ class OkHttpChannel(serverAddress: ServerAddress, config: HttpClientConfig) exte } val url = HttpUrl - .get(serverAddress.uri).newBuilder() + .get(request.dest.getOrElse(destination).uri) + .newBuilder() .encodedPath(request.path) .encodedQuery(queryParams) .build() diff --git a/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSFetchChannel.scala b/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSFetchChannel.scala index 8ee53bd617..db3035494b 100644 --- a/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSFetchChannel.scala +++ b/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSFetchChannel.scala @@ -30,21 +30,27 @@ import scala.util.{Failure, Success, Try} * @param serverAddress * @param config */ -class JSFetchChannel(serverAddress: ServerAddress, config: HttpClientConfig) extends HttpChannel with LogSupport { +class JSFetchChannel(val destination: ServerAddress, config: HttpClientConfig) extends HttpChannel with LogSupport { private[client] implicit val executionContext: ExecutionContext = Compat.defaultExecutionContext override def close(): Unit = { // nothing to do } - override def send(req: HttpMessage.Request, channelConfig: HttpChannelConfig): HttpMessage.Response = { + override def send( + req: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): HttpMessage.Response = { // Blocking call cannot be supported in JS ??? } - override def sendAsync(request: HttpMessage.Request, channelConfig: HttpChannelConfig): Rx[HttpMessage.Response] = { + override def sendAsync( + request: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): Rx[HttpMessage.Response] = { val path = if (request.uri.startsWith("/")) request.uri else s"/${request.uri}" - val uri = s"${serverAddress.uri}${path}" + val uri = s"${request.dest.getOrElse(destination).uri}${path}" val req = new org.scalajs.dom.RequestInit { method = request.method match { diff --git a/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSHttpClientChannel.scala b/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSHttpClientChannel.scala index 0bd86e2a83..51d7a61ff4 100644 --- a/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSHttpClientChannel.scala +++ b/airframe-http/.js/src/main/scala/wvlet/airframe/http/client/JSHttpClientChannel.scala @@ -25,7 +25,7 @@ import scala.concurrent.{ExecutionContext, Promise, TimeoutException} import scala.scalajs.js.typedarray.{ArrayBuffer, TypedArrayBuffer} import scala.util.Try -class JSHttpClientChannel(serverAddress: ServerAddress, private[client] val config: HttpClientConfig) +class JSHttpClientChannel(val destination: ServerAddress, private[client] val config: HttpClientConfig) extends HttpChannel with LogSupport { @@ -35,7 +35,10 @@ class JSHttpClientChannel(serverAddress: ServerAddress, private[client] val conf // nothing to do } - override def send(request: HttpMessage.Request, channelConfig: HttpChannelConfig): HttpMessage.Response = ??? + override def send( + request: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): HttpMessage.Response = ??? override def sendAsync( request: HttpMessage.Request, @@ -45,7 +48,7 @@ class JSHttpClientChannel(serverAddress: ServerAddress, private[client] val conf val xhr = new dom.XMLHttpRequest() val path = if (request.uri.startsWith("/")) request.uri else s"/${request.uri}" - val uri = s"${serverAddress.uri}${path}" + val uri = s"${request.dest.getOrElse(destination).uri}${path}" trace(s"Sending request: ${request}: ${uri}") xhr.open(request.method, uri) diff --git a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala index 42344ac7b4..743a7f73ed 100644 --- a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala +++ b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala @@ -32,10 +32,10 @@ import scala.jdk.CollectionConverters.* /** * Http connection implementation using Http Client of Java 11 - * @param serverAddress + * @param destination * @param config */ -class JavaHttpClientChannel(serverAddress: ServerAddress, private[http] val config: HttpClientConfig) +class JavaHttpClientChannel(val destination: ServerAddress, private[http] val config: HttpClientConfig) extends HttpChannel with LogSupport { private val javaHttpClient: HttpClient = initClient(config) @@ -63,7 +63,7 @@ class JavaHttpClientChannel(serverAddress: ServerAddress, private[http] val conf override def send(req: Request, channelConfig: HttpChannelConfig): Response = { // New Java's HttpRequest is immutable, so we can reuse the same request instance - val httpRequest = buildRequest(serverAddress, req, channelConfig) + val httpRequest = buildRequest(req, channelConfig) val httpResponse: HttpResponse[InputStream] = javaHttpClient.send(httpRequest, BodyHandlers.ofInputStream()) @@ -73,7 +73,7 @@ class JavaHttpClientChannel(serverAddress: ServerAddress, private[http] val conf override def sendAsync(req: Request, channelConfig: HttpChannelConfig): Rx[Response] = { val v = Rx.variable[Option[Response]](None) try { - val httpRequest = buildRequest(serverAddress, req, channelConfig) + val httpRequest = buildRequest(req, channelConfig) javaHttpClient .sendAsync(httpRequest, BodyHandlers.ofInputStream()) .thenAccept(new Consumer[HttpResponse[InputStream]] { @@ -96,11 +96,10 @@ class JavaHttpClientChannel(serverAddress: ServerAddress, private[http] val conf } private def buildRequest( - serverAddress: ServerAddress, request: Request, channelConfig: HttpChannelConfig ): HttpRequest = { - val uri = s"${serverAddress.uri}${if (request.uri.startsWith("/")) request.uri + val uri = s"${request.dest.getOrElse(destination).uri}${if (request.uri.startsWith("/")) request.uri else s"/${request.uri}"}" val requestBuilder = HttpRequest diff --git a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/URLConnectionChannel.scala b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/URLConnectionChannel.scala index c36aeec7af..8196bb4b99 100644 --- a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/URLConnectionChannel.scala +++ b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/URLConnectionChannel.scala @@ -29,9 +29,9 @@ import scala.jdk.CollectionConverters.* * @param serverAddress * @param config */ -class URLConnectionChannel(serverAddress: ServerAddress, config: HttpClientConfig) extends HttpChannel { +class URLConnectionChannel(val destination: ServerAddress, config: HttpClientConfig) extends HttpChannel { override def send(request: Request, channelConfig: HttpChannelConfig): Response = { - val url = s"${serverAddress.uri}${if (request.uri.startsWith("/")) request.uri + val url = s"${request.dest.getOrElse(destination).uri}${if (request.uri.startsWith("/")) request.uri else s"/${request.uri}"}" val conn0: HttpURLConnection = diff --git a/airframe-http/.jvm/src/test/scala/wvlet/airframe/http/client/HttpClientLoggingFilterTest.scala b/airframe-http/.jvm/src/test/scala/wvlet/airframe/http/client/HttpClientLoggingFilterTest.scala index e58fd56733..69d61b71aa 100644 --- a/airframe-http/.jvm/src/test/scala/wvlet/airframe/http/client/HttpClientLoggingFilterTest.scala +++ b/airframe-http/.jvm/src/test/scala/wvlet/airframe/http/client/HttpClientLoggingFilterTest.scala @@ -23,14 +23,21 @@ import wvlet.log.LogSupport class HttpClientLoggingFilterTest extends AirSpec { class DummyHttpChannel extends HttpChannel with LogSupport { - override def send(req: HttpMessage.Request, channelConfig: HttpChannelConfig): HttpMessage.Response = { + override def destination: ServerAddress = ServerAddress("localhost:8080") + override def send( + req: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): HttpMessage.Response = { Http .response(HttpStatus.Ok_200).withJson("""{"message":"hello"}""") .withHeader(HttpHeader.xAirframeRPCStatus, RPCStatus.SUCCESS_S0.code.toString) } - override def sendAsync(req: HttpMessage.Request, channelConfig: HttpChannelConfig): Rx[HttpMessage.Response] = ??? - override def close(): Unit = {} + override def sendAsync( + req: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): Rx[HttpMessage.Response] = ??? + override def close(): Unit = {} } protected override def design: Design = { @@ -55,6 +62,10 @@ class HttpClientLoggingFilterTest extends AirSpec { val m = RPCMethod("/rpc_method", "demo.RPCClass", "hello", Surface.of[Map[String, Any]], Surface.of[String]) client.rpc[Map[String, Any], String](m, Map("message" -> "world")) } + + test("switch dest") { + client.send(Http.GET("/").withDest(ServerAddress("localhost:8081"))) + } } } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala b/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala index 0f0ceada80..d0b1551f8c 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/HttpMessage.scala @@ -210,6 +210,9 @@ object HttpMessage { uri: String = "/", header: HttpMultiMap = HttpMultiMap.empty, message: Message = EmptyMessage, + // [optional] Destination address for sending the request. HttpChannel implementation should use this address + dest: Option[ServerAddress] = None, + // Remote address of the HTTP server, which is used for server-side logging purpose remoteAddress: Option[ServerAddress] = None ) extends HttpMessage[Request] { override def toString: String = s"Request(${method},${uri},${header})" @@ -230,9 +233,16 @@ object HttpMessage { */ def query: HttpMultiMap = extractQueryFromUri(uri) - def withFilter(f: Request => Request): Request = f(this) - def withMethod(method: String): Request = this.copy(method = method) - def withUri(uri: String): Request = this.copy(uri = uri) + def withFilter(f: Request => Request): Request = f(this) + def withMethod(method: String): Request = this.copy(method = method) + def withUri(uri: String): Request = this.copy(uri = uri) + + /** + * Overwrite the default destination address of the request + * @param dest + * @return + */ + def withDest(dest: ServerAddress): Request = this.copy(dest = Some(dest)) def withRemoteAddress(remoteAddress: ServerAddress): Request = this.copy(remoteAddress = Some(remoteAddress)) override protected def copyWith(newHeader: HttpMultiMap): Request = this.copy(header = newHeader) diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala index 86aed54e3d..6a26f3d794 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala @@ -29,7 +29,7 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient] protected def channel: HttpChannel def config: HttpClientConfig - private val httpLogger: HttpLogger = config.newHttpLogger + private val httpLogger: HttpLogger = config.newHttpLogger(channel.destination) private val loggingFilter: HttpClientFilter = config.newLoggingFilter(httpLogger) private val circuitBreaker: CircuitBreaker = config.circuitBreaker diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpChannel.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpChannel.scala index 95c5569ac0..5237672683 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpChannel.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpChannel.scala @@ -14,6 +14,7 @@ package wvlet.airframe.http.client import wvlet.airframe.http.HttpMessage.{Request, Response} +import wvlet.airframe.http.ServerAddress import wvlet.airframe.rx.Rx import scala.concurrent.duration.Duration @@ -33,11 +34,25 @@ trait HttpChannelConfig { trait HttpChannel extends AutoCloseable { /** - * Send the request without modification. + * The default destination address to send requests + * @return + */ + def destination: ServerAddress + + /** + * Send the request as is to the destination * @param req * @param channelConfig * @return */ def send(req: Request, channelConfig: HttpChannelConfig): Response + + /** + * Send an async request as is to the destination. Until the returned Rx is evaluated (e.g., by calling Rx.run), the + * request is not sent. + * @param req + * @param channelConfig + * @return + */ def sendAsync(req: Request, channelConfig: HttpChannelConfig): Rx[Response] } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala index 20ec91d46b..3b058dc5d1 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala @@ -169,8 +169,15 @@ case class HttpClientConfig( this.copy(httpLoggerProvider = loggerProvider) } - def newHttpLogger: HttpLogger = { - httpLoggerProvider(httpLoggerConfig.addExtraTags(ListMap("client_name" -> name))) + def newHttpLogger(dest: ServerAddress): HttpLogger = { + httpLoggerProvider( + httpLoggerConfig.addExtraTags( + ListMap( + "client_name" -> name, + "dest" -> dest.hostAndPort + ) + ) + ) } def newLoggingFilter(logger: HttpLogger): HttpClientFilter = { diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala index c40f2fd4af..8ff59f5969 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala @@ -27,7 +27,7 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit protected def channel: HttpChannel def config: HttpClientConfig - private val clientLogger: HttpLogger = config.newHttpLogger + private val clientLogger: HttpLogger = config.newHttpLogger(channel.destination) private val loggingFilter: HttpClientFilter = config.newLoggingFilter(clientLogger) private val circuitBreaker: CircuitBreaker = config.circuitBreaker diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala b/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala index 2310b74594..b6a08ba6ce 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala @@ -110,6 +110,9 @@ object HttpLogs extends LogSupport { if (queryString.nonEmpty) { m += "query_string" -> queryString } + request.dest.foreach { d => + m += "dest" -> d.hostAndPort + } request.remoteAddress.foreach { remoteAddr => m += "remote_address" -> remoteAddr.hostAndPort } diff --git a/airframe-http/src/test/scala/wvlet/airframe/http/client/HttpClientFilterTest.scala b/airframe-http/src/test/scala/wvlet/airframe/http/client/HttpClientFilterTest.scala index 8093d78627..b688c78fdb 100644 --- a/airframe-http/src/test/scala/wvlet/airframe/http/client/HttpClientFilterTest.scala +++ b/airframe-http/src/test/scala/wvlet/airframe/http/client/HttpClientFilterTest.scala @@ -30,6 +30,7 @@ object HttpClientFilterTest extends AirSpec { private class DummyHttpChannel(reply: PartialFunction[Request, Response]) extends HttpChannel { private val requestCount = new AtomicInteger(0) + override def destination: ServerAddress = ServerAddress("localhost:8080") override def send(req: HttpMessage.Request, channelConfig: HttpChannelConfig): Response = { if (reply.isDefinedAt(req)) { reply(req) @@ -37,7 +38,10 @@ object HttpClientFilterTest extends AirSpec { throw RPCStatus.NOT_FOUND_U5.newException(s"RPC method not found: ${req.path}") } } - override def sendAsync(req: HttpMessage.Request, channelConfig: HttpChannelConfig): Rx[Response] = { + override def sendAsync( + req: HttpMessage.Request, + channelConfig: HttpChannelConfig + ): Rx[Response] = { Rx.single(send(req, channelConfig)) } override def close(): Unit = {}