Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+ Monix Task and BIO support and Cats IO improvements #879

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ lazy val instrumentation = (project in file("instrumentation"))
`kamon-twitter-future`,
`kamon-scalaz-future`,
`kamon-cats-io`,
`kamon-monix`,
`kamon-logback`,
`kamon-jdbc`,
`kamon-kafka`,
Expand Down Expand Up @@ -234,6 +235,27 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io"))

).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

lazy val `kamon-monix` = (project in file("instrumentation/kamon-monix"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
// do not build for 2.11
crossScalaVersions := crossScalaVersions.value.filter(!_.startsWith("2.11")),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
scalatest % "test",
logbackClassic % "test"
) ++ {
// dependencies must be added only for scala versions strictly above 2.11, otherwise the resolution will be
// attempted for 2.11 and fail
if (scalaBinaryVersion.value == "2.11") Nil else Seq(
"io.monix" %% "monix-eval" % "3.2.2" % "provided",
"io.monix" %% "monix-bio" % "1.0.0" % "provided")
}
)
.dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test", `kamon-cats-io` % "compile->compile;test->test")


lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback"))
.disablePlugins(AssemblyPlugin)
Expand Down Expand Up @@ -641,6 +663,7 @@ val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
`kamon-twitter-future` % "shaded",
`kamon-scalaz-future` % "shaded",
`kamon-cats-io` % "shaded",
`kamon-monix` % "shaded",
`kamon-logback` % "shaded",
`kamon-jdbc` % "shaded",
`kamon-kafka` % "shaded",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

