From a7490df138621ce7b6c7b886da9a39fb75af7416 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi <106849+romac@users.noreply.github.com> Date: Tue, 30 Jan 2024 21:55:53 +0100 Subject: [PATCH] Move backends into their own package --- core/src/main/scala/Backend.scala | 108 ---------------------- core/src/main/scala/Choreo.scala | 1 + core/src/main/scala/backend/Backend.scala | 18 ++++ core/src/main/scala/backend/HTTP.scala | 46 +++++++++ core/src/main/scala/backend/Local.scala | 57 ++++++++++++ examples/src/main/scala/Bookseller.scala | 2 + examples/src/main/scala/KV.scala | 2 + 7 files changed, 126 insertions(+), 108 deletions(-) delete mode 100644 core/src/main/scala/Backend.scala create mode 100644 core/src/main/scala/backend/Backend.scala create mode 100644 core/src/main/scala/backend/HTTP.scala create mode 100644 core/src/main/scala/backend/Local.scala diff --git a/core/src/main/scala/Backend.scala b/core/src/main/scala/Backend.scala deleted file mode 100644 index 2fb03b5..0000000 --- a/core/src/main/scala/Backend.scala +++ /dev/null @@ -1,108 +0,0 @@ -package choreo - -import cats.Monad -import cats.free.Free -import cats.arrow.FunctionK -import cats.effect.std.Queue -import cats.effect.kernel.Concurrent -import cats.syntax.all.* - -import choreo.utils.toFunctionK - -trait Backend[B, M[_]]: - extension (backend: B) - def runNetwork[A](at: Loc)(network: Network[M, A]): M[A] - -object Backend: - def local[M[_]: Concurrent]( - locs: List[Loc] - ): M[LocalBackend[M]] = - for inboxes <- LocalBackend.makeInboxes(locs) - yield LocalBackend(inboxes) - - def http[M[_]: Concurrent]( - locs: List[Loc] - ): HTTPBackend[M] = - HTTPBackend(locs) - -class LocalBackend[M[_]](inboxes: Map[Loc, Queue[M, Any]]): - val locs = inboxes.keys.toSeq - - def runNetwork[A](at: Loc)( - network: Network[M, A] - )(using M: Monad[M]): M[A] = - network.foldMap(run(at, inboxes).toFunctionK) - - private[choreo] def run( - at: Loc, - inboxes: Map[Loc, Queue[M, Any]] - )(using M: Monad[M]): [A] => NetworkSig[M, A] => M[A] = [A] => - (na: NetworkSig[M, A]) => - na match - case NetworkSig.Run(ma) => - ma - - case NetworkSig.Send(a, to, ser) => - val inbox = inboxes.get(to).get - inbox.offer(a) - - case NetworkSig.Recv(from, ser) => - val inbox = inboxes.get(at).get - inbox.take.map(_.asInstanceOf[A]) - - case NetworkSig.Broadcast(a, ser) => - locs - .filter(_ != at) - .traverse_ { to => - run(at, inboxes)(NetworkSig.Send(a, to, ser)) - } - -object LocalBackend: - def makeInboxes[M[_]: Concurrent]( - locs: Seq[Loc] - ): M[Map[Loc, Queue[M, Any]]] = - for queues <- locs.traverse(_ => Queue.unbounded[M, Any]) - yield locs.zip(queues).toMap - - given backend[M[_]: Monad]: Backend[LocalBackend[M], M] with - extension (backend: LocalBackend[M]) - def runNetwork[A](at: Loc)(network: Network[M, A]): M[A] = - runNetwork(at)(network) - -class HTTPBackend[M[_]](locs: List[Loc]): - def runNetwork[A](at: Loc)( - network: Network[M, A] - )(using M: Monad[M]): M[A] = - network.foldMap(run(at, locs).toFunctionK) - - private[choreo] def run( - at: Loc, - locs: List[Loc] - )(using M: Monad[M]): [A] => NetworkSig[M, A] => M[A] = [A] => - (na: NetworkSig[M, A]) => - na match - case NetworkSig.Run(ma) => - ma - - case NetworkSig.Send(a, to, ser) => - val encoded = ser.encode(a) - // TODO: send to network - M.pure(()) - - case NetworkSig.Recv(from, ser) => - val encoded: ser.Encoding = ??? // TODO: receive from network - val value = ser.decode(encoded).get - M.pure(value) - - case NetworkSig.Broadcast(a, ser) => - locs - .filter(_ != at) - .traverse_ { to => - run(at, locs)(NetworkSig.Send(a, to, ser)) - } - -object HTTPBackend: - given backend[M[_]: Monad]: Backend[HTTPBackend[M], M] with - extension (backend: HTTPBackend[M]) - def runNetwork[A](at: Loc)(network: Network[M, A]): M[A] = - runNetwork(at)(network) diff --git a/core/src/main/scala/Choreo.scala b/core/src/main/scala/Choreo.scala index 1c0f0b7..fa8f327 100644 --- a/core/src/main/scala/Choreo.scala +++ b/core/src/main/scala/Choreo.scala @@ -7,6 +7,7 @@ import cats.effect.kernel.Concurrent import cats.syntax.all.* import cats.arrow.FunctionK +import choreo.backend.Backend import choreo.utils.toFunctionK type Choreo[M[_], A] = Free[[X] =>> ChoreoSig[M, X], A] diff --git a/core/src/main/scala/backend/Backend.scala b/core/src/main/scala/backend/Backend.scala new file mode 100644 index 0000000..9ce6db3 --- /dev/null +++ b/core/src/main/scala/backend/Backend.scala @@ -0,0 +1,18 @@ +package choreo +package backend + +import cats.Monad +import cats.effect.std.Queue +import cats.effect.kernel.Concurrent +import cats.syntax.all.* + +trait Backend[B, M[_]]: + extension (backend: B) + def runNetwork[A](at: Loc)(network: Network[M, A]): M[A] + +object Backend: + def local[M[_]: Concurrent](locs: List[Loc]): M[LocalBackend[M]] = + LocalBackend.make(locs) + + def http[M[_]: Concurrent](locs: List[Loc]): HTTPBackend[M] = + HTTPBackend(locs) diff --git a/core/src/main/scala/backend/HTTP.scala b/core/src/main/scala/backend/HTTP.scala new file mode 100644 index 0000000..0439222 --- /dev/null +++ b/core/src/main/scala/backend/HTTP.scala @@ -0,0 +1,46 @@ +package choreo +package backend + +import cats.Monad +import cats.arrow.FunctionK +import cats.effect.kernel.Concurrent +import cats.syntax.all.* + +import choreo.utils.toFunctionK + +class HTTPBackend[M[_]](locs: List[Loc]): + def runNetwork[A](at: Loc)(network: Network[M, A])(using M: Monad[M]): M[A] = + network.foldMap(run(at, locs).toFunctionK) + + private[choreo] def run(at: Loc, locs: List[Loc])(using + M: Monad[M] + ): [A] => NetworkSig[M, A] => M[A] = [A] => + (na: NetworkSig[M, A]) => + na match + case NetworkSig.Run(ma) => + ma + + case NetworkSig.Send(a, to, ser) => + val encoded = ser.encode(a) + // TODO: send to network + M.pure(()) + + case NetworkSig.Recv(from, ser) => + val encoded: ser.Encoding = ??? // TODO: receive from network + val value = ser.decode(encoded).get + M.pure(value) + + case NetworkSig.Broadcast(a, ser) => + locs + .filter(_ != at) + .traverse_ { to => + run(at, locs)(NetworkSig.Send(a, to, ser)) + } +end HTTPBackend + +object HTTPBackend: + given backend[M[_]: Monad]: Backend[HTTPBackend[M], M] with + extension (backend: HTTPBackend[M]) + def runNetwork[A](at: Loc)(network: Network[M, A]): M[A] = + runNetwork(at)(network) +end HTTPBackend diff --git a/core/src/main/scala/backend/Local.scala b/core/src/main/scala/backend/Local.scala new file mode 100644 index 0000000..55c046a --- /dev/null +++ b/core/src/main/scala/backend/Local.scala @@ -0,0 +1,57 @@ +package choreo +package backend + +import cats.Monad +import cats.arrow.FunctionK +import cats.effect.std.Queue +import cats.effect.kernel.Concurrent +import cats.syntax.all.* + +import choreo.utils.toFunctionK + +class LocalBackend[M[_]](inboxes: Map[Loc, Queue[M, Any]]): + val locs = inboxes.keys.toSeq + + def runNetwork[A](at: Loc)(network: Network[M, A])(using M: Monad[M]): M[A] = + network.foldMap(run(at, inboxes).toFunctionK) + + private[choreo] def run( + at: Loc, + inboxes: Map[Loc, Queue[M, Any]] + )(using M: Monad[M]): [A] => NetworkSig[M, A] => M[A] = [A] => + (na: NetworkSig[M, A]) => + na match + case NetworkSig.Run(ma) => + ma + + case NetworkSig.Send(a, to, ser) => + val inbox = inboxes.get(to).get + inbox.offer(a) + + case NetworkSig.Recv(from, ser) => + val inbox = inboxes.get(at).get + inbox.take.map(_.asInstanceOf[A]) + + case NetworkSig.Broadcast(a, ser) => + locs + .filter(_ != at) + .traverse_ { to => + run(at, inboxes)(NetworkSig.Send(a, to, ser)) + } +end LocalBackend + +object LocalBackend: + def make[M[_]: Concurrent](locs: List[Loc]): M[LocalBackend[M]] = + makeInboxes(locs).map(LocalBackend(_)) + + given backend[M[_]: Monad]: Backend[LocalBackend[M], M] with + extension (backend: LocalBackend[M]) + def runNetwork[A](at: Loc)(network: Network[M, A]): M[A] = + runNetwork(at)(network) +end LocalBackend + +private[this] def makeInboxes[M[_]: Concurrent]( + locs: Seq[Loc] +): M[Map[Loc, Queue[M, Any]]] = + for queues <- locs.traverse(_ => Queue.unbounded[M, Any]) + yield locs.zip(queues).toMap diff --git a/examples/src/main/scala/Bookseller.scala b/examples/src/main/scala/Bookseller.scala index 857b24f..949ce38 100644 --- a/examples/src/main/scala/Bookseller.scala +++ b/examples/src/main/scala/Bookseller.scala @@ -6,6 +6,8 @@ import cats.effect.IO import cats.effect.IO.asyncForIO import cats.syntax.all.* +import choreo.backend.Backend + case class Book(title: String, price: Double) case class Date(year: Int, month: Int, day: Int): diff --git a/examples/src/main/scala/KV.scala b/examples/src/main/scala/KV.scala index 716bb6a..60ac97f 100644 --- a/examples/src/main/scala/KV.scala +++ b/examples/src/main/scala/KV.scala @@ -7,6 +7,8 @@ import cats.effect.IO.asyncForIO import cats.effect.kernel.Ref import cats.syntax.all.* +import choreo.backend.Backend + type State = Map[String, String] enum Request: