Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine/metricsmanager: add daily_alert_metrics table #2272

Merged
merged 8 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion devtools/resetdb/datagen.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (d *datagen) NewAlert(status alert.Status) {
}
d.Alerts = append(d.Alerts, alert.Alert{
ID: len(d.Alerts) + 1,
CreatedAt: gofakeit.DateRange(time.Now().Add(-2*time.Hour), time.Now().Add(-1*time.Hour)),
CreatedAt: gofakeit.DateRange(time.Now().Add(-30*24*time.Hour), time.Now().Add(-1*time.Hour)),
Status: status,
ServiceID: serviceID,
Summary: d.ids.Gen(func() string { return gofakeit.Sentence(rand.Intn(10) + 3) }, serviceID),
Expand Down
33 changes: 31 additions & 2 deletions engine/metricsmanager/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ type DB struct {
db *sql.DB
lock *processinglock.Lock

scanLogs *sql.Stmt
boundNow *sql.Stmt

scanLogs *sql.Stmt
insertMetrics *sql.Stmt

nextDailyMetricsDate *sql.Stmt
insertDailyMetrics *sql.Stmt
}

// Name returns the name of the module.
Expand All @@ -39,10 +43,13 @@ func NewDB(ctx context.Context, db *sql.DB) (*DB, error) {
db: db,
lock: lock,

// NOTE: this buffer provides time for in-flight requests to settle
boundNow: p.P(`select now() - '2 minutes'::interval`),

scanLogs: p.P(`
select alert_id, timestamp, id
from alert_logs
where event='closed' and (timestamp > $1 and timestamp < now() - '2 minutes'::interval or (timestamp = $1 and id > $2))
where event='closed' and timestamp < $3 and (timestamp > $1 or (timestamp = $1 and id > $2))
order by timestamp, id
limit 500`),

Expand All @@ -59,5 +66,27 @@ func NewDB(ctx context.Context, db *sql.DB) (*DB, error) {
where a.id = any($1) and a.service_id is not null
on conflict do nothing
`),

nextDailyMetricsDate: p.P(`
select (date(timezone('UTC'::text, closed_at))) from alert_metrics
where (date(timezone('UTC'::text, closed_at))) > $1::date
and (date(timezone('UTC'::text, closed_at))) < $2::date
order by (date(timezone('UTC'::text, closed_at)))
limit 1;
`),

insertDailyMetrics: p.P(`
insert into daily_alert_metrics (date, service_id, alert_count, avg_time_to_ack, avg_time_to_close, escalated_count)
select
$1::date,
service_id,
count(*),
avg(time_to_ack),
avg(time_to_close),
count(*) filter (where escalated=true)
from alert_metrics
where (date(timezone('UTC'::text, closed_at))) = $1
group by service_id;
`),
}, p.Err
}
111 changes: 100 additions & 11 deletions engine/metricsmanager/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metricsmanager
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand All @@ -19,26 +20,44 @@ type State struct {

// LastLogID breaks ties for the LastLogTime cursor
LastLogID int

// LastMetricsDate is a cursor for processed alert_metrics
LastMetricsDate time.Time
}
}

// UpdateAll runs the metrics update steps in sequence: UpdateAlertMetrics
// first, then UpdateDailyAlertMetrics.
//
// The context is first checked with permission.LimitCheckAny against
// permission.System; if that check or any step returns an error, the error
// is returned unchanged and the remaining steps are not run.
func (db *DB) UpdateAll(ctx context.Context) error {
	if err := permission.LimitCheckAny(ctx, permission.System); err != nil {
		return err
	}

	if err := db.UpdateAlertMetrics(ctx); err != nil {
		return err
	}

	// Last step: its result (nil or error) is the result of the whole run.
	return db.UpdateDailyAlertMetrics(ctx)
}

// UpdateAlertMetrics will update the alert metrics table
/*
Theory of Operation:

1. Acquire processing lock
2. Get batch of oldest alert IDs after cursor (use LastLogID as a tie breaker)
3. Insert metrics for these alerts
4. Set cursor to last inserted
5. If none inserted, set cursor to upper time bound

*/

// UpdateAll will update the alert metrics table
func (db *DB) UpdateAll(ctx context.Context) error {
err := permission.LimitCheckAny(ctx, permission.System)
if err != nil {
return err
}
log.Debugf(ctx, "Running metrics operations.")
func (db *DB) UpdateAlertMetrics(ctx context.Context) error {
log.Debugf(ctx, "Running alert_metrics operations.")

tx, lockState, err := db.lock.BeginTxWithState(ctx, nil)
if err != nil {
Expand All @@ -47,16 +66,21 @@ func (db *DB) UpdateAll(ctx context.Context) error {
defer tx.Rollback()

var alertIDs []int
var lastLogTime time.Time
var lastLogTime, boundNow time.Time
var lastLogID int
var state State
err = lockState.Load(ctx, &state)
if err != nil {
return fmt.Errorf("load state: %w", err)
}

err = tx.StmtContext(ctx, db.boundNow).QueryRowContext(ctx).Scan(&boundNow)
if err != nil {
return fmt.Errorf("select bound now: %w", err)
}

var rows *sql.Rows
rows, err = tx.StmtContext(ctx, db.scanLogs).QueryContext(ctx, state.V2.LastLogTime, state.V2.LastLogID)
rows, err = tx.StmtContext(ctx, db.scanLogs).QueryContext(ctx, state.V2.LastLogTime, state.V2.LastLogID, boundNow)
if err != nil {
return fmt.Errorf("scan logs: %w", err)
}
Expand All @@ -78,13 +102,77 @@ func (db *DB) UpdateAll(ctx context.Context) error {
return fmt.Errorf("insert metrics: %w", err)
}

// update and save state
// update state
state.V2.LastLogTime = lastLogTime
state.V2.LastLogID = lastLogID

} else {
// update state
state.V2.LastLogTime = boundNow
state.V2.LastLogID = 0
}

// save state
err = lockState.Save(ctx, &state)
if err != nil {
return fmt.Errorf("save state: %w", err)
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("commit: %w", err)
}

return nil
}

// UpdateDailyAlertMetrics will update the daily alert metrics table
/*
Theory of Operation:

1. Acquire processing lock
2. Get next date to process (min date after LastMetricsDate, before LastLogTime's date)
3. Insert daily metrics for this date
4. Set cursor to this date

*/
func (db *DB) UpdateDailyAlertMetrics(ctx context.Context) error {
log.Debugf(ctx, "Running daily_alert_metrics operations.")

tx, lockState, err := db.lock.BeginTxWithState(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()

var nextDate sql.NullTime
var state State
err = lockState.Load(ctx, &state)
if err != nil {
return fmt.Errorf("load state: %w", err)
}

err = tx.StmtContext(ctx, db.nextDailyMetricsDate).QueryRowContext(ctx, state.V2.LastMetricsDate, state.V2.LastLogTime).Scan(&nextDate)
if errors.Is(err, sql.ErrNoRows) {
err = nil
}
if err != nil {
return fmt.Errorf("select next daily metrics date: %w", err)
}

if nextDate.Valid {

_, err = tx.StmtContext(ctx, db.insertDailyMetrics).ExecContext(ctx, nextDate)
if err != nil {
return fmt.Errorf("insert daily metrics: %w", err)
}

state.V2.LastMetricsDate = nextDate.Time
err = lockState.Save(ctx, &state)
if err != nil {
return fmt.Errorf("save state: %w", err)
}

}

err = tx.Commit()
Expand All @@ -93,4 +181,5 @@ func (db *DB) UpdateAll(ctx context.Context) error {
}

return nil

}
25 changes: 25 additions & 0 deletions migrate/migrations/20220329170727-add-daily-alert-metrics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- +migrate Up

-- daily_alert_metrics holds one row per (service_id, date) with aggregate
-- alert statistics for that day: number of alerts, average time to
-- acknowledge/close, and how many alerts were escalated.
CREATE TABLE daily_alert_metrics (
id BIGSERIAL PRIMARY KEY,
service_id UUID NOT NULL,
date DATE NOT NULL,
alert_count INT DEFAULT 0 NOT NULL,
avg_time_to_ack INTERVAL,
avg_time_to_close INTERVAL,
escalated_count INT DEFAULT 0 NOT NULL,

-- At most one aggregate row per service per day.
UNIQUE(service_id, date)
);

-- Expression index on the UTC calendar date of closed_at, so lookups of
-- alert_metrics rows by UTC close date can use an index.
CREATE INDEX alert_metrics_closed_date_idx ON alert_metrics (DATE(closed_at AT TIME ZONE 'UTC' ) ASC);

-- +migrate Down

DROP INDEX alert_metrics_closed_date_idx;

DROP TABLE daily_alert_metrics;

-- Reset the stored state of the 'metrics' processing row to its column default.
-- NOTE(review): presumably this clears the saved metrics cursors so they are
-- rebuilt from scratch if the migration is re-applied; confirm.
UPDATE engine_processing_versions
SET state = DEFAULT
WHERE type_id = 'metrics';