Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(alerts): add a regular job to detect anomalies #22762

Merged
merged 33 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eb4cb49
init
nikitaevg Jun 6, 2024
ba51409
initial version of the regular job
nikitaevg Jun 8, 2024
bb71f5a
small polishing
nikitaevg Jun 8, 2024
1d57843
Merge remote-tracking branch 'upstream/master' into 14331-regular-job
nikitaevg Jun 8, 2024
15e0b6b
small test fix
nikitaevg Jun 8, 2024
71c44d9
fix types in tests
nikitaevg Jun 8, 2024
5e59a5d
fix the crontab schedule to every hour
nikitaevg Jun 8, 2024
29e613b
add a newline to the template
nikitaevg Jun 8, 2024
b1a0219
add a test to check insight date range
nikitaevg Jun 11, 2024
0b2e57a
Merge remote-tracking branch 'upstream/master' into 14331-regular-job
nikitaevg Jun 11, 2024
29d9305
use the new display type naming
nikitaevg Jun 11, 2024
0cd1a01
Merge remote-tracking branch 'upstream/master' into 14331-regular-job
nikitaevg Jul 2, 2024
b8b4fec
address PR comments
nikitaevg Jul 2, 2024
3475c37
Merge branch 'master' into 14331-regular-job
webjunkie Jul 5, 2024
16227b9
Refactor things
webjunkie Jul 5, 2024
712d780
Fix scheduled task setup
webjunkie Jul 5, 2024
f179e37
Refactor more
webjunkie Jul 5, 2024
5cf4e46
Fix group setup
webjunkie Jul 9, 2024
d940441
address comments
nikitaevg Jul 12, 2024
d91bcc9
Merge remote-tracking branch 'upstream/master' into 14331-regular-job
nikitaevg Jul 12, 2024
ed27736
fix typing
nikitaevg Jul 13, 2024
a1966b4
Merge remote-tracking branch 'upstream/master' into 14331-regular-job
nikitaevg Jul 13, 2024
4e97160
Revert "Fix scheduled task setup"
webjunkie Jul 17, 2024
028d155
use si for chains
nikitaevg Jul 18, 2024
1920482
Merge remote-tracking branch 'upstream/master' into 14331-regular-job
nikitaevg Jul 18, 2024
fa7dc4c
use timestamp for the campaign key
nikitaevg Jul 18, 2024
e5b44ca
Merge branch 'master' into 14331-regular-job
nikitaevg Jul 23, 2024
b8d4e60
Merge branch 'master' into 14331-regular-job
nikitaevg Jul 23, 2024
e3db36a
Merge branch 'master' into 14331-regular-job
webjunkie Aug 6, 2024
4183314
Merge branch 'master' into 14331-regular-job
webjunkie Aug 6, 2024
66f0c4a
brush up the PR
nikitaevg Aug 6, 2024
d274bea
Merge branch 'master' into 14331-regular-job
nikitaevg Aug 13, 2024
0e8d28a
Merge branch 'master' into 14331-regular-job
webjunkie Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ee/tasks/test/subscriptions/test_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from posthog.models.exported_asset import ExportedAsset
from posthog.models.insight import Insight
from posthog.models.instance_setting import set_instance_setting
from posthog.models.subscription import Subscription
from posthog.test.base import APIBaseTest


Expand All @@ -24,7 +23,6 @@
@patch("ee.tasks.subscriptions.generate_assets")
@freeze_time("2022-02-02T08:55:00.000Z")
class TestSubscriptionsTasks(APIBaseTest):
subscriptions: list[Subscription] = None # type: ignore
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a redundant field

