diff --git a/build.sbt b/build.sbt index d710ad693..3764e9cbf 100644 --- a/build.sbt +++ b/build.sbt @@ -117,6 +117,7 @@ lazy val instrumentation = (project in file("instrumentation")) `kamon-twitter-future`, `kamon-scalaz-future`, `kamon-cats-io`, + `kamon-monix`, `kamon-logback`, `kamon-jdbc`, `kamon-kafka`, @@ -234,6 +235,27 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io")) ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") +lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + // do not build for 2.11 + crossScalaVersions := crossScalaVersions.value.filter(!_.startsWith("2.11")), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + scalatest % "test", + logbackClassic % "test" + ) ++ { + // dependencies must be added only for scala versions strictly above 2.11, otherwise the resolution will be + // attempted for 2.11 and fail + if (scalaBinaryVersion.value == "2.11") Nil else Seq( + "io.monix" %% "monix-eval" % "3.3.0" % "provided", + "io.monix" %% "monix-bio" % "1.1.0" % "provided") + } + ) + .dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test") + lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback")) .disablePlugins(AssemblyPlugin) @@ -644,6 +666,7 @@ val `kamon-bundle` = (project in file("bundle/kamon-bundle")) `kamon-twitter-future` % "shaded", `kamon-scalaz-future` % "shaded", `kamon-cats-io` % "shaded", + `kamon-monix` % "shaded", `kamon-logback` % "shaded", `kamon-jdbc` % "shaded", `kamon-kafka` % "shaded", diff --git a/instrumentation/kamon-cats-io/src/main/resources/reference.conf b/instrumentation/kamon-cats-io/src/main/resources/reference.conf index 9d31317c2..cee6d050b 100644 --- a/instrumentation/kamon-cats-io/src/main/resources/reference.conf +++ b/instrumentation/kamon-cats-io/src/main/resources/reference.conf @@ -4,7 +4,6 @@ kanela.modules { executor-service { - within += "cats.effect.internals.IOShift\\$Tick" - within += "cats.effect.internals.IOTimer\\$ShiftTick" + within += "cats.effect.internals..*" } } \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala new file mode 100644 index 000000000..57d8da3aa --- /dev/null +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/io/Tracing.scala @@ -0,0 +1,86 @@ +package kamon.instrumentation.cats.io + +import cats.effect.{ExitCase, Sync} +import cats.implicits._ +import kamon.Kamon +import kamon.tag.TagSet +import kamon.trace.Span + +object Tracing { + + /** + * Wraps the effect `fa` in a new span with the provided name and tags. The created span is marked as finished after + * the effect is completed or cancelled. + * + * @param name the span name + * @param tags the collection of tags to apply to the span + * @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far + * @param fa the effect to execute + * @tparam F the effect type + * @tparam A the value produced in the effect F + * @return the same effect wrapped within a named span + */ + def operationName[F[_] : Sync, A](name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true)(fa: F[A]): F[A] = { + val F = implicitly[Sync[F]] + buildSpan(name, tags).flatMap { span => + val ctx = Kamon.currentContext() + val scope = Kamon.storeContext(ctx.withEntry(Span.Key, span)) + F.guaranteeCase(fa) { + case ExitCase.Completed => F.delay { + finishSpan(span, takeSamplingDecision) + scope.close() + } + case ExitCase.Error(err) => F.delay { + failSpan(span, err, takeSamplingDecision) + scope.close() + } + case ExitCase.Canceled => F.delay { + finishSpan(span.tag("cancel", value = true), takeSamplingDecision) + scope.close() + } + } + } + } + + private def buildSpan[F[_]](name: String, tags: Map[String, Any])(implicit F: Sync[F]): F[Span] = + F.delay( + Kamon + .serverSpanBuilder(name, "cats-effect") + .asChildOf(Kamon.currentSpan()) + .tagMetrics(TagSet.from(tags)) + .start() + ) + + private def finishSpan(span: Span, takeSamplingDecision: Boolean): Span = { + if (takeSamplingDecision) span.takeSamplingDecision() + span.finish() + span + } + + private def failSpan(span: Span, err: Throwable, takeSamplingDecision: Boolean): Span = { + if (err.getMessage == null) span.fail(err) + else span.fail(err.getMessage, err) + + finishSpan(span, takeSamplingDecision) + } + + object Implicits { + + final class KamonOps[F[_] : Sync, A](fa: F[A]) { + /** + * Wraps the effect in a new span with the provided name and tags. The created span is marked as finished after + * the effect is completed or cancelled. + * + * @param name the span name + * @param tags the collection of tags to apply to the span + * @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far + * @return the same effect wrapped within a named span + */ + def named(name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true): F[A] = + operationName(name, tags, takeSamplingDecision)(fa) + } + + implicit final def kamonTracingSyntax[F[_] : Sync, A](fa: F[A]): KamonOps[F, A] = new KamonOps(fa) + } + +} \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala new file mode 100644 index 000000000..6a85e190e --- /dev/null +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/AbstractCatsEffectInstrumentationSpec.scala @@ -0,0 +1,115 @@ +package kamon.instrumentation.cats.io + +import java.util.concurrent.Executors + +import cats.effect.{Async, ContextShift, Effect, LiftIO, Timer} +import cats.implicits._ +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.cats.io.Tracing.Implicits._ +import kamon.tag.Lookups.plain +import kamon.testkit.TestSpanReporter +import kamon.trace.Span +import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Inspectors, Matchers, WordSpec} + +import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContext.global +import scala.concurrent.duration.Duration + +// NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there +// is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the +// kamon-executors module should take care of all non-JDK Runnable/Callable implementations. +abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: String)(implicit F: Effect[F]) + extends WordSpec + with ScalaFutures + with Matchers + with PatienceConfiguration + with TestSpanReporter + with Inspectors + with Eventually + with BeforeAndAfterAll + with BeforeAndAfter { + + implicit def contextShift: ContextShift[F] + + implicit def timer: Timer[F] + + private val customExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) + + override protected def afterAll(): Unit = { + customExecutionContext.shutdown() + shutdownTestSpanReporter() + super.afterAll() + } + + before { + Kamon.storeContext(Context.Empty) + testSpanReporter().clear() + } + + after { + Kamon.storeContext(Context.Empty) + testSpanReporter().clear() + } + + s"A Cats Effect $effectName" should { + "capture the active span available when created" which { + "must be available across asynchronous boundaries" in { + val context = Context.of("key", "value") + + val contextTagF: F[String] = + for { + scope <- F.delay(Kamon.storeContext(context)) + _ <- Async.shift[F](customExecutionContext) + len <- F.delay("Hello Kamon!").map(_.length) + _ <- F.pure(len.toString) + _ <- timer.sleep(Duration.Zero) + _ <- Async.shift[F](global) + tagValue <- F.delay(Kamon.currentContext().getTag(plain("key"))) + _ <- F.delay(scope.close()) + } yield tagValue + + val contextTag = F.toIO(contextTagF).unsafeRunSync() + contextTag shouldEqual "value" + } + } + + "nest spans correctly" in { + // the test expects the following span tree, but for some reason, it doesn't work: + // - root + // - 1 (value = 1) + // - 2 (value = 2) + // - 3 (value = 3) + val rootSpan = for { + rootAndScope <- F.delay { + val span = Kamon.spanBuilder("root").start() + val ctx = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key, span)) + (span, ctx) + } + (root, scope) = rootAndScope + _ <- (1L to 3L) + .toList + .traverse { idx => + F.delay(idx).named(idx.toString, Map("value" -> idx)) + } + _ <- F.delay { + root.finish() + scope.close() + } + } yield root + + val root = F.toIO(rootSpan).unsafeRunSync() + + eventually { + testSpanReporter().spans().size shouldEqual 4 + testSpanReporter().spans().map(_.operationName).toSet shouldEqual Set("root", "1", "2", "3") + } + + val childrenSpans = testSpanReporter().spans().filter(_.id.string != root.id.string) + forAll(childrenSpans) { span => + span.parentId.string shouldEqual root.id.string + } + } + } +} diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala new file mode 100644 index 000000000..6abd9f9f3 --- /dev/null +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/cats/io/CatsIOInstrumentationSpec.scala @@ -0,0 +1,12 @@ +package kamon.instrumentation.cats.io + +import cats.effect.{ContextShift, IO, Timer} + +import scala.concurrent.ExecutionContext.global + +class CatsIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[IO]("IO") { + + override implicit def contextShift: ContextShift[IO] = IO.contextShift(global) + + override implicit def timer: Timer[IO] = IO.timer(global) +} \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala deleted file mode 100644 index c36902ffe..000000000 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ /dev/null @@ -1,59 +0,0 @@ -package kamon.instrumentation.futures.cats - -import java.util.concurrent.Executors - -import cats.effect.{ContextShift, IO} -import kamon.Kamon -import kamon.tag.Lookups.plain -import kamon.context.Context -import org.scalatest.{Matchers, OptionValues, WordSpec} -import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} - -import scala.concurrent.ExecutionContext.global -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ - -class CatsIoInstrumentationSpec extends WordSpec with ScalaFutures with Matchers with PatienceConfiguration - with OptionValues with Eventually { - - // NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there - // is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the - // kamon-executors module should take care of all non-JDK Runnable/Callable implementations. - - "an cats.effect IO created when instrumentation is active" should { - "capture the active span available when created" which { - "must be available across asynchronous boundaries" in { - implicit val ctxShift: ContextShift[IO] = IO.contextShift(global) - val anotherExecutionContext: ExecutionContext = - ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) - val context = Context.of("key", "value") - - implicit val timer = IO.timer(global) - - val contextTagAfterTransformations = - for { - scope <- IO { - Kamon.storeContext(context) - } - len <- IO("Hello Kamon!").map(_.length) - _ <- IO(len.toString) - _ <- IO.shift(global) - _ <- IO.shift - _ <- IO.sleep(Duration.Zero) - _ <- IO.shift(anotherExecutionContext) - } yield { - val tagValue = Kamon.currentContext().getTag(plain("key")) - scope.close() - tagValue - } - - val contextTagFuture = contextTagAfterTransformations.unsafeToFuture() - - - eventually(timeout(10 seconds)) { - contextTagFuture.value.get.get shouldBe "value" - } - } - } - } -} \ No newline at end of file diff --git a/instrumentation/kamon-monix/src/main/resources/reference.conf b/instrumentation/kamon-monix/src/main/resources/reference.conf new file mode 100644 index 000000000..6b8433de2 --- /dev/null +++ b/instrumentation/kamon-monix/src/main/resources/reference.conf @@ -0,0 +1,11 @@ +############################################# +# Kamon Monix Reference Configuration # +############################################# + +kanela.modules { + executor-service { + within += "monix.eval..*" + within += "monix.execution..*" + within += "monix.bio..*" + } +} \ No newline at end of file diff --git a/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala new file mode 100644 index 000000000..e3ae3c31c --- /dev/null +++ b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixBIOInstrumentationSpec.scala @@ -0,0 +1,13 @@ +package kamon.instrumentation.monix + +import cats.effect.{ContextShift, Timer} +import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec +import monix.bio.{IO, Task} +import monix.execution.Scheduler.Implicits.global + +class MonixBIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Bifunctor IO") { + + override implicit def contextShift: ContextShift[Task] = IO.contextShift + + override implicit def timer: Timer[Task] = IO.timer +} \ No newline at end of file diff --git a/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala new file mode 100644 index 000000000..459911897 --- /dev/null +++ b/instrumentation/kamon-monix/src/test/scala/kamon/instrumentation/monix/MonixInstrumentationSpec.scala @@ -0,0 +1,13 @@ +package kamon.instrumentation.monix + +import cats.effect.{ContextShift, Timer} +import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global + +class MonixInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Task") { + + override implicit def contextShift: ContextShift[Task] = Task.contextShift + + override implicit def timer: Timer[Task] = Task.timer +} \ No newline at end of file