From 054b26958d6a8e4bbd2af410f1e88af039719703 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 5 Feb 2025 16:51:04 -0600 Subject: [PATCH 01/20] feat(rotation): enhance rotation management with new database integration and state handling --- engine/engine.go | 2 +- engine/rotationmanager/advance.go | 5 +- engine/rotationmanager/db.go | 7 +- engine/rotationmanager/queries.sql | 56 ++++++++++++ engine/rotationmanager/setup.go | 39 ++++++++ engine/rotationmanager/updaterotation.go | 90 +++++++++++++++++++ gadb/queries.sql.go | 110 +++++++++++++++++++++++ schedule/rotation/update.go | 8 ++ 8 files changed, 313 insertions(+), 4 deletions(-) create mode 100644 engine/rotationmanager/queries.sql create mode 100644 engine/rotationmanager/setup.go create mode 100644 engine/rotationmanager/updaterotation.go create mode 100644 schedule/rotation/update.go diff --git a/engine/engine.go b/engine/engine.go index 1eac751e73..cac6d43df5 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -95,7 +95,7 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) { return nil, err } - rotMgr, err := rotationmanager.NewDB(ctx, db) + rotMgr, err := rotationmanager.NewDB(ctx, db, c.RiverDBSQL) if err != nil { return nil, errors.Wrap(err, "rotation management backend") } diff --git a/engine/rotationmanager/advance.go b/engine/rotationmanager/advance.go index 7c62abc329..b3647bc9f5 100644 --- a/engine/rotationmanager/advance.go +++ b/engine/rotationmanager/advance.go @@ -17,8 +17,9 @@ type advance struct { } type rotState struct { - rotation.State - Version int + ShiftStart time.Time + Position int + Version int } // calcAdvance will calculate rotation advancement if it is required. If not, nil is returned diff --git a/engine/rotationmanager/db.go b/engine/rotationmanager/db.go index e317b519ad..b66f22ba99 100644 --- a/engine/rotationmanager/db.go +++ b/engine/rotationmanager/db.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" + "github.com/riverqueue/river" "github.com/target/goalert/engine/processinglock" "github.com/target/goalert/util" ) @@ -17,13 +18,15 @@ type DB struct { lockPart *sql.Stmt rotate *sql.Stmt rotateData *sql.Stmt + + riverDBSQL *river.Client[*sql.Tx] } // Name returns the name of the module. func (db *DB) Name() string { return "Engine.RotationManager" } // NewDB will create a new DB, preparing all statements necessary. -func NewDB(ctx context.Context, db *sql.DB) (*DB, error) { +func NewDB(ctx context.Context, db *sql.DB, riverDBSQL *river.Client[*sql.Tx]) (*DB, error) { lock, err := processinglock.NewLock(ctx, db, processinglock.Config{ Type: processinglock.TypeRotation, Version: 2, @@ -36,6 +39,8 @@ func NewDB(ctx context.Context, db *sql.DB) (*DB, error) { return &DB{ lock: lock, + riverDBSQL: riverDBSQL, + currentTime: p.P(`select now()`), lockPart: p.P(`lock rotation_participants, rotation_state in exclusive mode`), rotate: p.P(` diff --git a/engine/rotationmanager/queries.sql b/engine/rotationmanager/queries.sql new file mode 100644 index 0000000000..9fff452eb1 --- /dev/null +++ b/engine/rotationmanager/queries.sql @@ -0,0 +1,56 @@ +-- name: RotMgrRotationData :one +-- Get rotation data for a given rotation ID +SELECT + now()::timestamptz AS now, + rot.id, + rot.type, + rot.start_time, + rot.shift_length, + rot.time_zone, + state.position, + state.shift_start, + ARRAY ( + SELECT + p.id + FROM + rotation_participants p + WHERE + p.rotation_id = rot.id + ORDER BY + position)::uuid[] AS participants + FROM + rotations rot + LEFT JOIN rotation_state state ON rot.id = state.rotation_id +WHERE + rot.id = @rotation_id; + +-- name: RotMgrStart :exec +-- Start a rotation. +INSERT INTO rotation_state(rotation_id, position, shift_start, rotation_participant_id) +SELECT + p.rotation_id, + 0, + now(), + id +FROM + rotation_participants p +WHERE + p.rotation_id = @rotation_id + AND position = 0; + +-- name: RotMgrEnd :exec +-- End a rotation. +DELETE FROM rotation_state +WHERE rotation_id = @rotation_id; + +-- name: RotMgrUpdate :exec +-- Update the rotation state. +UPDATE + rotation_state +SET + position = @position, + shift_start = now(), + rotation_participant_id = @rotation_participant_id +WHERE + rotation_id = @rotation_id; + diff --git a/engine/rotationmanager/setup.go b/engine/rotationmanager/setup.go new file mode 100644 index 0000000000..6526e6ef88 --- /dev/null +++ b/engine/rotationmanager/setup.go @@ -0,0 +1,39 @@ +package rotationmanager + +import ( + "context" + "fmt" + + "github.com/riverqueue/river" + "github.com/target/goalert/engine/processinglock" + "github.com/target/goalert/event" + "github.com/target/goalert/schedule/rotation" +) + +const ( + QueueName = "rotation-manager" + PriorityLookForWork = 2 + PriorityCleanup = 3 + PriorityProcess = 4 +) + +var _ processinglock.Setupable = &DB{} + +// Setup implements processinglock.Setupable. +func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { + river.AddWorker(args.Workers, river.WorkFunc(db.updateRotation)) + + event.RegisterJobSource(args.EventBus, func(data rotation.Update) (river.JobArgs, *river.InsertOpts) { + return UpdateArgs{RotationID: data.ID}, &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityLookForWork, + } + }) + + err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5}) + if err != nil { + return fmt.Errorf("add queue: %w", err) + } + + return nil +} diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go new file mode 100644 index 0000000000..1a608d4ed1 --- /dev/null +++ b/engine/rotationmanager/updaterotation.go @@ -0,0 +1,90 @@ +package rotationmanager + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/riverqueue/river" + "github.com/target/goalert/gadb" + "github.com/target/goalert/schedule/rotation" + "github.com/target/goalert/util" +) + +type UpdateArgs struct { + RotationID uuid.UUID +} + +func (UpdateArgs) Kind() string { return "rotation-manager-update" } + +// cleanup is a worker function that will remove any stale subscriptions. +func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) error { + return db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + g := gadb.New(tx) + + row, err := g.RotMgrRotationData(ctx, j.Args.RotationID) + if err != nil { + return err + } + + if len(row.Participants) == 0 { + if row.Position.Valid { + // no participants, but we have a position, so clear it + return g.RotMgrEnd(ctx, j.Args.RotationID) + } + + return nil + } + + loc, err := util.LoadLocation(row.TimeZone) + if err != nil { + return fmt.Errorf("load location: %w", err) + } + + r := rotation.Rotation{ + Type: rotation.Type(row.Type), + Start: row.StartTime.In(loc), + ShiftLength: int(row.ShiftLength), + } + + // schedule next run + _, err = db.riverDBSQL.InsertTx(ctx, tx, UpdateArgs{RotationID: j.Args.RotationID}, &river.InsertOpts{ + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + ByPeriod: time.Minute, + }, + ScheduledAt: r.EndTime(row.Now), + }) + if err != nil { + return fmt.Errorf("schedule next run: %w", err) + } + + if !row.Position.Valid { + // no state, but we have participants, so start at the beginning + return g.RotMgrStart(ctx, j.Args.RotationID) + } + + s := rotState{ + ShiftStart: row.ShiftStart.Time.In(loc), + Position: int(row.Position.Int32), + } + adv := calcAdvance(ctx, row.Now, &r, s, len(row.Participants)) + if adv == nil { + // no advancement needed + return nil + } + + err = g.RotMgrUpdate(ctx, gadb.RotMgrUpdateParams{ + RotationID: j.Args.RotationID, + Position: int32(adv.newPosition), + RotationParticipantID: row.Participants[adv.newPosition], + }) + if err != nil { + return fmt.Errorf("update rotation state (advance): %w", err) + } + + return nil + }) +} diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index cd6a107103..9b8bf85558 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -4175,6 +4175,116 @@ func (q *Queries) RequestAlertEscalationByTime(ctx context.Context, arg RequestA return column_1, err } +const rotMgrEnd = `-- name: RotMgrEnd :exec +DELETE FROM rotation_state +WHERE rotation_id = $1 +` + +// End a rotation. +func (q *Queries) RotMgrEnd(ctx context.Context, rotationID uuid.UUID) error { + _, err := q.db.ExecContext(ctx, rotMgrEnd, rotationID) + return err +} + +const rotMgrRotationData = `-- name: RotMgrRotationData :one +SELECT + now()::timestamptz AS now, + rot.id, + rot.type, + rot.start_time, + rot.shift_length, + rot.time_zone, + state.position, + state.shift_start, + ARRAY ( + SELECT + p.id + FROM + rotation_participants p + WHERE + p.rotation_id = rot.id + ORDER BY + position)::uuid[] AS participants + FROM + rotations rot + LEFT JOIN rotation_state state ON rot.id = state.rotation_id +WHERE + rot.id = $1 +` + +type RotMgrRotationDataRow struct { + Now time.Time + ID uuid.UUID + Type EnumRotationType + StartTime time.Time + ShiftLength int64 + TimeZone string + Position sql.NullInt32 + ShiftStart sql.NullTime + Participants []uuid.UUID +} + +// Get rotation data for a given rotation ID +func (q *Queries) RotMgrRotationData(ctx context.Context, rotationID uuid.UUID) (RotMgrRotationDataRow, error) { + row := q.db.QueryRowContext(ctx, rotMgrRotationData, rotationID) + var i RotMgrRotationDataRow + err := row.Scan( + &i.Now, + &i.ID, + &i.Type, + &i.StartTime, + &i.ShiftLength, + &i.TimeZone, + &i.Position, + &i.ShiftStart, + pq.Array(&i.Participants), + ) + return i, err +} + +const rotMgrStart = `-- name: RotMgrStart :exec +INSERT INTO rotation_state(rotation_id, position, shift_start, rotation_participant_id) +SELECT + p.rotation_id, + 0, + now(), + id +FROM + rotation_participants p +WHERE + p.rotation_id = $1 + AND position = 0 +` + +// Start a rotation. +func (q *Queries) RotMgrStart(ctx context.Context, rotationID uuid.UUID) error { + _, err := q.db.ExecContext(ctx, rotMgrStart, rotationID) + return err +} + +const rotMgrUpdate = `-- name: RotMgrUpdate :exec +UPDATE + rotation_state +SET + position = $1, + shift_start = now(), + rotation_participant_id = $2 +WHERE + rotation_id = $3 +` + +type RotMgrUpdateParams struct { + Position int32 + RotationParticipantID uuid.UUID + RotationID uuid.UUID +} + +// Update the rotation state. +func (q *Queries) RotMgrUpdate(ctx context.Context, arg RotMgrUpdateParams) error { + _, err := q.db.ExecContext(ctx, rotMgrUpdate, arg.Position, arg.RotationParticipantID, arg.RotationID) + return err +} + const sWOConnLock = `-- name: SWOConnLock :one WITH LOCK AS ( SELECT diff --git a/schedule/rotation/update.go b/schedule/rotation/update.go new file mode 100644 index 0000000000..eabc0ac9f0 --- /dev/null +++ b/schedule/rotation/update.go @@ -0,0 +1,8 @@ +package rotation + +import "github.com/google/uuid" + +// Update is an event triggered when a rotation is updated. +type Update struct { + ID uuid.UUID +} From 379a34cab2517880a199aa5667b3b2d8f1dd098a Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 5 Feb 2025 17:11:34 -0600 Subject: [PATCH 02/20] feat(rotation): implement tracking for rotation updates with database triggers --- engine/rotationmanager/updaterotation.go | 5 ++ .../20250205165505-track-rotation-updates.sql | 58 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 migrate/migrations/20250205165505-track-rotation-updates.sql diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index 1a608d4ed1..d51b2febc4 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -3,6 +3,7 @@ package rotationmanager import ( "context" "database/sql" + "errors" "fmt" "time" @@ -25,6 +26,10 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro g := gadb.New(tx) row, err := g.RotMgrRotationData(ctx, j.Args.RotationID) + if errors.Is(err, sql.ErrNoRows) { + // no longer exists, so nothing to do + return nil + } if err != nil { return err } diff --git a/migrate/migrations/20250205165505-track-rotation-updates.sql b/migrate/migrations/20250205165505-track-rotation-updates.sql new file mode 100644 index 0000000000..a83c090a1b --- /dev/null +++ b/migrate/migrations/20250205165505-track-rotation-updates.sql @@ -0,0 +1,58 @@ +-- +migrate Up +CREATE TABLE entity_updates( + id bigserial PRIMARY KEY, + entity_type text NOT NULL, + entity_id uuid NOT NULL, + created_at timestamptz NOT NULL DEFAULT now() +) +WITH ( + autovacuum_vacuum_threshold = 50, -- Lower threshold to trigger vacuum sooner + autovacuum_vacuum_scale_factor = 0.05 -- Lower scale factor for frequent vacuuming +); + +CREATE INDEX idx_entity_updates_entity_type ON entity_updates(entity_type); + +INSERT INTO entity_updates(entity_type, entity_id) +SELECT + 'rotation', + id +FROM + rotations; + +CREATE OR REPLACE FUNCTION track_rotation_updates() + RETURNS TRIGGER + AS $$ +BEGIN + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', NEW.id); + RETURN new; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER track_rotation_updates + AFTER INSERT OR UPDATE ON rotations FOR EACH ROW + EXECUTE FUNCTION track_rotation_updates(); + +CREATE OR REPLACE FUNCTION track_rotation_dep_updates() + RETURNS TRIGGER + AS $$ +BEGIN + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', NEW.rotation_id); + RETURN new; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER track_rotation_state_updates + AFTER INSERT OR UPDATE ON rotation_state FOR EACH ROW + EXECUTE FUNCTION track_rotation_dep_updates(); + +CREATE TRIGGER track_rotation_participants_updates + AFTER INSERT OR UPDATE OR DELETE ON rotation_participants FOR EACH ROW + EXECUTE FUNCTION track_rotation_dep_updates(); + +-- +migrate Down +DROP TABLE entity_updates; + From 07426f737c585f61d39e12bf9388fefb097fc4af Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 10:42:53 -0600 Subject: [PATCH 03/20] feat(rotation): implement entity update tracking and rotation work management --- engine/rotationmanager/lookforwork.go | 62 +++++++++++++++++++ engine/rotationmanager/queries.sql | 25 ++++++++ engine/rotationmanager/setup.go | 25 ++++++-- engine/rotationmanager/updaterotation.go | 1 + gadb/models.go | 7 +++ gadb/queries.sql.go | 49 +++++++++++++++ .../20250205165505-track-rotation-updates.sql | 59 +++++++++--------- migrate/schema.sql | 43 ++++++++++++- 8 files changed, 234 insertions(+), 37 deletions(-) create mode 100644 engine/rotationmanager/lookforwork.go diff --git a/engine/rotationmanager/lookforwork.go b/engine/rotationmanager/lookforwork.go new file mode 100644 index 0000000000..c27d8a0479 --- /dev/null +++ b/engine/rotationmanager/lookforwork.go @@ -0,0 +1,62 @@ +package rotationmanager + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/riverqueue/river" + "github.com/target/goalert/gadb" +) + +type LookForWorkArgs struct{} + +func (LookForWorkArgs) Kind() string { return "rotation-manager-lfw" } + +// cleanup is a worker function that will remove any stale subscriptions. +func (db *DB) lookForWork(ctx context.Context, j *river.Job[LookForWorkArgs]) error { + var hadWork bool + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + g := gadb.New(tx) + + rotations, err := g.RotMgrFindWork(ctx) + if errors.Is(err, sql.ErrNoRows) { + // done, no more work + return nil + } + if err != nil { + return fmt.Errorf("find work: %w", err) + } + if len(rotations) == 0 { + return nil + } + + var params []river.InsertManyParams + for _, r := range rotations { + params = append(params, river.InsertManyParams{ + Args: UpdateArgs{RotationID: r}, + InsertOpts: &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityEvent, + }, + }) + } + + _, err = db.riverDBSQL.InsertManyFastTx(ctx, tx, params) + if err != nil { + return fmt.Errorf("insert many: %w", err) + } + hadWork = true + return nil + }) + if err != nil { + return fmt.Errorf("look for work: %w", err) + } + if !hadWork { + return nil + } + + return river.JobSnooze(time.Second) +} diff --git a/engine/rotationmanager/queries.sql b/engine/rotationmanager/queries.sql index 9fff452eb1..d2bf9a8e2d 100644 --- a/engine/rotationmanager/queries.sql +++ b/engine/rotationmanager/queries.sql @@ -54,3 +54,28 @@ SET WHERE rotation_id = @rotation_id; +-- name: RotMgrFindWork :many +WITH items AS ( + SELECT + id, + entity_id + FROM + entity_updates + WHERE + entity_type = 'rotation' + FOR UPDATE + SKIP LOCKED + LIMIT 1000 +), +_delete AS ( + DELETE FROM entity_updates + WHERE id IN ( + SELECT + id + FROM + items)) +SELECT DISTINCT + entity_id +FROM + items; + diff --git a/engine/rotationmanager/setup.go b/engine/rotationmanager/setup.go index 6526e6ef88..0fe8d9f095 100644 --- a/engine/rotationmanager/setup.go +++ b/engine/rotationmanager/setup.go @@ -3,6 +3,7 @@ package rotationmanager import ( "context" "fmt" + "time" "github.com/riverqueue/river" "github.com/target/goalert/engine/processinglock" @@ -11,10 +12,10 @@ import ( ) const ( - QueueName = "rotation-manager" - PriorityLookForWork = 2 - PriorityCleanup = 3 - PriorityProcess = 4 + QueueName = "rotation-manager" + PriorityScheduled = 1 + PriorityEvent = 2 + PriorityLFW = 4 ) var _ processinglock.Setupable = &DB{} @@ -22,11 +23,12 @@ var _ processinglock.Setupable = &DB{} // Setup implements processinglock.Setupable. func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.updateRotation)) + river.AddWorker(args.Workers, river.WorkFunc(db.lookForWork)) event.RegisterJobSource(args.EventBus, func(data rotation.Update) (river.JobArgs, *river.InsertOpts) { return UpdateArgs{RotationID: data.ID}, &river.InsertOpts{ Queue: QueueName, - Priority: PriorityLookForWork, + Priority: PriorityEvent, } }) @@ -35,5 +37,18 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { return fmt.Errorf("add queue: %w", err) } + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(time.Minute), + func() (river.JobArgs, *river.InsertOpts) { + return LookForWorkArgs{}, &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityLFW, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + return nil } diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index d51b2febc4..78e531d761 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -60,6 +60,7 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro ByArgs: true, ByPeriod: time.Minute, }, + Priority: PriorityScheduled, ScheduledAt: r.EndTime(row.Now), }) if err != nil { diff --git a/gadb/models.go b/gadb/models.go index 157b0adfed..ef46f3b541 100644 --- a/gadb/models.go +++ b/gadb/models.go @@ -926,6 +926,13 @@ type EngineProcessingVersion struct { Version int32 } +type EntityUpdate struct { + CreatedAt time.Time + EntityID uuid.UUID + EntityType string + ID int64 +} + type EpStepOnCallUser struct { EndTime sql.NullTime EpStepID uuid.UUID diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 9b8bf85558..a4fc10e129 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -4186,6 +4186,55 @@ func (q *Queries) RotMgrEnd(ctx context.Context, rotationID uuid.UUID) error { return err } +const rotMgrFindWork = `-- name: RotMgrFindWork :many +WITH items AS ( + SELECT + id, + entity_id + FROM + entity_updates + WHERE + entity_type = 'rotation' + FOR UPDATE + SKIP LOCKED + LIMIT 1000 +), +_delete AS ( + DELETE FROM entity_updates + WHERE id IN ( + SELECT + id + FROM + items)) +SELECT DISTINCT + entity_id +FROM + items +` + +func (q *Queries) RotMgrFindWork(ctx context.Context) ([]uuid.UUID, error) { + rows, err := q.db.QueryContext(ctx, rotMgrFindWork) + if err != nil { + return nil, err + } + defer rows.Close() + var items []uuid.UUID + for rows.Next() { + var entity_id uuid.UUID + if err := rows.Scan(&entity_id); err != nil { + return nil, err + } + items = append(items, entity_id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const rotMgrRotationData = `-- name: RotMgrRotationData :one SELECT now()::timestamptz AS now, diff --git a/migrate/migrations/20250205165505-track-rotation-updates.sql b/migrate/migrations/20250205165505-track-rotation-updates.sql index a83c090a1b..058a5d5705 100644 --- a/migrate/migrations/20250205165505-track-rotation-updates.sql +++ b/migrate/migrations/20250205165505-track-rotation-updates.sql @@ -12,46 +12,47 @@ WITH ( CREATE INDEX idx_entity_updates_entity_type ON entity_updates(entity_type); -INSERT INTO entity_updates(entity_type, entity_id) -SELECT - 'rotation', - id -FROM - rotations; - -CREATE OR REPLACE FUNCTION track_rotation_updates() +CREATE OR REPLACE FUNCTION fn_track_rotation_updates() RETURNS TRIGGER AS $$ BEGIN - INSERT INTO entity_updates(entity_type, entity_id) - VALUES('rotation', NEW.id); - RETURN new; + IF TG_TABLE_NAME = 'rotations' THEN + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', NEW.id); + ELSIF TG_OP = 'DELETE' THEN + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', OLD.rotation_id); + RETURN OLD; + ELSE + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', NEW.rotation_id); + END IF; + RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER track_rotation_updates - AFTER INSERT OR UPDATE ON rotations FOR EACH ROW - EXECUTE FUNCTION track_rotation_updates(); +CREATE TRIGGER trg_track_rotation_updates + AFTER INSERT OR UPDATE ON rotations + FOR EACH ROW + EXECUTE FUNCTION fn_track_rotation_updates(); -CREATE OR REPLACE FUNCTION track_rotation_dep_updates() - RETURNS TRIGGER - AS $$ -BEGIN - INSERT INTO entity_updates(entity_type, entity_id) - VALUES('rotation', NEW.rotation_id); - RETURN new; -END; -$$ -LANGUAGE plpgsql; +CREATE TRIGGER trg_track_rotation_updates + AFTER UPDATE ON rotation_state + FOR EACH ROW + EXECUTE FUNCTION fn_track_rotation_updates(); CREATE TRIGGER track_rotation_state_updates - AFTER INSERT OR UPDATE ON rotation_state FOR EACH ROW - EXECUTE FUNCTION track_rotation_dep_updates(); + AFTER INSERT OR UPDATE OR DELETE ON rotation_participants + FOR EACH ROW + EXECUTE FUNCTION fn_track_rotation_updates(); -CREATE TRIGGER track_rotation_participants_updates - AFTER INSERT OR UPDATE OR DELETE ON rotation_participants FOR EACH ROW - EXECUTE FUNCTION track_rotation_dep_updates(); +INSERT INTO entity_updates(entity_type, entity_id) +SELECT + 'rotation', + id +FROM + rotations; -- +migrate Down DROP TABLE entity_updates; diff --git a/migrate/schema.sql b/migrate/schema.sql index 717c44399e..586db6ffcd 100644 --- a/migrate/schema.sql +++ b/migrate/schema.sql @@ -1,7 +1,7 @@ -- This file is auto-generated by "make db-schema"; DO NOT EDIT --- DATA=a04ad6e24396ac3db803fcd0841d859677a35a3152da216750bbf99c355cb2e2 - --- DISK=b9de2c2d582024716605a046fe0ddcf35748820896a5cef0bf4effa297af79e4 - --- PSQL=b9de2c2d582024716605a046fe0ddcf35748820896a5cef0bf4effa297af79e4 - +-- DATA=0d4a01d6ded7ca0cb412d82228deac104404d35f6ba8128f0ce543a0e7367623 - +-- DISK=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - +-- PSQL=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- -- pgdump-lite database dump -- @@ -1289,6 +1289,27 @@ END; $function$ ; +CREATE OR REPLACE FUNCTION public.fn_track_rotation_updates() + RETURNS trigger + LANGUAGE plpgsql +AS $function$ +BEGIN + IF TG_TABLE_NAME = 'rotations' THEN + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', NEW.id); + ELSIF TG_OP = 'DELETE' THEN + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', OLD.rotation_id); + RETURN OLD; + ELSE + INSERT INTO entity_updates(entity_type, entity_id) + VALUES('rotation', NEW.rotation_id); + END IF; + RETURN NEW; +END; +$function$ +; + CREATE OR REPLACE FUNCTION public.fn_trig_alert_on_force_escalation() RETURNS trigger LANGUAGE plpgsql @@ -1699,6 +1720,18 @@ CREATE TABLE engine_processing_versions ( CREATE UNIQUE INDEX engine_processing_versions_pkey ON public.engine_processing_versions USING btree (type_id); +CREATE TABLE entity_updates ( + created_at timestamp with time zone DEFAULT now() NOT NULL, + entity_id uuid NOT NULL, + entity_type text NOT NULL, + id bigint DEFAULT nextval('entity_updates_id_seq'::regclass) NOT NULL, + CONSTRAINT entity_updates_pkey PRIMARY KEY (id) +); + +CREATE UNIQUE INDEX entity_updates_pkey ON public.entity_updates USING btree (id); +CREATE INDEX idx_entity_updates_entity_type ON public.entity_updates USING btree (entity_type); + + CREATE TABLE ep_step_on_call_users ( end_time timestamp with time zone, ep_step_id uuid NOT NULL, @@ -2207,6 +2240,7 @@ CREATE INDEX idx_participant_rotation ON public.rotation_participants USING btre CREATE UNIQUE INDEX rotation_participants_pkey ON public.rotation_participants USING btree (id); CREATE UNIQUE INDEX rotation_participants_rotation_id_position_key ON public.rotation_participants USING btree (rotation_id, "position"); +CREATE TRIGGER track_rotation_state_updates AFTER INSERT OR DELETE OR UPDATE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); CREATE TRIGGER trg_10_decr_part_count_on_del BEFORE DELETE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_decr_part_count_on_del(); CREATE TRIGGER trg_20_decr_rot_part_position_on_delete AFTER DELETE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_decr_rot_part_position_on_delete(); CREATE TRIGGER trg_30_advance_or_end_rot_on_part_del BEFORE DELETE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_advance_or_end_rot_on_part_del(); @@ -2235,6 +2269,7 @@ CREATE UNIQUE INDEX rotation_state_pkey ON public.rotation_state USING btree (ro CREATE UNIQUE INDEX rotation_state_uniq_id ON public.rotation_state USING btree (id); CREATE TRIGGER trg_set_rot_state_pos_on_active_change BEFORE UPDATE ON public.rotation_state FOR EACH ROW WHEN ((new.rotation_participant_id <> old.rotation_participant_id)) EXECUTE FUNCTION fn_set_rot_state_pos_on_active_change(); +CREATE TRIGGER trg_track_rotation_updates AFTER UPDATE ON public.rotation_state FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); CREATE TABLE rotations ( @@ -2258,6 +2293,8 @@ CREATE UNIQUE INDEX rotations_name ON public.rotations USING btree (lower(name)) CREATE UNIQUE INDEX rotations_name_unique ON public.rotations USING btree (name); CREATE UNIQUE INDEX rotations_pkey ON public.rotations USING btree (id); +CREATE TRIGGER trg_track_rotation_updates AFTER INSERT OR UPDATE ON public.rotations FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); + CREATE TABLE schedule_data ( data jsonb NOT NULL, From 40d58da57d162779d1af4bccd496de3c89c063b5 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 10:51:28 -0600 Subject: [PATCH 04/20] feat(rotation): integrate event bus into rotation store for enhanced event handling --- app/initstores.go | 2 +- schedule/rotation/store.go | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/app/initstores.go b/app/initstores.go index d204e3736b..cd1e8272f6 100644 --- a/app/initstores.go +++ b/app/initstores.go @@ -194,7 +194,7 @@ func (app *App) initStores(ctx context.Context) error { } if app.RotationStore == nil { - app.RotationStore, err = rotation.NewStore(ctx, app.db) + app.RotationStore, err = rotation.NewStore(ctx, app.db, app.EventBus) } if err != nil { return errors.Wrap(err, "init rotation store") diff --git a/schedule/rotation/store.go b/schedule/rotation/store.go index 8ee5425d9a..c2c8ccbd14 100644 --- a/schedule/rotation/store.go +++ b/schedule/rotation/store.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/target/goalert/assignment" + "github.com/target/goalert/event" "github.com/target/goalert/permission" "github.com/target/goalert/util" "github.com/target/goalert/util/sqlutil" @@ -19,7 +20,8 @@ import ( var ErrNoState = errors.New("no state available") type Store struct { - db *sql.DB + db *sql.DB + bus *event.Bus createRotation *sql.Stmt updateRotation *sql.Stmt @@ -43,11 +45,12 @@ type Store struct { findPartCount *sql.Stmt } -func NewStore(ctx context.Context, db *sql.DB) (*Store, error) { +func NewStore(ctx context.Context, db *sql.DB, bus *event.Bus) (*Store, error) { p := &util.Prepare{DB: db, Ctx: ctx} return &Store{ - db: db, + db: db, + bus: bus, lockPart: p.P(`lock rotation_participants, rotation_state in exclusive mode`), @@ -187,12 +190,15 @@ func (s *Store) CreateRotationTx(ctx context.Context, tx *sql.Tx, r *Rotation) ( stmt = tx.Stmt(stmt) } - n.ID = uuid.New().String() + id := uuid.New() + n.ID = id.String() _, err = stmt.ExecContext(ctx, n.ID, n.Name, n.Description, n.Type, n.Start, n.ShiftLength, n.Start.Location().String()) if err != nil { return nil, err } + + event.SendTx(ctx, s.bus, tx, Update{ID: id}) return n, nil } @@ -217,6 +223,8 @@ func (s *Store) UpdateRotationTx(ctx context.Context, tx *sql.Tx, r *Rotation) e } _, err = stmt.ExecContext(ctx, n.ID, n.Name, n.Description, n.Type, n.Start, n.ShiftLength, n.Start.Location().String()) + + event.SendTx(ctx, s.bus, tx, Update{ID: uuid.MustParse(n.ID)}) return err } @@ -451,6 +459,8 @@ func (s *Store) SetActiveIndexTx(ctx context.Context, tx *sql.Tx, rotID string, // We are checking to see if there is no participant for that position before returning a validation error return validation.NewFieldError("ActiveUserIndex", "invalid index for rotation") } + + event.SendTx(ctx, s.bus, tx, Update{ID: uuid.MustParse(rotID)}) return err } @@ -499,8 +509,12 @@ func (s *Store) AddRotationUsersTx(ctx context.Context, tx *sql.Tx, rotationID s return nil }) + if err != nil { + return err + } - return err + event.SendTx(ctx, s.bus, tx, Update{ID: uuid.MustParse(rotationID)}) + return nil } func (s *Store) DeleteRotationParticipantsTx(ctx context.Context, tx *sql.Tx, partIDs []string) error { From d43c6398fc562c751532d8c8df4b90b72e26f510 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 10:53:37 -0600 Subject: [PATCH 05/20] refactor(rotation): remove unused UpdateAll function from rotation manager --- engine/rotationmanager/update.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/engine/rotationmanager/update.go b/engine/rotationmanager/update.go index d19c0c5b9b..ea65f4a2b3 100644 --- a/engine/rotationmanager/update.go +++ b/engine/rotationmanager/update.go @@ -14,16 +14,6 @@ import ( "github.com/target/goalert/validation/validate" ) -// UpdateAll will update and cleanup the rotation state for all rotations. -func (db *DB) UpdateAll(ctx context.Context) error { - err := permission.LimitCheckAny(ctx, permission.System) - if err != nil { - return err - } - err = db.update(ctx, true, nil) - return err -} - // UpdateOneRotation will update and cleanup the rotation state for the given rotation. func (db *DB) UpdateOneRotation(ctx context.Context, rotID string) error { err := permission.LimitCheckAny(ctx, permission.System) From 0aa39cfa4bf801c8524cdc8d27ba0f7b4ab4bcea Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 10:55:32 -0600 Subject: [PATCH 06/20] refactor(rotation): remove UpdateOneRotation and related functions from rotation manager --- engine/rotationmanager/update.go | 125 ------------------------------- 1 file changed, 125 deletions(-) delete mode 100644 engine/rotationmanager/update.go diff --git a/engine/rotationmanager/update.go b/engine/rotationmanager/update.go deleted file mode 100644 index ea65f4a2b3..0000000000 --- a/engine/rotationmanager/update.go +++ /dev/null @@ -1,125 +0,0 @@ -package rotationmanager - -import ( - "context" - "database/sql" - "time" - - "github.com/pkg/errors" - "github.com/target/goalert/permission" - "github.com/target/goalert/schedule/rotation" - "github.com/target/goalert/util" - "github.com/target/goalert/util/log" - "github.com/target/goalert/util/sqlutil" - "github.com/target/goalert/validation/validate" -) - -// UpdateOneRotation will update and cleanup the rotation state for the given rotation. -func (db *DB) UpdateOneRotation(ctx context.Context, rotID string) error { - err := permission.LimitCheckAny(ctx, permission.System) - if err != nil { - return err - } - err = validate.UUID("Rotation", rotID) - if err != nil { - return err - } - ctx = log.WithField(ctx, "RotationID", rotID) - return db.update(ctx, false, &rotID) -} - -func (db *DB) update(ctx context.Context, all bool, rotID *string) error { - err := permission.LimitCheckAny(ctx, permission.System) - if err != nil { - return err - } - log.Debugf(ctx, "Updating rotations.") - - // process rotation advancement - tx, err := db.lock.BeginTx(ctx, nil) - if err != nil { - return errors.Wrap(err, "start advancement transaction") - } - defer sqlutil.Rollback(ctx, "rotation manager", tx) - - _, err = tx.StmtContext(ctx, db.lockPart).ExecContext(ctx) - if err != nil { - return errors.Wrap(err, "lock rotation participants") - } - - needsAdvance, err := db.calcAdvances(ctx, tx, all, rotID) - if err != nil { - return errors.Wrap(err, "calc stale rotations") - } - - updateStmt := tx.Stmt(db.rotate) - for _, adv := range needsAdvance { - fctx := log.WithFields(ctx, log.Fields{ - "RotationID": adv.id, - "Position": adv.newPosition, - }) - - if !adv.silent { - log.Debugf(fctx, "Advancing rotation.") - } - _, err = updateStmt.ExecContext(fctx, adv.id, adv.newPosition) - if err != nil { - return errors.Wrap(err, "advance rotation") - } - } - - return errors.Wrap(tx.Commit(), "commit transaction") -} - -func (db *DB) calcAdvances(ctx context.Context, tx *sql.Tx, all bool, rotID *string) ([]advance, error) { - var t time.Time - err := tx.Stmt(db.currentTime).QueryRowContext(ctx).Scan(&t) - if err != nil { - return nil, errors.Wrap(err, "fetch current timestamp") - } - - rows, err := tx.Stmt(db.rotateData).QueryContext(ctx, all, rotID) - if err != nil { - return nil, errors.Wrap(err, "fetch current rotation state") - } - defer rows.Close() - - var rot rotation.Rotation - var state rotState - var partCount int - var tzName string - var adv *advance - var loc *time.Location - var needsAdvance []advance - - for rows.Next() { - err = rows.Scan( - &rot.ID, - &rot.Type, - &rot.Start, - &rot.ShiftLength, - &tzName, - &state.ShiftStart, - &state.Position, - &partCount, - &state.Version, - ) - if err != nil { - return nil, errors.Wrap(err, "scan rotation data") - } - loc, err = util.LoadLocation(tzName) - if err != nil { - return nil, errors.Wrap(err, "load timezone") - } - rot.Start = rot.Start.In(loc) - adv = calcAdvance(ctx, t, &rot, state, partCount) - if adv != nil { - needsAdvance = append(needsAdvance, *adv) - if len(needsAdvance) == 150 { - // only process up to 150 at a time (of those that need updates) - break - } - } - } - return needsAdvance, nil -} From 20c7e9bc50bc5c575fda6632508aa3db932b2649 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 10:55:58 -0600 Subject: [PATCH 07/20] refactor(rotation): remove unused SQL statements and preparation logic from DB initialization --- engine/rotationmanager/db.go | 37 +----------------------------------- 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/engine/rotationmanager/db.go b/engine/rotationmanager/db.go index b66f22ba99..bf6228cc84 100644 --- a/engine/rotationmanager/db.go +++ b/engine/rotationmanager/db.go @@ -6,19 +6,12 @@ import ( "github.com/riverqueue/river" "github.com/target/goalert/engine/processinglock" - "github.com/target/goalert/util" ) // DB manages rotations in Postgres. type DB struct { lock *processinglock.Lock - currentTime *sql.Stmt - - lockPart *sql.Stmt - rotate *sql.Stmt - rotateData *sql.Stmt - riverDBSQL *river.Client[*sql.Tx] } @@ -34,38 +27,10 @@ func NewDB(ctx context.Context, db *sql.DB, riverDBSQL *river.Client[*sql.Tx]) ( if err != nil { return nil, err } - p := &util.Prepare{Ctx: ctx, DB: db} return &DB{ lock: lock, riverDBSQL: riverDBSQL, - - currentTime: p.P(`select now()`), - lockPart: p.P(`lock rotation_participants, rotation_state in exclusive mode`), - rotate: p.P(` - update rotation_state - set - shift_start = now(), - rotation_participant_id = (select id from rotation_participants where rotation_id = $1 and position = $2), - version = 2 - where rotation_id = $1 - `), - rotateData: p.P(` - select - rot.id, - rot."type", - rot.start_time, - rot.shift_length, - rot.time_zone, - state.shift_start, - state."position", - rot.participant_count, - state.version - from rotations rot - join rotation_state state on state.rotation_id = rot.id - where $1 or state.rotation_id = $2 - for update skip locked - `), - }, p.Err + }, nil } From 174069743b5510778de0f8c5ac8506bea6bad3b0 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 13:44:51 -0600 Subject: [PATCH 08/20] feat(rotation): add EventBus to App struct and update rotation handling with event notifications --- app/initgraphql.go | 1 + graphql2/graphqlapp/app.go | 4 +++- graphql2/graphqlapp/rotation.go | 9 +++++---- schedule/rotation/store.go | 24 +++++------------------- 4 files changed, 14 insertions(+), 24 deletions(-) diff --git a/app/initgraphql.go b/app/initgraphql.go index 66e23f48d3..9930b48571 100644 --- a/app/initgraphql.go +++ b/app/initgraphql.go @@ -42,6 +42,7 @@ func (app *App) initGraphQL(ctx context.Context) error { SWO: app.cfg.SWO, APIKeyStore: app.APIKeyStore, DestReg: app.DestRegistry, + EventBus: app.EventBus, } return nil diff --git a/graphql2/graphqlapp/app.go b/graphql2/graphqlapp/app.go index c5b901d587..31841a92ae 100644 --- a/graphql2/graphqlapp/app.go +++ b/graphql2/graphqlapp/app.go @@ -27,6 +27,7 @@ import ( "github.com/target/goalert/calsub" "github.com/target/goalert/config" "github.com/target/goalert/escalation" + "github.com/target/goalert/event" "github.com/target/goalert/graphql2" "github.com/target/goalert/heartbeat" "github.com/target/goalert/integrationkey" @@ -99,7 +100,8 @@ type App struct { SWO *swo.Manager - DestReg *nfydest.Registry + DestReg *nfydest.Registry + EventBus *event.Bus } type fieldErr struct { diff --git a/graphql2/graphqlapp/rotation.go b/graphql2/graphqlapp/rotation.go index a97b02a977..5dccf4831d 100644 --- a/graphql2/graphqlapp/rotation.go +++ b/graphql2/graphqlapp/rotation.go @@ -5,7 +5,9 @@ import ( "database/sql" "time" + "github.com/google/uuid" "github.com/target/goalert/assignment" + "github.com/target/goalert/event" "github.com/target/goalert/graphql2" "github.com/target/goalert/permission" "github.com/target/goalert/schedule/rotation" @@ -369,9 +371,10 @@ func (m *Mutation) UpdateRotation(ctx context.Context, input graphql2.UpdateRota } } - return err - }) + event.SendTx(ctx, m.EventBus, tx, rotation.Update{ID: uuid.MustParse(input.ID)}) + return nil + }) if err != nil { return false, err } @@ -379,7 +382,6 @@ func (m *Mutation) UpdateRotation(ctx context.Context, input graphql2.UpdateRota } func (a *Query) CalcRotationHandoffTimes(ctx context.Context, input *graphql2.CalcRotationHandoffTimesInput) ([]time.Time, error) { - err := validate.Range("count", input.Count, 0, 20) if err != nil { return nil, err @@ -430,7 +432,6 @@ func (a *Query) CalcRotationHandoffTimes(ctx context.Context, input *graphql2.Ca // getRotationFromISO determines the rotation type based on the given ISODuration. An error is given if the unsupported year field or multiple non-zero fields are given. func setRotationShiftFromISO(rot *rotation.Rotation, dur *timeutil.ISODuration) error { - // validate only one time field (year, month, days, timepart) is non-zero nonZeroFields := 0 diff --git a/schedule/rotation/store.go b/schedule/rotation/store.go index c2c8ccbd14..8ee5425d9a 100644 --- a/schedule/rotation/store.go +++ b/schedule/rotation/store.go @@ -8,7 +8,6 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/target/goalert/assignment" - "github.com/target/goalert/event" "github.com/target/goalert/permission" "github.com/target/goalert/util" "github.com/target/goalert/util/sqlutil" @@ -20,8 +19,7 @@ import ( var ErrNoState = errors.New("no state available") type Store struct { - db *sql.DB - bus *event.Bus + db *sql.DB createRotation *sql.Stmt updateRotation *sql.Stmt @@ -45,12 +43,11 @@ type Store struct { findPartCount *sql.Stmt } -func NewStore(ctx context.Context, db *sql.DB, bus *event.Bus) (*Store, error) { +func NewStore(ctx context.Context, db *sql.DB) (*Store, error) { p := &util.Prepare{DB: db, Ctx: ctx} return &Store{ - db: db, - bus: bus, + db: db, lockPart: p.P(`lock rotation_participants, rotation_state in exclusive mode`), @@ -190,15 +187,12 @@ func (s *Store) CreateRotationTx(ctx context.Context, tx *sql.Tx, r *Rotation) ( stmt = tx.Stmt(stmt) } - id := uuid.New() - n.ID = id.String() + n.ID = uuid.New().String() _, err = stmt.ExecContext(ctx, n.ID, n.Name, n.Description, n.Type, n.Start, n.ShiftLength, n.Start.Location().String()) if err != nil { return nil, err } - - event.SendTx(ctx, s.bus, tx, Update{ID: id}) return n, nil } @@ -223,8 +217,6 @@ func (s *Store) UpdateRotationTx(ctx context.Context, tx *sql.Tx, r *Rotation) e } _, err = stmt.ExecContext(ctx, n.ID, n.Name, n.Description, n.Type, n.Start, n.ShiftLength, n.Start.Location().String()) - - event.SendTx(ctx, s.bus, tx, Update{ID: uuid.MustParse(n.ID)}) return err } @@ -459,8 +451,6 @@ func (s *Store) SetActiveIndexTx(ctx context.Context, tx *sql.Tx, rotID string, // We are checking to see if there is no participant for that position before returning a validation error return validation.NewFieldError("ActiveUserIndex", "invalid index for rotation") } - - event.SendTx(ctx, s.bus, tx, Update{ID: uuid.MustParse(rotID)}) return err } @@ -509,12 +499,8 @@ func (s *Store) AddRotationUsersTx(ctx context.Context, tx *sql.Tx, rotationID s return nil }) - if err != nil { - return err - } - event.SendTx(ctx, s.bus, tx, Update{ID: uuid.MustParse(rotationID)}) - return nil + return err } func (s *Store) DeleteRotationParticipantsTx(ctx context.Context, tx *sql.Tx, partIDs []string) error { From 50599e960ca183467bc33e39933b9e0c03b14f65 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Thu, 6 Feb 2025 13:45:35 -0600 Subject: [PATCH 09/20] refactor(rotation): remove EventBus dependency from RotationStore initialization --- app/initstores.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/initstores.go b/app/initstores.go index cd1e8272f6..d204e3736b 100644 --- a/app/initstores.go +++ b/app/initstores.go @@ -194,7 +194,7 @@ func (app *App) initStores(ctx context.Context) error { } if app.RotationStore == nil { - app.RotationStore, err = rotation.NewStore(ctx, app.db, app.EventBus) + app.RotationStore, err = rotation.NewStore(ctx, app.db) } if err != nil { return errors.Wrap(err, "init rotation store") From 1f4046a6cd0ffbdf63b390758c42d71e9b7f781f Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Fri, 7 Feb 2025 14:48:57 -0600 Subject: [PATCH 10/20] refactor(tests): simplify user assertion logic in WaitAndAssertOnCallUsers --- .../20250205165505-track-rotation-updates.sql | 2 ++ test/smoke/harness/harness.go | 22 ++++--------------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/migrate/migrations/20250205165505-track-rotation-updates.sql b/migrate/migrations/20250205165505-track-rotation-updates.sql index 058a5d5705..8a46a413ff 100644 --- a/migrate/migrations/20250205165505-track-rotation-updates.sql +++ b/migrate/migrations/20250205165505-track-rotation-updates.sql @@ -57,3 +57,5 @@ FROM -- +migrate Down DROP TABLE entity_updates; +DROP FUNCTION fn_track_rotation_updates(); + diff --git a/test/smoke/harness/harness.go b/test/smoke/harness/harness.go index 6b49b060b6..1c732079b6 100644 --- a/test/smoke/harness/harness.go +++ b/test/smoke/harness/harness.go @@ -803,26 +803,12 @@ func (h *Harness) WaitAndAssertOnCallUsers(serviceID string, userIDs ...string) return uniq } sort.Strings(userIDs) - match := func(final bool) bool { + check := func(t *assert.CollectT) { ids := getUsers() - if len(ids) != len(userIDs) { - if final { - h.t.Fatalf("got %d on-call users; want %d", len(ids), len(userIDs)) - } - return false - } - for i, id := range userIDs { - if ids[i] != id { - if final { - h.t.Fatalf("on-call[%d] = %s; want %s", i, ids[i], id) - } - return false - } - } - return true + require.Lenf(t, ids, len(userIDs), "number of on-call users") + require.EqualValuesf(t, userIDs, ids, "on-call users") } - h.Trigger() // run engine cycle - match(true) // assert result + assert.EventuallyWithT(h.t, check, 5*time.Second, 100*time.Millisecond) } From 5b6a5df4d8ee295ce6f9c78627ec88d605235f5b Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Fri, 7 Feb 2025 14:52:29 -0600 Subject: [PATCH 11/20] chore(schema): update auto-generated data hash in schema.sql --- migrate/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrate/schema.sql b/migrate/schema.sql index 586db6ffcd..eb531baf50 100644 --- a/migrate/schema.sql +++ b/migrate/schema.sql @@ -1,5 +1,5 @@ -- This file is auto-generated by "make db-schema"; DO NOT EDIT --- DATA=0d4a01d6ded7ca0cb412d82228deac104404d35f6ba8128f0ce543a0e7367623 - +-- DATA=16df1b4efe4c463710bac60981c52f8ef2b05234f815ffde94979bdacf4322ee - -- DISK=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- PSQL=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- From a1a1761e3b51d111f1aa3c64653ed37dcc73bded Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Fri, 7 Feb 2025 14:59:07 -0600 Subject: [PATCH 12/20] refactor(migrations): remove triggers and function related to rotation updates --- .../migrations/20250205165505-track-rotation-updates.sql | 8 +++++++- migrate/schema.sql | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/migrate/migrations/20250205165505-track-rotation-updates.sql b/migrate/migrations/20250205165505-track-rotation-updates.sql index 8a46a413ff..1f1845223d 100644 --- a/migrate/migrations/20250205165505-track-rotation-updates.sql +++ b/migrate/migrations/20250205165505-track-rotation-updates.sql @@ -55,7 +55,13 @@ FROM rotations; -- +migrate Down -DROP TABLE entity_updates; +DROP TRIGGER IF EXISTS trg_track_rotation_updates ON rotations; + +DROP TRIGGER IF EXISTS track_rotation_state_updates ON rotation_state; + +DROP TRIGGER IF EXISTS track_rotation_state_updates ON rotation_participants; DROP FUNCTION fn_track_rotation_updates(); +DROP TABLE entity_updates; + diff --git a/migrate/schema.sql b/migrate/schema.sql index eb531baf50..91b2810e00 100644 --- a/migrate/schema.sql +++ b/migrate/schema.sql @@ -1,5 +1,5 @@ -- This file is auto-generated by "make db-schema"; DO NOT EDIT --- DATA=16df1b4efe4c463710bac60981c52f8ef2b05234f815ffde94979bdacf4322ee - +-- DATA=fa86df968f60643e7ec822d67707c6410ff9a3510bf901f3a18dfafdc22a4e1a - -- DISK=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- PSQL=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- From ca9832fd280f9fddb3618f485d1dc765ab02bf8f Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Fri, 7 Feb 2025 15:31:44 -0600 Subject: [PATCH 13/20] refactor(migrations): rename triggers for clarity and update schema.sql --- .../20250205165505-track-rotation-updates.sql | 10 +++++----- migrate/schema.sql | 6 +++--- test/smoke/migrations_test.go | 2 ++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/migrate/migrations/20250205165505-track-rotation-updates.sql b/migrate/migrations/20250205165505-track-rotation-updates.sql index 1f1845223d..64d209d285 100644 --- a/migrate/migrations/20250205165505-track-rotation-updates.sql +++ b/migrate/migrations/20250205165505-track-rotation-updates.sql @@ -37,12 +37,12 @@ CREATE TRIGGER trg_track_rotation_updates FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); -CREATE TRIGGER trg_track_rotation_updates +CREATE TRIGGER trg_track_rotation_state_updates AFTER UPDATE ON rotation_state FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); -CREATE TRIGGER track_rotation_state_updates +CREATE TRIGGER trg_track_rotation_part_updates AFTER INSERT OR UPDATE OR DELETE ON rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); @@ -55,11 +55,11 @@ FROM rotations; -- +migrate Down -DROP TRIGGER IF EXISTS trg_track_rotation_updates ON rotations; +DROP TRIGGER trg_track_rotation_updates ON rotations; -DROP TRIGGER IF EXISTS track_rotation_state_updates ON rotation_state; +DROP TRIGGER trg_track_rotation_state_updates ON rotation_state; -DROP TRIGGER IF EXISTS track_rotation_state_updates ON rotation_participants; +DROP TRIGGER trg_track_rotation_part_updates ON rotation_participants; DROP FUNCTION fn_track_rotation_updates(); diff --git a/migrate/schema.sql b/migrate/schema.sql index 91b2810e00..faeaec0c90 100644 --- a/migrate/schema.sql +++ b/migrate/schema.sql @@ -1,5 +1,5 @@ -- This file is auto-generated by "make db-schema"; DO NOT EDIT --- DATA=fa86df968f60643e7ec822d67707c6410ff9a3510bf901f3a18dfafdc22a4e1a - +-- DATA=59402a05b69ad3a2679fcbdcb057b78d68bb139f2bc985239cd77ee8ff9bf601 - -- DISK=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- PSQL=e363175696c65c32920016cc9b06ae1551c72ce8f89302a7f139befdc886c19f - -- @@ -2240,7 +2240,6 @@ CREATE INDEX idx_participant_rotation ON public.rotation_participants USING btre CREATE UNIQUE INDEX rotation_participants_pkey ON public.rotation_participants USING btree (id); CREATE UNIQUE INDEX rotation_participants_rotation_id_position_key ON public.rotation_participants USING btree (rotation_id, "position"); -CREATE TRIGGER track_rotation_state_updates AFTER INSERT OR DELETE OR UPDATE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); CREATE TRIGGER trg_10_decr_part_count_on_del BEFORE DELETE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_decr_part_count_on_del(); CREATE TRIGGER trg_20_decr_rot_part_position_on_delete AFTER DELETE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_decr_rot_part_position_on_delete(); CREATE TRIGGER trg_30_advance_or_end_rot_on_part_del BEFORE DELETE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_advance_or_end_rot_on_part_del(); @@ -2250,6 +2249,7 @@ CREATE TRIGGER trg_inc_rot_part_position_on_insert BEFORE INSERT ON public.rotat CREATE TRIGGER trg_incr_part_count_on_add BEFORE INSERT ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_incr_part_count_on_add(); CREATE TRIGGER trg_set_rot_state_pos_on_part_reorder BEFORE UPDATE ON public.rotation_participants FOR EACH ROW WHEN ((new."position" <> old."position")) EXECUTE FUNCTION fn_set_rot_state_pos_on_part_reorder(); CREATE TRIGGER trg_start_rotation_on_first_part_add AFTER INSERT ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_start_rotation_on_first_part_add(); +CREATE TRIGGER trg_track_rotation_part_updates AFTER INSERT OR DELETE OR UPDATE ON public.rotation_participants FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); CREATE TABLE rotation_state ( @@ -2269,7 +2269,7 @@ CREATE UNIQUE INDEX rotation_state_pkey ON public.rotation_state USING btree (ro CREATE UNIQUE INDEX rotation_state_uniq_id ON public.rotation_state USING btree (id); CREATE TRIGGER trg_set_rot_state_pos_on_active_change BEFORE UPDATE ON public.rotation_state FOR EACH ROW WHEN ((new.rotation_participant_id <> old.rotation_participant_id)) EXECUTE FUNCTION fn_set_rot_state_pos_on_active_change(); -CREATE TRIGGER trg_track_rotation_updates AFTER UPDATE ON public.rotation_state FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); +CREATE TRIGGER trg_track_rotation_state_updates AFTER UPDATE ON public.rotation_state FOR EACH ROW EXECUTE FUNCTION fn_track_rotation_updates(); CREATE TABLE rotations ( diff --git a/test/smoke/migrations_test.go b/test/smoke/migrations_test.go index eef1823b70..170963cd77 100644 --- a/test/smoke/migrations_test.go +++ b/test/smoke/migrations_test.go @@ -79,6 +79,8 @@ var rules = migratetest.RuleSet{ // Every DB must have a unique ID. {MigrationName: "switchover-mk2", TableName: "switchover_state", ColumnName: "db_id"}, + + {MigrationName: "track-rotation-updates", TableName: "entity_updates", ColumnName: "created_at"}, } const migrateInitData = ` From f05525bc968d98e47fadc9ad8fbc791c60222bb4 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 10 Feb 2025 13:47:08 -0600 Subject: [PATCH 14/20] refactor(rotation): add queue assignment in updateRotation and improve lookForWork comments --- engine/rotationmanager/lookforwork.go | 1 + engine/rotationmanager/updaterotation.go | 1 + 2 files changed, 2 insertions(+) diff --git a/engine/rotationmanager/lookforwork.go b/engine/rotationmanager/lookforwork.go index c27d8a0479..13cbb86682 100644 --- a/engine/rotationmanager/lookforwork.go +++ b/engine/rotationmanager/lookforwork.go @@ -58,5 +58,6 @@ func (db *DB) lookForWork(ctx context.Context, j *river.Job[LookForWorkArgs]) er return nil } + // There was work to do, so wait a bit before looking again. return river.JobSnooze(time.Second) } diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index 78e531d761..06169fdf0a 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -62,6 +62,7 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro }, Priority: PriorityScheduled, ScheduledAt: r.EndTime(row.Now), + Queue: QueueName, }) if err != nil { return fmt.Errorf("schedule next run: %w", err) From cb245d7f7a98471d82f998194e1ad844af86c2ad Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 10 Feb 2025 14:03:29 -0600 Subject: [PATCH 15/20] add version --- engine/rotationmanager/queries.sql | 1 + engine/rotationmanager/updaterotation.go | 1 + gadb/queries.sql.go | 3 +++ 3 files changed, 5 insertions(+) diff --git a/engine/rotationmanager/queries.sql b/engine/rotationmanager/queries.sql index d2bf9a8e2d..5cb0d8ddb9 100644 --- a/engine/rotationmanager/queries.sql +++ b/engine/rotationmanager/queries.sql @@ -9,6 +9,7 @@ SELECT rot.time_zone, state.position, state.shift_start, + state.version, ARRAY ( SELECT p.id diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index 06169fdf0a..c1b08c9ea3 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -76,6 +76,7 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro s := rotState{ ShiftStart: row.ShiftStart.Time.In(loc), Position: int(row.Position.Int32), + Version: int(row.Version.Int32), } adv := calcAdvance(ctx, row.Now, &r, s, len(row.Participants)) if adv == nil { diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index a4fc10e129..f4fc92700c 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -4245,6 +4245,7 @@ SELECT rot.time_zone, state.position, state.shift_start, + state.version, ARRAY ( SELECT p.id @@ -4270,6 +4271,7 @@ type RotMgrRotationDataRow struct { TimeZone string Position sql.NullInt32 ShiftStart sql.NullTime + Version sql.NullInt32 Participants []uuid.UUID } @@ -4286,6 +4288,7 @@ func (q *Queries) RotMgrRotationData(ctx context.Context, rotationID uuid.UUID) &i.TimeZone, &i.Position, &i.ShiftStart, + &i.Version, pq.Array(&i.Participants), ) return i, err From 13933dad7dbd7d92aa56b01ea252ecfb3c38848e Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 10 Feb 2025 14:05:56 -0600 Subject: [PATCH 16/20] refactor(rotation): enhance calcAdvance to handle multiple rotation versions and return error for unknown versions --- engine/rotationmanager/advance.go | 15 ++++++++++----- engine/rotationmanager/updaterotation.go | 5 ++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/engine/rotationmanager/advance.go b/engine/rotationmanager/advance.go index b3647bc9f5..38333605ef 100644 --- a/engine/rotationmanager/advance.go +++ b/engine/rotationmanager/advance.go @@ -23,15 +23,20 @@ type rotState struct { } // calcAdvance will calculate rotation advancement if it is required. If not, nil is returned -func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state rotState, partCount int) *advance { +func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state rotState, partCount int) (*advance, error) { var mustUpdate bool origPos := state.Position // get next shift start time newStart := rot.EndTime(state.ShiftStart) - if state.Version == 1 { + switch state.Version { + case 1: newStart = calcVersion1EndTime(rot, state.ShiftStart) mustUpdate = true + case 2: + // no-op + default: + return nil, fmt.Errorf("unknown rotation version (supported: 1,2): %d", state.Version) } if state.Position >= partCount { @@ -49,10 +54,10 @@ func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state // If migrating from version 1 to 2 without changing // who's on-call do so silently. silent: state.Version == 1 && state.Position == origPos, - } + }, nil } // in the future, so nothing to do yet - return nil + return nil, nil } if !newStart.After(t.Add(-15 * time.Minute)) { @@ -79,5 +84,5 @@ func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state return &advance{ id: rot.ID, newPosition: state.Position, - } + }, nil } diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index c1b08c9ea3..326773048b 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -78,7 +78,10 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro Position: int(row.Position.Int32), Version: int(row.Version.Int32), } - adv := calcAdvance(ctx, row.Now, &r, s, len(row.Participants)) + adv, err := calcAdvance(ctx, row.Now, &r, s, len(row.Participants)) + if err != nil { + return fmt.Errorf("calc advance: %w", err) + } if adv == nil { // no advancement needed return nil From 532494b3f6fc914b4614fa1f8409c5d09be46507 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 11 Feb 2025 09:25:08 -0600 Subject: [PATCH 17/20] refactor(rotation): update comments for lookForWork and updateRotation functions for clarity --- engine/rotationmanager/lookforwork.go | 2 +- engine/rotationmanager/setup.go | 2 +- engine/rotationmanager/updaterotation.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/rotationmanager/lookforwork.go b/engine/rotationmanager/lookforwork.go index 13cbb86682..cc97987c58 100644 --- a/engine/rotationmanager/lookforwork.go +++ b/engine/rotationmanager/lookforwork.go @@ -15,7 +15,7 @@ type LookForWorkArgs struct{} func (LookForWorkArgs) Kind() string { return "rotation-manager-lfw" } -// cleanup is a worker function that will remove any stale subscriptions. +// lookForWork will schedule jobs for rotations in the entity_updates table. func (db *DB) lookForWork(ctx context.Context, j *river.Job[LookForWorkArgs]) error { var hadWork bool err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { diff --git a/engine/rotationmanager/setup.go b/engine/rotationmanager/setup.go index 0fe8d9f095..ac7e56b1f3 100644 --- a/engine/rotationmanager/setup.go +++ b/engine/rotationmanager/setup.go @@ -18,7 +18,7 @@ const ( PriorityLFW = 4 ) -var _ processinglock.Setupable = &DB{} +var _ processinglock.Setupable = (*DB)(nil) // assert that DB implements processinglock.Setupable // Setup implements processinglock.Setupable. func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index 326773048b..9ca3d3f364 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -20,7 +20,7 @@ type UpdateArgs struct { func (UpdateArgs) Kind() string { return "rotation-manager-update" } -// cleanup is a worker function that will remove any stale subscriptions. +// updateRotation updates the state of a single rotation, and schedules a job for the next rotation time. func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) error { return db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { g := gadb.New(tx) From 6bcfd5f72bfc1f01321d6e718b68ec6b345b9001 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 26 Feb 2025 15:00:19 -0600 Subject: [PATCH 18/20] build(deps): update river and related dependencies to v0.18.0 --- go.mod | 18 +++++++++--------- go.sum | 36 ++++++++++++++++++------------------ 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 7454982e53..74c8276cf9 100644 --- a/go.mod +++ b/go.mod @@ -33,15 +33,16 @@ require ( github.com/pelletier/go-toml/v2 v2.2.3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 - github.com/riverqueue/river v0.16.0 - github.com/riverqueue/river/cmd/river v0.16.0 - github.com/riverqueue/river/riverdriver/riverdatabasesql v0.16.0 - github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0 - github.com/riverqueue/river/rivertype v0.16.0 + github.com/riverqueue/river v0.18.0 + github.com/riverqueue/river/cmd/river v0.18.0 + github.com/riverqueue/river/riverdriver/riverdatabasesql v0.18.0 + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.18.0 + github.com/riverqueue/river/rivershared v0.18.0 + github.com/riverqueue/river/rivertype v0.18.0 github.com/samber/slog-logrus v1.0.0 github.com/sirupsen/logrus v1.9.3 github.com/slack-go/slack v0.16.0 - github.com/spf13/cobra v1.9.0 + github.com/spf13/cobra v1.9.1 github.com/spf13/viper v1.19.0 github.com/sqlc-dev/pqtype v0.3.0 github.com/stretchr/testify v1.10.0 @@ -106,7 +107,7 @@ require ( github.com/jhump/protoreflect v1.17.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/lmittmann/tint v1.0.6 // indirect + github.com/lmittmann/tint v1.0.7 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -122,8 +123,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.60.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect - github.com/riverqueue/river/riverdriver v0.16.0 // indirect - github.com/riverqueue/river/rivershared v0.16.0 // indirect + github.com/riverqueue/river/riverdriver v0.18.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.6.0 // indirect diff --git a/go.sum b/go.sum index 5266467bef..42d074de84 100644 --- a/go.sum +++ b/go.sum @@ -246,8 +246,8 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/lmittmann/tint v1.0.6 h1:vkkuDAZXc0EFGNzYjWcV0h7eEX+uujH48f/ifSkJWgc= -github.com/lmittmann/tint v1.0.6/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/lmittmann/tint v1.0.7 h1:D/0OqWZ0YOGZ6AyC+5Y2kD8PBEzBk6rFHVSfOqCkF9Y= +github.com/lmittmann/tint v1.0.7/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/matcornic/hermes/v2 v2.1.0 h1:9TDYFBPFv6mcXanaDmRDEp/RTWj0dTTi+LpFnnnfNWc= @@ -305,20 +305,20 @@ github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPA github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/riverqueue/river v0.16.0 h1:YyQrs0kGgjuABwgat02DPUYS0TMyG2ZFlzvf6+fSFaw= -github.com/riverqueue/river v0.16.0/go.mod h1:pEZ8Gc15XyFjVY89nJeL256ub5z18XF7ukYn8ktqQrs= -github.com/riverqueue/river/cmd/river v0.16.0 h1:7vsFaaD/9CT/jTGx8stEjsn0Hy4FVvKQ+RLk8jEoqqQ= -github.com/riverqueue/river/cmd/river v0.16.0/go.mod h1:GKp7Q7h1fRbZmBldwOseXFD7BVv4AYeZEn+MXCibVxY= -github.com/riverqueue/river/riverdriver v0.16.0 h1:y4Df4e1Xk3Id0nnu1VxHJn9118OzmRHcmvOxM/i1Q30= -github.com/riverqueue/river/riverdriver v0.16.0/go.mod h1:7Kdf5HQDrLyLUUqPqXobaK+7zbcMctWeAl7yhg4nHes= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.16.0 h1:T/DcMmZXiJAyLN3CSyAoNcf3U4oAD9Ht/8Vd5SXv5YU= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.16.0/go.mod h1:a9EUhD2yGsAeM9eWo+QrGGbL8LVWoGj2m8KEzm0xUxE= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0 h1:6HP296OPN+3ORL9qG1f561pldB5eovkLzfkNIQmaTXI= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0/go.mod h1:MAeBNoTQ+CD3nRvV9mF6iCBfsGJTxYHZeZSP4MYoeUE= -github.com/riverqueue/river/rivershared v0.16.0 h1:L1lQ3gMwdIsxA6yF0/PwAdsFP0T82yBD1V03q2GuJDU= -github.com/riverqueue/river/rivershared v0.16.0/go.mod h1:y5Xu8Shcp44DUNnEQV4c6oWH4m2OTkSMCe6nRrgzT34= -github.com/riverqueue/river/rivertype v0.16.0 h1:iDjNtCiUbXwLraqNEyQdH/OD80f1wTo8Ai6WHYCwRxs= -github.com/riverqueue/river/rivertype v0.16.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE= +github.com/riverqueue/river v0.18.0 h1:sGHeTOL9MR8+pMIVHRm59fzet8Ron/xjF3Yq/PSGb78= +github.com/riverqueue/river v0.18.0/go.mod h1:oapX5xb/L2YnkE801QubDZ0COHxVxEGVY37icPzghhU= +github.com/riverqueue/river/cmd/river v0.18.0 h1:W+prm3ze3vqCO9HJzxyTl48/loh8/jnCR4usc4QEYUM= +github.com/riverqueue/river/cmd/river v0.18.0/go.mod h1:fVPP+QsD6n/53GX54z4Ub9PsTOlrVjFLk8PbxeBdq1I= +github.com/riverqueue/river/riverdriver v0.18.0 h1:a2haR5I0MQLHjLCSVFpUEeJALCLemRl5zCztucysm1E= +github.com/riverqueue/river/riverdriver v0.18.0/go.mod h1:Mj45PbHabEnBv/nSah0J1/tg6hrX/SNeXtcYcSqMzxQ= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.18.0 h1:I6+5PWNt1WLBYZmtGXZnU/0oiE9f+py92guBw0Ah81k= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.18.0/go.mod h1:5AEPOZ4bHz4Oco4HO8azdp01ZgoUA41r/Qum8VplvIo= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.18.0 h1:fE28haolCDg8vRtAL9Pi+yGeU1Kbiry/zhZHzpPjiL8= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.18.0/go.mod h1:YpDDm1R0GPDt3GrQCR7olxDdaYWRTgvaDQhRiDKsJiQ= +github.com/riverqueue/river/rivershared v0.18.0 h1:hBfyaoTAvogs7lSw4vr6A2ZdZmmtTlew10P4MRXuaDg= +github.com/riverqueue/river/rivershared v0.18.0/go.mod h1:wyJw90ILEYNcYCoXr4B6iPHnSyRH0WKGQuPzjdEwou8= +github.com/riverqueue/river/rivertype v0.18.0 h1:YsXR5NbLAzniurGO0+zcISWMKq7Y71xkIe2oi86OAsE= +github.com/riverqueue/river/rivertype v0.18.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -361,8 +361,8 @@ github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= -github.com/spf13/cobra v1.9.0 h1:Py5fIuq/lJsRYxcxfOtsJqpmwJWCMOUy2tMJYV8TNHE= -github.com/spf13/cobra v1.9.0/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= +github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= +github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= From 1757b58626a6762854e38378c0f5e7ed50148765 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 26 Feb 2025 15:05:28 -0600 Subject: [PATCH 19/20] refactor(rotation): streamline rotation data retrieval and update logic --- engine/rotationmanager/queries.sql | 13 ++---- engine/rotationmanager/updaterotation.go | 18 ++++----- gadb/queries.sql.go | 50 +++++++++++------------- 3 files changed, 36 insertions(+), 45 deletions(-) diff --git a/engine/rotationmanager/queries.sql b/engine/rotationmanager/queries.sql index 5cb0d8ddb9..4074b1bfea 100644 --- a/engine/rotationmanager/queries.sql +++ b/engine/rotationmanager/queries.sql @@ -2,14 +2,8 @@ -- Get rotation data for a given rotation ID SELECT now()::timestamptz AS now, - rot.id, - rot.type, - rot.start_time, - rot.shift_length, - rot.time_zone, - state.position, - state.shift_start, - state.version, + sqlc.embed(rot), + sqlc.embed(state), ARRAY ( SELECT p.id @@ -51,7 +45,8 @@ UPDATE SET position = @position, shift_start = now(), - rotation_participant_id = @rotation_participant_id + rotation_participant_id = @rotation_participant_id, + version = 2 WHERE rotation_id = @rotation_id; diff --git a/engine/rotationmanager/updaterotation.go b/engine/rotationmanager/updaterotation.go index 9ca3d3f364..038a747068 100644 --- a/engine/rotationmanager/updaterotation.go +++ b/engine/rotationmanager/updaterotation.go @@ -35,7 +35,7 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro } if len(row.Participants) == 0 { - if row.Position.Valid { + if row.RotationState.Version != 0 { // no participants, but we have a position, so clear it return g.RotMgrEnd(ctx, j.Args.RotationID) } @@ -43,15 +43,15 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro return nil } - loc, err := util.LoadLocation(row.TimeZone) + loc, err := util.LoadLocation(row.Rotation.TimeZone) if err != nil { return fmt.Errorf("load location: %w", err) } r := rotation.Rotation{ - Type: rotation.Type(row.Type), - Start: row.StartTime.In(loc), - ShiftLength: int(row.ShiftLength), + Type: rotation.Type(row.Rotation.Type), + Start: row.Rotation.StartTime.In(loc), + ShiftLength: int(row.Rotation.ShiftLength), } // schedule next run @@ -68,15 +68,15 @@ func (db *DB) updateRotation(ctx context.Context, j *river.Job[UpdateArgs]) erro return fmt.Errorf("schedule next run: %w", err) } - if !row.Position.Valid { + if row.RotationState.Version == 0 { // no state, but we have participants, so start at the beginning return g.RotMgrStart(ctx, j.Args.RotationID) } s := rotState{ - ShiftStart: row.ShiftStart.Time.In(loc), - Position: int(row.Position.Int32), - Version: int(row.Version.Int32), + ShiftStart: row.RotationState.ShiftStart.In(loc), + Position: int(row.RotationState.Position), + Version: int(row.RotationState.Version), } adv, err := calcAdvance(ctx, row.Now, &r, s, len(row.Participants)) if err != nil { diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index f4fc92700c..ee0f5f65bf 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -4238,14 +4238,8 @@ func (q *Queries) RotMgrFindWork(ctx context.Context) ([]uuid.UUID, error) { const rotMgrRotationData = `-- name: RotMgrRotationData :one SELECT now()::timestamptz AS now, - rot.id, - rot.type, - rot.start_time, - rot.shift_length, - rot.time_zone, - state.position, - state.shift_start, - state.version, + rot.description, rot.id, rot.last_processed, rot.name, rot.participant_count, rot.shift_length, rot.start_time, rot.time_zone, rot.type, + state.id, state.position, state.rotation_id, state.rotation_participant_id, state.shift_start, state.version, ARRAY ( SELECT p.id @@ -4263,16 +4257,10 @@ WHERE ` type RotMgrRotationDataRow struct { - Now time.Time - ID uuid.UUID - Type EnumRotationType - StartTime time.Time - ShiftLength int64 - TimeZone string - Position sql.NullInt32 - ShiftStart sql.NullTime - Version sql.NullInt32 - Participants []uuid.UUID + Now time.Time + Rotation Rotation + RotationState RotationState + Participants []uuid.UUID } // Get rotation data for a given rotation ID @@ -4281,14 +4269,21 @@ func (q *Queries) RotMgrRotationData(ctx context.Context, rotationID uuid.UUID) var i RotMgrRotationDataRow err := row.Scan( &i.Now, - &i.ID, - &i.Type, - &i.StartTime, - &i.ShiftLength, - &i.TimeZone, - &i.Position, - &i.ShiftStart, - &i.Version, + &i.Rotation.Description, + &i.Rotation.ID, + &i.Rotation.LastProcessed, + &i.Rotation.Name, + &i.Rotation.ParticipantCount, + &i.Rotation.ShiftLength, + &i.Rotation.StartTime, + &i.Rotation.TimeZone, + &i.Rotation.Type, + &i.RotationState.ID, + &i.RotationState.Position, + &i.RotationState.RotationID, + &i.RotationState.RotationParticipantID, + &i.RotationState.ShiftStart, + &i.RotationState.Version, pq.Array(&i.Participants), ) return i, err @@ -4320,7 +4315,8 @@ UPDATE SET position = $1, shift_start = now(), - rotation_participant_id = $2 + rotation_participant_id = $2, + version = 2 WHERE rotation_id = $3 ` From cdc2f9c41e46eacfdee5cf70b5280a51db2c8650 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 26 Feb 2025 15:26:02 -0600 Subject: [PATCH 20/20] defined app.events during startup phase --- app/listenevents.go | 7 +------ app/startup.go | 7 +++++++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/app/listenevents.go b/app/listenevents.go index d8f5090a28..41f0b25c22 100644 --- a/app/listenevents.go +++ b/app/listenevents.go @@ -7,15 +7,10 @@ import ( "github.com/pkg/errors" "github.com/target/goalert/permission" "github.com/target/goalert/util/log" - "github.com/target/goalert/util/sqlutil" ) func (app *App) listenEvents(ctx context.Context) (<-chan struct{}, error) { - l, err := sqlutil.NewListener(ctx, app.cfg.LegacyLogger, app.db, "/goalert/config-refresh") - if err != nil { - return nil, err - } - app.events = l + l := app.events go func() { for { select { diff --git a/app/startup.go b/app/startup.go index 79ea11f8f2..9f3da264c4 100644 --- a/app/startup.go +++ b/app/startup.go @@ -10,6 +10,7 @@ import ( "github.com/target/goalert/notification/email" "github.com/target/goalert/notification/webhook" "github.com/target/goalert/retry" + "github.com/target/goalert/util/sqlutil" "github.com/pkg/errors" ) @@ -117,5 +118,11 @@ func (app *App) startup(ctx context.Context) error { app.Logger.InfoContext(ctx, "SWO Enabled.") } + l, err := sqlutil.NewListener(ctx, app.cfg.LegacyLogger, app.db, "/goalert/config-refresh") + if err != nil { + return err + } + app.events = l + return nil }