Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+ Monix Task and BIO support and Cats IO improvements #879

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ lazy val instrumentation = (project in file("instrumentation"))
`kamon-twitter-future`,
`kamon-scalaz-future`,
`kamon-cats-io`,
`kamon-monix`,
`kamon-logback`,
`kamon-jdbc`,
`kamon-kafka`,
Expand Down Expand Up @@ -234,6 +235,27 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io"))

).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
// do not build for 2.11
crossScalaVersions := crossScalaVersions.value.filter(!_.startsWith("2.11")),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
scalatest % "test",
logbackClassic % "test"
) ++ {
// dependencies must be added only for scala versions strictly above 2.11, otherwise the resolution will be
// attempted for 2.11 and fail
if (scalaBinaryVersion.value == "2.11") Nil else Seq(
"io.monix" %% "monix-eval" % "3.3.0" % "provided",
"io.monix" %% "monix-bio" % "1.1.0" % "provided")
}
)
.dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test")


lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback"))
.disablePlugins(AssemblyPlugin)
Expand Down Expand Up @@ -644,6 +666,7 @@ val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
`kamon-twitter-future` % "shaded",
`kamon-scalaz-future` % "shaded",
`kamon-cats-io` % "shaded",
`kamon-monix` % "shaded",
`kamon-logback` % "shaded",
`kamon-jdbc` % "shaded",
`kamon-kafka` % "shaded",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

