Tracking anomalies in the data

Tracking anomalies in the data

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np


def _add_summary_columns(new: pd.DataFrame) -> None:
    """Add summary-statistic and per-row change columns to *new* in place."""
    values = new["value"]

    new["std"] = values.std()
    # NOTE(review): these bands are +/- 2 std around zero, not around the
    # mean, although they are plotted next to the raw values -- confirm intent.
    new["higher_std_value"] = new["std"] * 2
    new["lower_std_value"] = new["std"] * -2
    new["mean"] = values.mean()
    new["lower_10th_percentile"] = values.quantile(0.1)
    new["upper_10th_percentile"] = values.quantile(0.9)

    # Series.quantile() defaults to q=0.5, so both columns hold the median.
    median = values.quantile()
    new["quantile"] = median
    new["percentile_value"] = median

    # A row is a "huge change" when |value - previous value| exceeds the median.
    # The first row has no predecessor (NaN diff), so it is never flagged.
    change = values.diff()
    new["huge_change"] = change.abs() > new["quantile"]
    new["change"] = change

    new["percentile_value_of_observation"] = values.rank(pct=True)


def _add_model_scores(new: pd.DataFrame) -> None:
    """Add LocalOutlierFactor and IsolationForest results to *new* in place."""
    # Imported locally, as in the original code, so scikit-learn is only
    # required when the detection is actually run.
    from sklearn.ensemble import IsolationForest
    from sklearn.neighbors import LocalOutlierFactor

    features = new[["value"]]

    new["LOF score"] = LocalOutlierFactor(n_neighbors=24).fit_predict(features)

    model = IsolationForest(
        n_estimators=100,
        max_samples="auto",
        contamination=0.3,
        max_features=1.0,
        random_state=42,  # fixed seed keeps the flagged points reproducible
    )
    model.fit(features)
    new["isolation_function_scores"] = model.decision_function(features)
    new["isolation_anomaly_score"] = model.predict(features)


def _draw_plot(new: pd.DataFrame) -> None:
    """Draw the series, reference lines, markers and value labels on the current figure."""
    dates = new["date"]
    values = new["value"]

    plt.plot(dates, values, label="value")
    plt.plot(dates, new["higher_std_value"], label="higher_std_value", color="green")
    plt.plot(dates, new["mean"], label="mean", color="blue")
    plt.plot(dates, new["lower_std_value"], label="lower_std_value", color="red")
    for column in ("lower_10th_percentile", "upper_10th_percentile"):
        plt.plot(dates, new[column], label=column, color="orange", linestyle="--")

    # Minimum value(s): red dot, no legend entry.
    is_minimum = values == values.min()
    plt.scatter(dates[is_minimum], values[is_minimum], color="red")

    # IsolationForest.predict marks outliers with -1.
    is_anomaly = new["isolation_anomaly_score"] == -1
    plt.scatter(
        dates[is_anomaly],
        values[is_anomaly],
        color="black",
        marker="X",
        label="anomaly",
    )

    is_huge_change = new["huge_change"]
    plt.scatter(
        dates[is_huge_change],
        values[is_huge_change],
        color="purple",
        marker="D",
        label="huge_change",
    )

    # Label every point with its value. Positional access (.iloc) is used on
    # purpose: label-based ``series[i]`` breaks when the index is not 0..n-1
    # (for example after filtering rows or with a DatetimeIndex).
    for position in range(len(new)):
        value = values.iloc[position]
        plt.text(dates.iloc[position], value, value, fontsize=4)

    plt.legend()
    # Hide x-axis ticks and tick labels.
    plt.tick_params(
        axis="x",
        which="both",
        bottom=False,
        top=False,
        labelbottom=False,
    )


def _score_last_observation(new: pd.DataFrame, given_value: float = 100) -> str:
    """Score the last row of *new* and return the explanatory message.

    Implemented rules (points are summed):

    points      rule
    3           last value is less than ``given_value``
    2           IsolationForest flags the last point as an anomaly
    2           last value is less than 30% of the "quantile" (median) column
    1           last point is a "huge change"
    1           last value is lower than or equal to the 10th percentile

    NOTE(review): the original rubric also listed "more than given value" and
    a 1-point std-based rule; neither is implemented, so the maximum attainable
    score is 9 even though the message reports it as "out of 10".
    """
    last = new.iloc[-1]
    median = new["quantile"].min()  # the column is constant; min() just extracts it

    score = 0
    reasons = []
    if last["value"] < given_value:
        reasons.append(f"Last value is less than {given_value} /")
        score += 3
    if last["isolation_anomaly_score"] == -1:
        reasons.append(" Anomaly detected in last point. /")
        score += 2
    if last["value"] < median * 0.3:
        reasons.append(f" value less than 30% of quantile value of {median}. /")
        score += 2
    if last["huge_change"]:
        reasons.append(" Huge change detected for last point. /")
        score += 1
    if last["value"] <= new["lower_10th_percentile"].min():
        reasons.append(" value less than 10th percentile. /")
        score += 1

    logic = "".join(reasons)
    return f"Last value is {last['value']}, for date {last['date']}. / Anamoly Detection score is {score} out of 10. {logic}  "


def anamoly_detection(new: pd.DataFrame, *, given_value: float = 100) -> str:
    """Annotate, plot and score a series for anomalies.

    The frame must contain a ``"value"`` column (the observations) and a
    ``"date"`` column (used as the x-axis and in the message).

    Side effects:
        * *new* is modified in place: summary-statistic columns, change
          columns, ``"LOF score"``, ``"isolation_function_scores"`` and
          ``"isolation_anomaly_score"`` are added.
        * A matplotlib figure is drawn and shown with ``plt.show()``.

    Args:
        new: Data to analyse; mutated in place.
        given_value: Threshold for the 3-point "last value is less than"
            rule. Defaults to 100, the previously hard-coded value.

    Returns:
        The "/"-separated message describing the last observation, its score
        and the rules that fired. (Previously computed but discarded.)

    Raises:
        ValueError: If *new* has no rows.
    """
    if new.empty:
        # Fail early with a clear message instead of deep inside scikit-learn
        # after the frame has already been partly mutated.
        raise ValueError("anamoly_detection requires a non-empty DataFrame")

    _add_summary_columns(new)
    _add_model_scores(new)
    _draw_plot(new)
    message = _score_last_observation(new, given_value)
    plt.show()
    return message