From cb9bd978969b9c4b7e21ccb5c38df154e5f8bdef Mon Sep 17 00:00:00 2001 From: Bogdan Roman Date: Tue, 20 Oct 2020 18:34:51 +0200 Subject: [PATCH 1/4] Cats IO improvements and Monix Task and BIO support --- build.sbt | 17 ++++ .../src/main/resources/reference.conf | 3 +- .../instrumentation/cats/io/Tracing.scala | 75 ++++++++++++++++ ...bstractCatsEffectInstrumentationSpec.scala | 89 +++++++++++++++++++ .../cats/io/CatsIOInstrumentationSpec.scala | 12 +++ .../cats/CatsIOInstrumentationSpec.scala | 59 ------------ .../src/main/resources/reference.conf | 11 +++ .../monix/MonixBIOInstrumentationSpec.scala | 13 +++ .../monix/MonixInstrumentationSpec.scala | 13 +++ 9 files changed, 231 insertions(+), 61 deletions(-) create mode 100644 instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala create mode 100644 instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala create mode 100644 instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala delete mode 100644 instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala create mode 100644 instrumentation/kamon-monix/src/main/resources/reference.conf create mode 100644 instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala create mode 100644 instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala diff --git a/build.sbt b/build.sbt index c1f1fbb15..93eb37bcc 100644 --- a/build.sbt +++ b/build.sbt @@ -117,6 +117,7 @@ lazy val instrumentation = (project in file("instrumentation")) `kamon-twitter-future`, `kamon-scalaz-future`, `kamon-cats-io`, + `kamon-monix`, `kamon-logback`, `kamon-jdbc`, `kamon-kafka`, @@ -234,6 +235,21 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io")) ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") +lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + bintrayPackage := "kamon-futures", + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "io.monix" %% "monix-eval" % "3.2.2" % "provided", + "io.monix" %% "monix-bio" % "1.0.0" % "provided", + scalatest % "test", + logbackClassic % "test" + ) + ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test") + lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback")) .disablePlugins(AssemblyPlugin) @@ -641,6 +657,7 @@ val `kamon-bundle` = (project in file("bundle/kamon-bundle")) `kamon-twitter-future` % "shaded", `kamon-scalaz-future` % "shaded", `kamon-cats-io` % "shaded", + `kamon-monix` % "shaded", `kamon-logback` % "shaded", `kamon-jdbc` % "shaded", `kamon-kafka` % "shaded", diff --git a/instrumentation/kamon-cats-io/src/main/resources/reference.conf b/instrumentation/kamon-cats-io/src/main/resources/reference.conf index 9d31317c2..cee6d050b 100644 --- a/instrumentation/kamon-cats-io/src/main/resources/reference.conf +++ b/instrumentation/kamon-cats-io/src/main/resources/reference.conf @@ -4,7 +4,6 @@ kanela.modules { executor-service { - within += "cats.effect.internals.IOShift\\$Tick" - within += "cats.effect.internals.IOTimer\\$ShiftTick" + within += "cats.effect.internals..*" } } \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala new file mode 100644 index 000000000..98ccf1100 --- /dev/null +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala @@ -0,0 +1,75 @@ +package kamon.instrumentation.cats.io + +import cats.effect.{ExitCase, Sync} +import cats.implicits._ +import kamon.Kamon +import kamon.tag.TagSet +import kamon.trace.Span + +object Tracing { + + /** + * Wraps the effect `fa` in a new span with the provided name and tags. The created span is marked as finished after + * the effect is completed or cancelled. + * + * @param name the span name + * @param tags the collection of tags to apply to the span + * @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far + * @param fa the effect to execute + * @tparam F the effect type + * @tparam A the value produced in the effect F + * @return the same effect wrapped within a named span + */ + def operationName[F[_] : Sync, A](name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true)(fa: F[A]): F[A] = { + val F = implicitly[Sync[F]] + buildSpan(name, tags).flatMap { span => + F.guaranteeCase(fa) { + case ExitCase.Completed => F.delay(finishSpan(span, takeSamplingDecision)) + case ExitCase.Error(err) => F.delay(failSpan(span, err, takeSamplingDecision)) + case ExitCase.Canceled => F.delay(finishSpan(span.tag("cancel", value = true), takeSamplingDecision)) + } + } + } + + private def buildSpan[F[_]](name: String, tags: Map[String, Any])(implicit F: Sync[F]): F[Span] = + F.delay( + Kamon + .serverSpanBuilder(name, "cats-effect") + .asChildOf(Kamon.currentSpan()) + .tagMetrics(TagSet.from(tags)) + .start() + ) + + private def finishSpan(span: Span, takeSamplingDecision: Boolean): Span = { + if (takeSamplingDecision) span.takeSamplingDecision() + span.finish() + span + } + + private def failSpan(span: Span, err: Throwable, takeSamplingDecision: Boolean): Span = { + if (err.getMessage == null) span.fail(err) + else span.fail(err.getMessage, err) + + finishSpan(span, takeSamplingDecision) + } + + object Implicits { + + final class KamonOps[F[_] : Sync, A](fa: F[A]) { + /** + * Wraps the effect in a new span with the provided name and tags. The created span is marked as finished after + * the effect is completed or cancelled. + * + * @param name the span name + * @param tags the collection of tags to apply to the span + * @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far + * @return the same effect wrapped within a named span + */ + def named(name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true): F[A] = + operationName(name, tags, takeSamplingDecision)(fa) + } + + implicit final def kamonTracingSyntax[F[_] : Sync, A](fa: F[A]): KamonOps[F, A] = new KamonOps(fa) + } + +} \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala new file mode 100644 index 000000000..ae9ca8e18 --- /dev/null +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala @@ -0,0 +1,89 @@ +package kamon.instrumentation.cats.io + +import java.util.concurrent.Executors + +import cats.effect.{Async, ContextShift, Effect, LiftIO, Timer} +import cats.implicits._ +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.cats.io.Tracing.Implicits._ +import kamon.tag.Lookups.plain +import kamon.testkit.TestSpanReporter +import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures} +import org.scalatest.{BeforeAndAfterAll, Inspectors, Matchers, WordSpec} + +import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContext.global +import scala.concurrent.duration.Duration + +// NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there +// is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the +// kamon-executors module should take care of all non-JDK Runnable/Callable implementations. +abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: String)(implicit F: Effect[F]) + extends WordSpec + with ScalaFutures + with Matchers + with PatienceConfiguration + with TestSpanReporter + with Inspectors + with BeforeAndAfterAll { + + implicit def contextShift: ContextShift[F] + + implicit def timer: Timer[F] + + private val customExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) + + override protected def afterAll(): Unit = { + customExecutionContext.shutdown() + super.afterAll() + } + + s"A Cats Effect $effectName" should { + "capture the active span available when created" which { + "must be available across asynchronous boundaries" in { + val context = Context.of("key", "value") + + val contextTagF: F[String] = + for { + scope <- F.delay(Kamon.storeContext(context)) + _ <- Async.shift[F](customExecutionContext) + len <- F.delay("Hello Kamon!").map(_.length) + _ <- F.pure(len.toString) + _ <- timer.sleep(Duration.Zero) + _ <- Async.shift[F](global) + tagValue <- F.delay(Kamon.currentContext().getTag(plain("key"))) + _ <- F.delay(scope.close()) + } yield tagValue + + val contextTag = F.toIO(contextTagF).unsafeRunSync() + contextTag shouldEqual "value" + } + } + + "nest spans correctly" in { + // the test expects the following span tree, but for some reason, it doesn't work: + // - root + // - 1 (value = 1) + // - 2 (value = 2) + // - 3 (value = 3) + val rootSpan = for { + root <- F.delay(Kamon.spanBuilder("root").start()) + _ <- (1L to 3L) + .toList + .map { idx => + F.delay(idx).named(idx.toString, Map("value" -> idx)) + }.sequence + _ <- F.delay(root.finish()) + } yield root + + val root = F.toIO(rootSpan).unsafeRunSync() + + val spans = testSpanReporter().spans() + forAll(spans) { span => + span.parentId.string shouldEqual root.id.string + } + } + } + +} diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala new file mode 100644 index 000000000..6abd9f9f3 --- /dev/null +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala @@ -0,0 +1,12 @@ +package kamon.instrumentation.cats.io + +import cats.effect.{ContextShift, IO, Timer} + +import scala.concurrent.ExecutionContext.global + +class CatsIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[IO]("IO") { + + override implicit def contextShift: ContextShift[IO] = IO.contextShift(global) + + override implicit def timer: Timer[IO] = IO.timer(global) +} \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala deleted file mode 100644 index c36902ffe..000000000 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ /dev/null @@ -1,59 +0,0 @@ -package kamon.instrumentation.futures.cats - -import java.util.concurrent.Executors - -import cats.effect.{ContextShift, IO} -import kamon.Kamon -import kamon.tag.Lookups.plain -import kamon.context.Context -import org.scalatest.{Matchers, OptionValues, WordSpec} -import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} - -import scala.concurrent.ExecutionContext.global -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ - -class CatsIoInstrumentationSpec extends WordSpec with ScalaFutures with Matchers with PatienceConfiguration - with OptionValues with Eventually { - - // NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there - // is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the - // kamon-executors module should take care of all non-JDK Runnable/Callable implementations. - - "an cats.effect IO created when instrumentation is active" should { - "capture the active span available when created" which { - "must be available across asynchronous boundaries" in { - implicit val ctxShift: ContextShift[IO] = IO.contextShift(global) - val anotherExecutionContext: ExecutionContext = - ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) - val context = Context.of("key", "value") - - implicit val timer = IO.timer(global) - - val contextTagAfterTransformations = - for { - scope <- IO { - Kamon.storeContext(context) - } - len <- IO("Hello Kamon!").map(_.length) - _ <- IO(len.toString) - _ <- IO.shift(global) - _ <- IO.shift - _ <- IO.sleep(Duration.Zero) - _ <- IO.shift(anotherExecutionContext) - } yield { - val tagValue = Kamon.currentContext().getTag(plain("key")) - scope.close() - tagValue - } - - val contextTagFuture = contextTagAfterTransformations.unsafeToFuture() - - - eventually(timeout(10 seconds)) { - contextTagFuture.value.get.get shouldBe "value" - } - } - } - } -} \ No newline at end of file diff --git a/instrumentation/kamon-monix/src/main/resources/reference.conf b/instrumentation/kamon-monix/src/main/resources/reference.conf new file mode 100644 index 000000000..6b8433de2 --- /dev/null +++ b/instrumentation/kamon-monix/src/main/resources/reference.conf @@ -0,0 +1,11 @@ +############################################# +# Kamon Monix Reference Configuration # +############################################# + +kanela.modules { + executor-service { + within += "monix.eval..*" + within += "monix.execution..*" + within += "monix.bio..*" + } +} \ No newline at end of file diff --git a/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala new file mode 100644 index 000000000..e3ae3c31c --- /dev/null +++ b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala @@ -0,0 +1,13 @@ +package kamon.instrumentation.monix + +import cats.effect.{ContextShift, Timer} +import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec +import monix.bio.{IO, Task} +import monix.execution.Scheduler.Implicits.global + +class MonixBIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Bifunctor IO") { + + override implicit def contextShift: ContextShift[Task] = IO.contextShift + + override implicit def timer: Timer[Task] = IO.timer +} \ No newline at end of file diff --git a/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala new file mode 100644 index 000000000..459911897 --- /dev/null +++ b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala @@ -0,0 +1,13 @@ +package kamon.instrumentation.monix + +import cats.effect.{ContextShift, Timer} +import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global + +class MonixInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Task") { + + override implicit def contextShift: ContextShift[Task] = Task.contextShift + + override implicit def timer: Timer[Task] = Task.timer +} \ No newline at end of file From 5c67e8675b84cc837cecac53be7789971e40f564 Mon Sep 17 00:00:00 2001 From: Bogdan Roman Date: Tue, 20 Oct 2020 19:14:21 +0200 Subject: [PATCH 2/4] Skip 2.11 build for kamon-monix --- build.sbt | 2 +- .../cats/io/AbstractCatsEffectInstrumentationSpec.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 93eb37bcc..8ccc83d80 100644 --- a/build.sbt +++ b/build.sbt @@ -240,7 +240,7 @@ lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix")) .enablePlugins(JavaAgent) .settings(instrumentationSettings) .settings( - bintrayPackage := "kamon-futures", + crossScalaVersions := crossScalaVersions.value.filter(!_.startsWith("2.11")), libraryDependencies ++= Seq( kanelaAgent % "provided", "io.monix" %% "monix-eval" % "3.2.2" % "provided", diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala index ae9ca8e18..07914aecf 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala @@ -36,6 +36,7 @@ abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: S override protected def afterAll(): Unit = { customExecutionContext.shutdown() + shutdownTestSpanReporter() super.afterAll() } From 9349835d1aafd0bc5dfe2ea8f40ecb16c03c5e74 Mon Sep 17 00:00:00 2001 From: Bogdan Roman Date: Wed, 21 Oct 2020 13:59:54 +0200 Subject: [PATCH 3/4] Attempt to correct skiping cross-build for 2.11 --- build.sbt | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 8ccc83d80..d83d49ae4 100644 --- a/build.sbt +++ b/build.sbt @@ -240,15 +240,21 @@ lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix")) .enablePlugins(JavaAgent) .settings(instrumentationSettings) .settings( + // do not build for 2.11 crossScalaVersions := crossScalaVersions.value.filter(!_.startsWith("2.11")), libraryDependencies ++= Seq( kanelaAgent % "provided", - "io.monix" %% "monix-eval" % "3.2.2" % "provided", - "io.monix" %% "monix-bio" % "1.0.0" % "provided", scalatest % "test", logbackClassic % "test" - ) - ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test") + ) ++ { + // dependencies must be added only for scala versions strictly above 2.11, otherwise the resolution will be + // attempted for 2.11 and fail + if (scalaBinaryVersion.value == "2.11") Nil else Seq( + "io.monix" %% "monix-eval" % "3.2.2" % "provided", + "io.monix" %% "monix-bio" % "1.0.0" % "provided") + } + ) + .dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test") lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback")) From 4196984d8f4aa232e679fecd74b8ecff34e8e024 Mon Sep 17 00:00:00 2001 From: Bogdan Roman Date: Thu, 26 Nov 2020 19:27:21 +0100 Subject: [PATCH 4/4] Adjusted tests and bumped dependencies --- build.sbt | 4 +- .../instrumentation/cats/io/Tracing.scala | 17 +++++-- ...bstractCatsEffectInstrumentationSpec.scala | 51 ++++++++++++++----- 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/build.sbt b/build.sbt index 02f92b0ef..3764e9cbf 100644 --- a/build.sbt +++ b/build.sbt @@ -250,8 +250,8 @@ lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix")) // dependencies must be added only for scala versions strictly above 2.11, otherwise the resolution will be // attempted for 2.11 and fail if (scalaBinaryVersion.value == "2.11") Nil else Seq( - "io.monix" %% "monix-eval" % "3.2.2" % "provided", - "io.monix" %% "monix-bio" % "1.0.0" % "provided") + "io.monix" %% "monix-eval" % "3.3.0" % "provided", + "io.monix" %% "monix-bio" % "1.1.0" % "provided") } ) .dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test") diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala index 98ccf1100..57d8da3aa 100644 --- a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala @@ -23,10 +23,21 @@ object Tracing { def operationName[F[_] : Sync, A](name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true)(fa: F[A]): F[A] = { val F = implicitly[Sync[F]] buildSpan(name, tags).flatMap { span => + val ctx = Kamon.currentContext() + val scope = Kamon.storeContext(ctx.withEntry(Span.Key, span)) F.guaranteeCase(fa) { - case ExitCase.Completed => F.delay(finishSpan(span, takeSamplingDecision)) - case ExitCase.Error(err) => F.delay(failSpan(span, err, takeSamplingDecision)) - case ExitCase.Canceled => F.delay(finishSpan(span.tag("cancel", value = true), takeSamplingDecision)) + case ExitCase.Completed => F.delay { + finishSpan(span, takeSamplingDecision) + scope.close() + } + case ExitCase.Error(err) => F.delay { + failSpan(span, err, takeSamplingDecision) + scope.close() + } + case ExitCase.Canceled => F.delay { + finishSpan(span.tag("cancel", value = true), takeSamplingDecision) + scope.close() + } } } } diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala index 07914aecf..6a85e190e 100644 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala @@ -9,8 +9,9 @@ import kamon.context.Context import kamon.instrumentation.cats.io.Tracing.Implicits._ import kamon.tag.Lookups.plain import kamon.testkit.TestSpanReporter -import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures} -import org.scalatest.{BeforeAndAfterAll, Inspectors, Matchers, WordSpec} +import kamon.trace.Span +import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Inspectors, Matchers, WordSpec} import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext.global @@ -26,7 +27,9 @@ abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: S with PatienceConfiguration with TestSpanReporter with Inspectors - with BeforeAndAfterAll { + with Eventually + with BeforeAndAfterAll + with BeforeAndAfter { implicit def contextShift: ContextShift[F] @@ -40,6 +43,16 @@ abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: S super.afterAll() } + before { + Kamon.storeContext(Context.Empty) + testSpanReporter().clear() + } + + after { + Kamon.storeContext(Context.Empty) + testSpanReporter().clear() + } + s"A Cats Effect $effectName" should { "capture the active span available when created" which { "must be available across asynchronous boundaries" in { @@ -69,22 +82,34 @@ abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: S // - 2 (value = 2) // - 3 (value = 3) val rootSpan = for { - root <- F.delay(Kamon.spanBuilder("root").start()) - _ <- (1L to 3L) - .toList - .map { idx => - F.delay(idx).named(idx.toString, Map("value" -> idx)) - }.sequence - _ <- F.delay(root.finish()) + rootAndScope <- F.delay { + val span = Kamon.spanBuilder("root").start() + val ctx = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key, span)) + (span, ctx) + } + (root, scope) = rootAndScope + _ <- (1L to 3L) + .toList + .traverse { idx => + F.delay(idx).named(idx.toString, Map("value" -> idx)) + } + _ <- F.delay { + root.finish() + scope.close() + } } yield root val root = F.toIO(rootSpan).unsafeRunSync() - val spans = testSpanReporter().spans() - forAll(spans) { span => + eventually { + testSpanReporter().spans().size shouldEqual 4 + testSpanReporter().spans().map(_.operationName).toSet shouldEqual Set("root", "1", "2", "3") + } + + val childrenSpans = testSpanReporter().spans().filter(_.id.string != root.id.string) + forAll(childrenSpans) { span => span.parentId.string shouldEqual root.id.string } } } - }