kanela.modules {
executor-service {
within += "cats.effect.internals.IOShift\\$Tick"
within += "cats.effect.internals.IOTimer\\$ShiftTick"
within += "cats.effect.internals..*"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package kamon.instrumentation.cats.io

import cats.effect.{ExitCase, Sync}
import cats.implicits._
import kamon.Kamon
import kamon.tag.TagSet
import kamon.trace.Span

object Tracing {

/**
* Wraps the effect `fa` in a new span with the provided name and tags. The created span is marked as finished after
* the effect is completed or cancelled.
*
* @param name the span name
* @param tags the collection of tags to apply to the span
* @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far
* @param fa the effect to execute
* @tparam F the effect type
* @tparam A the value produced in the effect F
* @return the same effect wrapped within a named span
*/
def operationName[F[_] : Sync, A](name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true)(fa: F[A]): F[A] = {
val F = implicitly[Sync[F]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we take (implicit F: Sync[F]) like buildSpan?

buildSpan(name, tags).flatMap { span =>
F.guaranteeCase(fa) {
case ExitCase.Completed => F.delay(finishSpan(span, takeSamplingDecision))
case ExitCase.Error(err) => F.delay(failSpan(span, err, takeSamplingDecision))
case ExitCase.Canceled => F.delay(finishSpan(span.tag("cancel", value = true), takeSamplingDecision))
}
}
}

private def buildSpan[F[_]](name: String, tags: Map[String, Any])(implicit F: Sync[F]): F[Span] =
F.delay(
Kamon
.serverSpanBuilder(name, "cats-effect")
.asChildOf(Kamon.currentSpan())
.tagMetrics(TagSet.from(tags))
.start()
)

private def finishSpan(span: Span, takeSamplingDecision: Boolean): Span = {
if (takeSamplingDecision) span.takeSamplingDecision()
span.finish()
span
}

private def failSpan(span: Span, err: Throwable, takeSamplingDecision: Boolean): Span = {
if (err.getMessage == null) span.fail(err)
else span.fail(err.getMessage, err)

finishSpan(span, takeSamplingDecision)
}

object Implicits {

final class KamonOps[F[_] : Sync, A](fa: F[A]) {
/**
* Wraps the effect in a new span with the provided name and tags. The created span is marked as finished after
* the effect is completed or cancelled.
*
* @param name the span name
* @param tags the collection of tags to apply to the span
* @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far
* @return the same effect wrapped within a named span
*/
def named(name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true): F[A] =
operationName(name, tags, takeSamplingDecision)(fa)
}

implicit final def kamonTracingSyntax[F[_] : Sync, A](fa: F[A]): KamonOps[F, A] = new KamonOps(fa)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package kamon.instrumentation.cats.io

import java.util.concurrent.Executors

import cats.effect.{Async, ContextShift, Effect, LiftIO, Timer}
import cats.implicits._
import kamon.Kamon
import kamon.context.Context
import kamon.instrumentation.cats.io.Tracing.Implicits._
import kamon.tag.Lookups.plain
import kamon.testkit.TestSpanReporter
import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures}
import org.scalatest.{BeforeAndAfterAll, Inspectors, Matchers, WordSpec}

import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration.Duration

// NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there
// is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the
// kamon-executors module should take care of all non-JDK Runnable/Callable implementations.
abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: String)(implicit F: Effect[F])
extends WordSpec
with ScalaFutures
with Matchers
with PatienceConfiguration
with TestSpanReporter
with Inspectors
with BeforeAndAfterAll {

implicit def contextShift: ContextShift[F]

implicit def timer: Timer[F]

private val customExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with a fixed ThreadPool to ensure the test are running with multiple threads


override protected def afterAll(): Unit = {
customExecutionContext.shutdown()
shutdownTestSpanReporter()
super.afterAll()
}

s"A Cats Effect $effectName" should {
"capture the active span available when created" which {
"must be available across asynchronous boundaries" in {
val context = Context.of("key", "value")

val contextTagF: F[String] =
for {
scope <- F.delay(Kamon.storeContext(context))
_ <- Async.shift[F](customExecutionContext)
len <- F.delay("Hello Kamon!").map(_.length)
_ <- F.pure(len.toString)
_ <- timer.sleep(Duration.Zero)
_ <- Async.shift[F](global)
tagValue <- F.delay(Kamon.currentContext().getTag(plain("key")))
_ <- F.delay(scope.close())
} yield tagValue

val contextTag = F.toIO(contextTagF).unsafeRunSync()
contextTag shouldEqual "value"
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should test cancellation and errors?

Would be good to run multiple IOs in parallel (with shift inserted in between) to ensure everything works.


"nest spans correctly" in {
// the test expects the following span tree, but for some reason, it doesn't work:
// - root
// - 1 (value = 1)
// - 2 (value = 2)
// - 3 (value = 3)
val rootSpan = for {
root <- F.delay(Kamon.spanBuilder("root").start())
_ <- (1L to 3L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the spans be nested in a tree?
I thinks this test just creates unrelated spans.
Maybe try creating the other spans as children of the first one, or using Kamon.runWithSpan

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the test all operations are sequenced as follows:

  • a root span is created and started without being finished
  • 3 additional spans that are started and finished, executed one after another (.map(...).sequence)
  • the root span is finished

The named function that is applied to the F.delay(idx) is actually a call to the operationName in the Tracing object that does the following:

  • creates and starts a new span with the provided name and tags that sets as parent span the result of Kamon.currentSpan()
  • executes the F[_] effect, which in this case is F.delay(idx)
  • finishes the span

Assuming the operation sequence defined above, the three effects F.delay(idx) should be wrapped in spans that use the root span as a parent, because that's the current span (the started and not finished).

I don't understand what part of this test is "iffy". :) ... but please let me know if I'm missing something.

The assertion at the end is obviously wrong because it also considers the root span for the equality check, but that's not the problem here as I've manually printed the span ids when they are created.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the kamon-scala-future module, it looks like it doesn't just create a span, but explicitly stores it in the context storage and removes it afterwards. I assumed that simply starting and finishing a span is sufficient. I'll try to do the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I've put down some println's and Kamon.currentSpan() give Span.Empty both in the span creation code and in the test code.
Adding something like Kamon.runWithSpan(root) to the test should correctly nest it in the test code.

Copy link
Contributor

@SimunKaracic SimunKaracic Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you figured it out! 🎉

Feel free to use me as a rubber duck, while I try and wrap my head around Monix code 😅

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was the problem, I needed to store the span in the context storage...
But now I've hit another issue where every now and again the test fails due to the fact that the fibers are interleaved. I think this is a more fundamental issue with my PR, I may have to use TaskLocal and IOLocal to get named spans working correctly. Still investigating.

.toList
.map { idx =>
F.delay(idx).named(idx.toString, Map("value" -> idx))
}.sequence
_ <- F.delay(root.finish())
} yield root

val root = F.toIO(rootSpan).unsafeRunSync()

val spans = testSpanReporter().spans()
forAll(spans) { span =>
span.parentId.string shouldEqual root.id.string
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kamon.instrumentation.cats.io

import cats.effect.{ContextShift, IO, Timer}

import scala.concurrent.ExecutionContext.global

class CatsIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[IO]("IO") {

override implicit def contextShift: ContextShift[IO] = IO.contextShift(global)

override implicit def timer: Timer[IO] = IO.timer(global)
}

This file was deleted.

11 changes: 11 additions & 0 deletions instrumentation/kamon-monix/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#############################################
# Kamon Monix Reference Configuration #
#############################################

kanela.modules {
executor-service {
within += "monix.eval..*"
within += "monix.execution..*"
within += "monix.bio..*"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kamon.instrumentation.monix

import cats.effect.{ContextShift, Timer}
import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec
import monix.bio.{IO, Task}
import monix.execution.Scheduler.Implicits.global

class MonixBIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Bifunctor IO") {

override implicit def contextShift: ContextShift[Task] = IO.contextShift

override implicit def timer: Timer[Task] = IO.timer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kamon.instrumentation.monix

import cats.effect.{ContextShift, Timer}
import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

class MonixInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Task") {

override implicit def contextShift: ContextShift[Task] = Task.contextShift

override implicit def timer: Timer[Task] = Task.timer
}