Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conform to PooledConnection #43

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0"),
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.19.0"),
],
targets: [
.target(
Expand All @@ -43,6 +44,7 @@ let package = Package(
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "_ConnectionPoolModule", package: "postgres-nio"),
]
),
.testTarget(
Expand Down
137 changes: 95 additions & 42 deletions Sources/Memcache/MemcacheConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,27 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
@_spi(AsyncChannel)

import _ConnectionPoolModule
import Atomics
import NIOCore
import NIOPosix
import ServiceLifecycle

/// An actor to create a connection to a Memcache server.
///
/// This actor can be used to send commands to the server.
public actor MemcacheConnection: Service {
public actor MemcacheConnection: Service, PooledConnection {
public typealias ID = Int
public let id: ID
private static var nextID: ManagedAtomic<Int> = ManagedAtomic(0)

private let closePromise: EventLoopPromise<Void>

public var closeFuture: EventLoopFuture<Void> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be marked as nonisolated.

return self.closePromise.futureResult
}

private typealias StreamElement = (MemcacheRequest, CheckedContinuation<MemcacheResponse, Error>)
private let host: String
private let port: Int
Expand Down Expand Up @@ -56,23 +67,63 @@ public actor MemcacheConnection: Service {

private var state: State

/// Initialize a new MemcacheConnection.
/// Initialize a new MemcacheConnection, with an option to specify an ID.
/// If no ID is provided, a default value is used.
///
/// - Parameters:
/// - host: The host address of the Memcache server.
/// - port: The port number of the Memcache server.
/// - eventLoopGroup: The event loop group to use for this connection.
public init(host: String, port: Int, eventLoopGroup: EventLoopGroup) {
/// - id: The unique identifier for the connection (optional).
public init(host: String, port: Int, id: ID? = nil, eventLoopGroup: EventLoopGroup) {
self.host = host
self.port = port
self.id = id ?? MemcacheConnection.nextID.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent)
let (stream, continuation) = AsyncStream<StreamElement>.makeStream()
let bufferAllocator = ByteBufferAllocator()
self.state = .initial(
eventLoopGroup: eventLoopGroup,
bufferAllocator: bufferAllocator,
requestStream: stream,
requestContinuation: continuation
)
self.closePromise = eventLoopGroup.next().makePromise(of: Void.self)
self.state = .initial(eventLoopGroup: eventLoopGroup, bufferAllocator: bufferAllocator, requestStream: stream, requestContinuation: continuation)
}

deinit {
// Fulfill the promise if it has not been fulfilled yet
closePromise.fail(MemcacheError(code: .connectionShutdown,
message: "MemcacheConnection deinitialized without closing",
cause: nil,
location: .here()))
}

/// Closes the connection. This method is responsible for properly shutting down
/// and cleaning up resources associated with the connection.
public nonisolated func close() {
Task {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabianfett just wanted to ping you here to show you the problems that we hit when adopting PooledConnection on an actor. I am curious to hear what you think about this.

Copy link
Contributor Author

@dkz2 dkz2 Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is also a few notes I had in context to this.

One reason I have a Task {} in both close and onClose was b/c of a error stating Actor-isolated instance method 'onClose' cannot be used to satisfy nonisolated protocol requirement.

Xcode by default will give you a fix prompting you to Add 'nonisolated' to 'onClose' to make this instance method not isolated to the actor well once you adopt that change you will get a Actor-isolated property 'some prop' can not be referenced from a non-isolated context following that error spiral lead us here.

from my understanding onClose since its part of the actor is isolated but PooledConnection expects that its methods to be nonisolated, thus we hit this conflict.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is important that close is not async, because the whole point of close is to trigger the close. We don't want to wait for the channel to be closed 100% here.

We can see this as an issue of the ConnectionPool design, or the dreaded we issue of we can't send messages to an actor, without waiting for the response. (which would totally solve the issue here)

Also I want to make explicit, that the run pattern here does not play nicely with the ConnectionPool, as the ConnectionFactory expects the Connection only to be returned, once the connection has been established. So you will need to add a onStarted callback by necessity.

I think I come back to my favorite point about structured concurrency:

  1. There are multiple sensible ways to structure your concurrency.
  2. Those different ways may not play nicely with each other.

Copy link
Member

@fabianfett fabianfett Mar 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FranzBusch I think the issue is not really the actor here, but the question which concurrency structure do you follow. All our approaches are structured. :)

await self.closeConnection()
}
}

private func closeConnection() async {
switch self.state {
case .running(_, let channel, _, _):
channel.channel.close().cascade(to: self.closePromise)
default:
self.closePromise.succeed(())
}
self.state = .finished
}

/// Registers a closure to be called when the connection is closed.
/// This is useful for performing cleanup or notification tasks.
public nonisolated func onClose(_ closure: @escaping ((any Error)?) -> Void) {
Task {
await self.closeFuture.whenComplete { result in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the closeFuture is marked as nonisolated we don't need a Task and an await here.

switch result {
case .success:
closure(nil)
case .failure(let error):
closure(error)
}
}
}
}

/// Runs the Memcache connection.
Expand All @@ -95,7 +146,7 @@ public actor MemcacheConnection: Service {
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(MemcacheRequestEncoder()))
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(MemcacheResponseDecoder()))
return try NIOAsyncChannel<MemcacheResponse, MemcacheRequest>(synchronouslyWrapping: channel)
return try NIOAsyncChannel<MemcacheResponse, MemcacheRequest>(wrappingChannelSynchronously: channel)
}
}.get()

Expand All @@ -106,39 +157,41 @@ public actor MemcacheConnection: Service {
requestContinuation: continuation
)

var iterator = channel.inboundStream.makeAsyncIterator()
switch self.state {
case .running(_, let channel, let requestStream, let requestContinuation):
for await (request, continuation) in requestStream {
do {
try await channel.outboundWriter.write(request)
let responseBuffer = try await iterator.next()

if let response = responseBuffer {
continuation.resume(returning: response)
} else {
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server was unexpectedly closed.",
cause: nil,
location: .here()
))
}
} catch {
switch self.state {
case .running:
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server has shut down while processing a request.",
cause: error,
location: .here()
))
case .initial, .finished:
break
try await channel.executeThenClose { inbound, outbound in
var inboundIterator = inbound.makeAsyncIterator()
for await (request, continuation) in requestStream {
do {
try await outbound.write(request)
let responseBuffer = try await inboundIterator.next()

if let response = responseBuffer {
continuation.resume(returning: response)
} else {
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server was unexpectedly closed.",
cause: nil,
location: .here()
))
}
} catch {
switch self.state {
case .running:
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server has shut down while processing a request.",
cause: error,
location: .here()
))
case .initial, .finished:
break
}
}
}
}
Expand Down