dashboard: Dashboard
insight: Insight
tiles: list[DashboardTile] = None # type: ignore
Expand Down
12 changes: 10 additions & 2 deletions frontend/src/lib/components/Alerts/views/EditAlert.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,18 @@ export function EditAlert({ id, insightShortId, onCancel, onDelete }: EditAlertP
</LemonField>
<Group name={['anomaly_condition', 'absoluteThreshold']}>
<span className="flex gap-10">
<LemonField name="lower" label="Lower threshold">
<LemonField
name="lower"
label="Lower threshold"
help="Notify if the value is strictly below"
>
<LemonInput type="number" className="w-20" data-attr="alert-lower-threshold" />
</LemonField>
<LemonField name="upper" label="Upper threshold">
<LemonField
name="upper"
label="Upper threshold"
help="Notify if the value is strictly above"
>
<LemonInput type="number" className="w-20" data-attr="alert-upper-threshold" />
</LemonField>
</span>
Expand Down
22 changes: 22 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AbsoluteThreshold": {
"additionalProperties": false,
"properties": {
"lower": {
"type": ["number", "null"]
},
"upper": {
"type": ["number", "null"]
}
},
"type": "object"
},
"ActionsNode": {
"additionalProperties": false,
"properties": {
Expand Down Expand Up @@ -179,6 +191,16 @@
"enum": ["numeric", "duration", "duration_ms", "percentage", "percentage_scaled"],
"type": "string"
},
"AnomalyCondition": {
"additionalProperties": false,
"properties": {
"absoluteThreshold": {
"$ref": "#/definitions/AbsoluteThreshold"
}
},
"required": ["absoluteThreshold"],
"type": "object"
},
"AnyDataNode": {
"anyOf": [
{
Expand Down
9 changes: 9 additions & 0 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1584,3 +1584,12 @@ export interface DashboardFilter {
date_to?: string | null
properties?: AnyPropertyFilter[] | null
}

export interface AbsoluteThreshold {
lower?: number | null
upper?: number | null
}

export interface AnomalyCondition {
absoluteThreshold: AbsoluteThreshold
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ export const insightNavLogic = kea<insightNavLogicType>([
},
})),
urlToAction(({ actions }) => ({
'/insights/:shortId(/:mode)(/:subscriptionId)': (
'/insights/:shortId(/:mode)(/:itemId)': (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_, // url params
{ dashboard, ...searchParams }, // search params
{ filters: _filters } // hash params
Expand Down
6 changes: 3 additions & 3 deletions frontend/src/scenes/insights/InsightPageHeader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { ExporterFormat, InsightLogicProps, ItemMode, NotebookNodeType } from '~

export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: InsightLogicProps }): JSX.Element {
// insightSceneLogic
const { insightMode, subscriptionId } = useValues(insightSceneLogic)
const { insightMode, itemId } = useValues(insightSceneLogic)
const { setInsightMode } = useActions(insightSceneLogic)

// insightLogic
Expand Down Expand Up @@ -71,7 +71,7 @@ export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: In
isOpen={insightMode === ItemMode.Subscriptions}
closeModal={() => push(urls.insightView(insight.short_id))}
insightShortId={insight.short_id}
subscriptionId={subscriptionId}
subscriptionId={itemId}
/>
<SharingModal
title="Insight sharing"
Expand All @@ -91,7 +91,7 @@ export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: In
isOpen={insightMode === ItemMode.Alerts}
closeModal={() => push(urls.insightView(insight.short_id))}
insightShortId={insight.short_id}
alertId={subscriptionId}
alertId={itemId}
/>
<NewDashboardModal />
</>
Expand Down
26 changes: 9 additions & 17 deletions frontend/src/scenes/insights/insightSceneLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
actions({
setInsightId: (insightId: InsightShortId) => ({ insightId }),
setInsightMode: (insightMode: ItemMode, source: InsightEventSource | null) => ({ insightMode, source }),
setSceneState: (insightId: InsightShortId, insightMode: ItemMode, subscriptionId: string | undefined) => ({
setSceneState: (insightId: InsightShortId, insightMode: ItemMode, itemId: string | undefined) => ({
insightId,
insightMode,
subscriptionId,
itemId,
}),
setInsightLogicRef: (logic: BuiltLogic<insightLogicType> | null, unmount: null | (() => void)) => ({
logic,
Expand All @@ -61,15 +61,11 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
setSceneState: (_, { insightMode }) => insightMode,
},
],
subscriptionId: [
itemId: [
null as null | number | 'new',
{
setSceneState: (_, { subscriptionId }) =>
subscriptionId !== undefined
? subscriptionId === 'new'
? 'new'
: parseInt(subscriptionId, 10)
: null,
setSceneState: (_, { itemId }) =>
itemId !== undefined ? (itemId === 'new' ? 'new' : parseInt(itemId, 10)) : null,
},
],
insightLogicRef: [
Expand Down Expand Up @@ -210,8 +206,8 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
source: JSON.parse(q),
} as DataVisualizationNode)
},
'/insights/:shortId(/:mode)(/:subscriptionId)': (
{ shortId, mode, subscriptionId }, // url params
'/insights/:shortId(/:mode)(/:itemId)': (
{ shortId, mode, itemId }, // url params
{ dashboard, ...searchParams }, // search params
{ filters: _filters, q }, // hash params
{ method, initial }, // "location changed" event payload
Expand Down Expand Up @@ -245,12 +241,8 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
return
}

if (
insightId !== values.insightId ||
insightMode !== values.insightMode ||
subscriptionId !== values.subscriptionId
) {
actions.setSceneState(insightId, insightMode, subscriptionId)
if (insightId !== values.insightId || insightMode !== values.insightMode || itemId !== values.itemId) {
actions.setSceneState(insightId, insightMode, itemId)
}

// capture any filters from the URL, either #filters={} or ?insight=X&bla=foo&bar=baz
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/scenes/scenes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,8 @@ export const routes: Record<string, Scene> = {
[urls.insightEdit(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightView(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightSubcriptions(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightSubcription(':shortId' as InsightShortId, ':subscriptionId')]: Scene.Insight,
[urls.alert(':shortId' as InsightShortId, ':subscriptionId')]: Scene.Insight,
[urls.insightSubcription(':shortId' as InsightShortId, ':itemId')]: Scene.Insight,
[urls.alert(':shortId' as InsightShortId, ':itemId')]: Scene.Insight,
[urls.alerts(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightSharing(':shortId' as InsightShortId)]: Scene.Insight,
[urls.savedInsights()]: Scene.SavedInsights,
Expand Down
1 change: 1 addition & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4320,6 +4320,7 @@ export type HogFunctionInvocationGlobals = {
>
}

// TODO: move to schema.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to do it in a follow up PR, but yeah, I can fix it here, done

export interface AnomalyCondition {
absoluteThreshold: {
lower?: number
Expand Down
2 changes: 1 addition & 1 deletion posthog/caching/calculate_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_cache_type(cacheable: Optional[FilterType] | Optional[dict]) -> CacheTyp


def calculate_for_query_based_insight(
insight: Insight, *, dashboard: Optional[Dashboard] = None, execution_mode: ExecutionMode, user: User
insight: Insight, *, dashboard: Optional[Dashboard] = None, execution_mode: ExecutionMode, user: Optional[User]
) -> "InsightResult":
from posthog.caching.fetch_from_cache import InsightResult, NothingInCacheResult
from posthog.caching.insight_cache import update_cached_state
Expand Down
2 changes: 1 addition & 1 deletion posthog/models/alert.py
nikitaevg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class Alert(models.Model):
team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE)
insight = models.ForeignKey("posthog.Insight", on_delete=models.CASCADE)
insight: models.ForeignKey = models.ForeignKey("posthog.Insight", on_delete=models.CASCADE)

name: models.CharField = models.CharField(max_length=100)
target_value: models.TextField = models.TextField()
Expand Down
15 changes: 15 additions & 0 deletions posthog/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ class SchemaRoot(RootModel[Any]):
root: Any


class AbsoluteThreshold(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
lower: Optional[float] = None
upper: Optional[float] = None


class MathGroupTypeIndex(float, Enum):
NUMBER_0 = 0
NUMBER_1 = 1
Expand All @@ -28,6 +36,13 @@ class AggregationAxisFormat(StrEnum):
PERCENTAGE_SCALED = "percentage_scaled"


class AnomalyCondition(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
absoluteThreshold: AbsoluteThreshold


class Kind(StrEnum):
METHOD = "Method"
FUNCTION = "Function"
Expand Down
Empty file.
111 changes: 111 additions & 0 deletions posthog/tasks/alerts/checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from celery import shared_task
from celery.canvas import group, chain
from django.utils import timezone
import math
import structlog
from typing import Any

from posthog.api.services.query import ExecutionMode
from posthog.caching.calculate_results import calculate_for_query_based_insight
from posthog.email import EmailMessage
from posthog.hogql_queries.legacy_compatibility.flagged_conversion_manager import (
conversion_to_query_based,
)
from posthog.models import Alert
from posthog.schema import AnomalyCondition

logger = structlog.get_logger(__name__)


def check_all_alerts() -> None:
alert_ids = list(Alert.objects.all().values_list("id", flat=True))

group_count = 10
# All groups but the last one will have a group_size size.
# The last group will have at most group_size size.
group_size = int(math.ceil(len(alert_ids) / group_count))

groups = []
for i in range(0, len(alert_ids), group_size):
alert_id_group = alert_ids[i : i + group_size]
chained_calls = chain([check_alert_task.s(alert_id=alert_id) for alert_id in alert_id_group])
groups.append(chained_calls)

group(groups).apply_async()


def check_alert(alert_id: int) -> None:
alert = Alert.objects.get(pk=alert_id)
insight = alert.insight

with conversion_to_query_based(insight):
calculation_result = calculate_for_query_based_insight(
insight,
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_BLOCKING_IF_STALE,
user=None,
)

if not calculation_result.result:
raise RuntimeError(f"No results for alert {alert.id}")

anomaly_condition = AnomalyCondition.model_validate(alert.anomaly_condition)
thresholds = anomaly_condition.absoluteThreshold

result = calculation_result.result[0]
aggregated_value = result["aggregated_value"]
anomalies_descriptions = []

if thresholds.lower is not None and aggregated_value < thresholds.lower:
anomalies_descriptions += [
f"The trend value ({aggregated_value}) is below the lower threshold ({thresholds.lower})"
]
if thresholds.upper is not None and aggregated_value > thresholds.upper:
anomalies_descriptions += [
f"The trend value ({aggregated_value}) is above the upper threshold ({thresholds.upper})"
]

if not anomalies_descriptions:
logger.info("No threshold met", alert_id=alert.id)
return

send_notifications(alert, anomalies_descriptions)


@shared_task(ignore_result=True)
def check_all_alerts_task() -> None:
check_all_alerts()


# Note, check_alert_task is used in Celery chains. Celery chains pass the previous
# function call result to the next function as an argument, hence args and kwargs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do check_alert_task.si (for immutable) above, then this doesn't happen/matter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it worked, thanks!

@shared_task(ignore_result=True)
def check_alert_task(*args: Any, **kwargs: Any) -> None:
check_alert(**kwargs)


# TODO: make it a task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a task. The .send function by default will queue a celery task for the actual sending.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense, thanks, removed

def send_notifications(alert: Alert, anomalies_descriptions: list[str]) -> None:
subject = f"PostHog alert {alert.name} has anomalies"
campaign_key = f"alert-anomaly-notification-{alert.id}-{timezone.now().isoformat()}"
insight_url = f"/project/{alert.team.pk}/insights/{alert.insight.short_id}"
alert_url = f"{insight_url}/alerts/{alert.id}"
message = EmailMessage(
campaign_key=campaign_key,
subject=subject,
template_name="alert_anomaly",
template_context={
"anomalies_descriptions": anomalies_descriptions,
"insight_url": insight_url,
"insight_name": alert.insight.name,
"alert_url": alert_url,
"alert_name": alert.name,
},
)
targets = list(filter(len, alert.target_value.split(",")))
if not targets:
raise RuntimeError(f"no targets configured for the alert {alert.id}")
for target in targets:
message.add_recipient(email=target)

logger.info(f"Send notifications about {len(anomalies_descriptions)} anomalies", alert_id=alert.id)
message.send()
Empty file.
Loading
Loading