kanela.modules {
executor-service {
within += "cats.effect.internals.IOShift\\$Tick"
within += "cats.effect.internals.IOTimer\\$ShiftTick"
within += "cats.effect.internals..*"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package kamon.instrumentation.cats.io

import cats.effect.{ExitCase, Sync}
import cats.implicits._
import kamon.Kamon
import kamon.tag.TagSet
import kamon.trace.Span

object Tracing {

/**
* Wraps the effect `fa` in a new span with the provided name and tags. The created span is marked as finished after
* the effect is completed or cancelled.
*
* @param name the span name
* @param tags the collection of tags to apply to the span
* @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far
* @param fa the effect to execute
* @tparam F the effect type
* @tparam A the value produced in the effect F
* @return the same effect wrapped within a named span
*/
def operationName[F[_] : Sync, A](name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true)(fa: F[A]): F[A] = {
val F = implicitly[Sync[F]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we take (implicit F: Sync[F]) like buildSpan?

buildSpan(name, tags).flatMap { span =>
val ctx = Kamon.currentContext()
val scope = Kamon.storeContext(ctx.withEntry(Span.Key, span))
F.guaranteeCase(fa) {
case ExitCase.Completed => F.delay {
finishSpan(span, takeSamplingDecision)
scope.close()
}
case ExitCase.Error(err) => F.delay {
failSpan(span, err, takeSamplingDecision)
scope.close()
}
case ExitCase.Canceled => F.delay {
finishSpan(span.tag("cancel", value = true), takeSamplingDecision)
scope.close()
}
}
}
}

private def buildSpan[F[_]](name: String, tags: Map[String, Any])(implicit F: Sync[F]): F[Span] =
F.delay(
Kamon
.serverSpanBuilder(name, "cats-effect")
.asChildOf(Kamon.currentSpan())
.tagMetrics(TagSet.from(tags))
.start()
)

private def finishSpan(span: Span, takeSamplingDecision: Boolean): Span = {
if (takeSamplingDecision) span.takeSamplingDecision()
span.finish()
span
}

private def failSpan(span: Span, err: Throwable, takeSamplingDecision: Boolean): Span = {
if (err.getMessage == null) span.fail(err)
else span.fail(err.getMessage, err)

finishSpan(span, takeSamplingDecision)
}

object Implicits {

final class KamonOps[F[_] : Sync, A](fa: F[A]) {
/**
* Wraps the effect in a new span with the provided name and tags. The created span is marked as finished after
* the effect is completed or cancelled.
*
* @param name the span name
* @param tags the collection of tags to apply to the span
* @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far
* @return the same effect wrapped within a named span
*/
def named(name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true): F[A] =
operationName(name, tags, takeSamplingDecision)(fa)
}

implicit final def kamonTracingSyntax[F[_] : Sync, A](fa: F[A]): KamonOps[F, A] = new KamonOps(fa)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package kamon.instrumentation.cats.io

import java.util.concurrent.Executors

import cats.effect.{Async, ContextShift, Effect, LiftIO, Timer}
import cats.implicits._
import kamon.Kamon
import kamon.context.Context
import kamon.instrumentation.cats.io.Tracing.Implicits._
import kamon.tag.Lookups.plain
import kamon.testkit.TestSpanReporter
import kamon.trace.Span
import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Inspectors, Matchers, WordSpec}

import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration.Duration

// NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there
// is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the
// kamon-executors module should take care of all non-JDK Runnable/Callable implementations.
abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: String)(implicit F: Effect[F])
extends WordSpec
with ScalaFutures
with Matchers
with PatienceConfiguration
with TestSpanReporter
with Inspectors
with Eventually
with BeforeAndAfterAll
with BeforeAndAfter {

implicit def contextShift: ContextShift[F]

implicit def timer: Timer[F]

private val customExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with a fixed ThreadPool to ensure the test are running with multiple threads


override protected def afterAll(): Unit = {
customExecutionContext.shutdown()
shutdownTestSpanReporter()
super.afterAll()
}

before {
Kamon.storeContext(Context.Empty)
testSpanReporter().clear()
}

after {
Kamon.storeContext(Context.Empty)
testSpanReporter().clear()
}

s"A Cats Effect $effectName" should {
"capture the active span available when created" which {
"must be available across asynchronous boundaries" in {
val context = Context.of("key", "value")

val contextTagF: F[String] =
for {
scope <- F.delay(Kamon.storeContext(context))
_ <- Async.shift[F](customExecutionContext)
len <- F.delay("Hello Kamon!").map(_.length)
_ <- F.pure(len.toString)
_ <- timer.sleep(Duration.Zero)
_ <- Async.shift[F](global)
tagValue <- F.delay(Kamon.currentContext().getTag(plain("key")))
_ <- F.delay(scope.close())
} yield tagValue

val contextTag = F.toIO(contextTagF).unsafeRunSync()
contextTag shouldEqual "value"
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should test cancellation and errors?

Would be good to run multiple IOs in parallel (with shift inserted in between) to ensure everything works.


"nest spans correctly" in {
// the test expects the following span tree, but for some reason, it doesn't work:
// - root
// - 1 (value = 1)
// - 2 (value = 2)
// - 3 (value = 3)
val rootSpan = for {
rootAndScope <- F.delay {
val span = Kamon.spanBuilder("root").start()
val ctx = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key, span))
(span, ctx)
}
(root, scope) = rootAndScope
_ <- (1L to 3L)
.toList
.traverse { idx =>
F.delay(idx).named(idx.toString, Map("value" -> idx))
}
_ <- F.delay {
root.finish()
scope.close()
}
} yield root

val root = F.toIO(rootSpan).unsafeRunSync()

eventually {
testSpanReporter().spans().size shouldEqual 4
testSpanReporter().spans().map(_.operationName).toSet shouldEqual Set("root", "1", "2", "3")
}

val childrenSpans = testSpanReporter().spans().filter(_.id.string != root.id.string)
forAll(childrenSpans) { span =>
span.parentId.string shouldEqual root.id.string
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kamon.instrumentation.cats.io

import cats.effect.{ContextShift, IO, Timer}

import scala.concurrent.ExecutionContext.global

class CatsIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[IO]("IO") {

override implicit def contextShift: ContextShift[IO] = IO.contextShift(global)

override implicit def timer: Timer[IO] = IO.timer(global)
}

This file was deleted.

11 changes: 11 additions & 0 deletions instrumentation/kamon-monix/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#############################################
# Kamon Monix Reference Configuration #
#############################################

kanela.modules {
executor-service {
within += "monix.eval..*"
within += "monix.execution..*"
within += "monix.bio..*"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kamon.instrumentation.monix

import cats.effect.{ContextShift, Timer}
import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec
import monix.bio.{IO, Task}
import monix.execution.Scheduler.Implicits.global

class MonixBIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Bifunctor IO") {

override implicit def contextShift: ContextShift[Task] = IO.contextShift

override implicit def timer: Timer[Task] = IO.timer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kamon.instrumentation.monix

import cats.effect.{ContextShift, Timer}
import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

class MonixInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Task") {

override implicit def contextShift: ContextShift[Task] = Task.contextShift

override implicit def timer: Timer[Task] = Task.timer
}