signals: use job queue to handle processing #4140

Merged
merged 40 commits on Nov 13, 2024

Commits (40)
1a03009
add pgx pool that will switch DBs properly
mastercactapus Nov 8, 2024
899ec20
update harness
mastercactapus Nov 8, 2024
dc6fcb8
Refactor to use pgxpool throughout
mastercactapus Nov 8, 2024
bf57f31
Set maximum connections for pgx pool to 5
mastercactapus Nov 8, 2024
ccf0c7f
Refactor database connection setup for retries
mastercactapus Nov 8, 2024
7bedd65
Add function to unlock all PostgreSQL advisory locks
mastercactapus Nov 8, 2024
b48f539
Remove unused time package and redundant config settings
mastercactapus Nov 8, 2024
ee7d7e7
Rename connector.go to appurl.go for clarity
mastercactapus Nov 8, 2024
d059ddf
Merge branch 'master' into swo-pgxpool
mastercactapus Nov 8, 2024
d5776ed
Upgrade sqlc and refactor migration parsing
mastercactapus Nov 8, 2024
8de57a4
Merge branch 'smart-migration-statement-splitting' into river-job-queue
mastercactapus Nov 8, 2024
06235d5
feat: integrate River queue system and UI into the application
mastercactapus Nov 8, 2024
41c145b
feat: add unique ID columns to river queue tables and refactor logger…
mastercactapus Nov 8, 2024
c08d24f
Merge branch 'master' into river-job-queue
mastercactapus Nov 8, 2024
2b2bf1d
wait to introduce UI
mastercactapus Nov 8, 2024
f318197
Revert "wait to introduce UI"
mastercactapus Nov 8, 2024
b8dead0
ignore imported migrations from label calculation
mastercactapus Nov 8, 2024
ff58b60
rename migrations
mastercactapus Nov 8, 2024
4b79fc2
Add river job migration script and update schema with new river job t…
mastercactapus Nov 8, 2024
c232c2c
regen models
mastercactapus Nov 8, 2024
fc94317
Add RiverWorkers to App and update related components
mastercactapus Nov 8, 2024
c98f359
Add River integration and enforce signal limits
mastercactapus Nov 11, 2024
83c73f9
Only show river UI in local dev for now
mastercactapus Nov 11, 2024
e3b2867
fix migration validation
mastercactapus Nov 11, 2024
41e8877
log error if necessary
mastercactapus Nov 11, 2024
25e63e2
Merge branch 'river-job-queue' into river-signals
mastercactapus Nov 12, 2024
164e6b3
Merge branch 'master' into river-job-queue
mastercactapus Nov 12, 2024
bc5a206
disable river for now
mastercactapus Nov 12, 2024
66e432b
fix broken down migration
mastercactapus Nov 12, 2024
51af56b
Merge branch 'river-job-queue' into river-signals
mastercactapus Nov 12, 2024
2445cf0
Merge branch 'master' into river-signals
mastercactapus Nov 12, 2024
0a29885
re-enable
mastercactapus Nov 12, 2024
3004de5
cap pending signal messages
mastercactapus Nov 12, 2024
f2ed95e
don't start it twice
mastercactapus Nov 12, 2024
581c83f
refactor: use constant for engine signal manager queue name
mastercactapus Nov 12, 2024
d2e126b
longer name
mastercactapus Nov 12, 2024
c4f6e1e
feat: add service ID filtering to signal message scheduling and retri…
mastercactapus Nov 12, 2024
3df81e3
feat: implement TriggerService for immediate message scheduling and e…
mastercactapus Nov 12, 2024
12f2482
feat: add noop worker for river initialization to prevent registratio…
mastercactapus Nov 12, 2024
2497fec
use run context when re-starting river
mastercactapus Nov 13, 2024
1 change: 1 addition & 0 deletions app/app.go
@@ -137,6 +137,7 @@ type App struct {
APIKeyStore *apikey.Store
River *river.Client[pgx.Tx]
RiverUI *riverui.Server
RiverWorkers *river.Workers
}

// NewApp constructs a new App and binds the listening socket.
1 change: 1 addition & 0 deletions app/initengine.go
@@ -53,6 +53,7 @@ func (app *App) initEngine(ctx context.Context) error {
DisableCycle: app.cfg.APIOnly,
LogCycles: app.cfg.LogEngine,
River: app.River,
RiverWorkers: app.RiverWorkers,
})
if err != nil {
return errors.Wrap(err, "init engine")
16 changes: 14 additions & 2 deletions app/initriver.go
@@ -38,13 +38,25 @@ func (r *riverErrs) HandlePanic(ctx context.Context, job *rivertype.JobRow, pani
return nil
}

type noopWorker struct{}

func (noopWorker) Kind() string { return "noop" }

func (app *App) initRiver(ctx context.Context) error {
w := river.NewWorkers()
app.RiverWorkers = river.NewWorkers()

// TODO: remove once a worker is added that's not behind a feature flag
//
// Without this, it will complain about no workers being registered.
river.AddWorker(app.RiverWorkers, river.WorkFunc(func(ctx context.Context, j *river.Job[noopWorker]) error {
// Do something with the job
return nil
}))

var err error
app.River, err = river.NewClient(riverpgxv5.New(app.pgx), &river.Config{
Logger: app.Logger,
Workers: w,
Workers: app.RiverWorkers,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
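For context, a minimal sketch of what registering a real worker against app.RiverWorkers could look like once a job exists outside the feature flag; the args type, kind string, and handler body here are illustrative and not part of this PR:

    package app

    import (
        "context"

        "github.com/riverqueue/river"
    )

    // exampleArgs is a hypothetical job-args type; River identifies jobs by Kind().
    type exampleArgs struct {
        ServiceID string `json:"service_id"`
    }

    func (exampleArgs) Kind() string { return "example-job" }

    // registerExampleWorker adds a typed worker to the shared workers set, the same
    // way the noop placeholder above is registered.
    func registerExampleWorker(w *river.Workers) {
        river.AddWorker(w, river.WorkFunc(func(ctx context.Context, j *river.Job[exampleArgs]) error {
            // Returning a non-nil error lets River retry the job on its backoff schedule.
            _ = j.Args.ServiceID
            return nil
        }))
    }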
2 changes: 1 addition & 1 deletion app/initstores.go
@@ -304,7 +304,7 @@ func (app *App) initStores(ctx context.Context) error {
return errors.Wrap(err, "init API key store")
}

app.UIKHandler = uik.NewHandler(app.db, app.IntegrationKeyStore, app.AlertStore)
app.UIKHandler = uik.NewHandler(app.db, app.IntegrationKeyStore, app.AlertStore, app.River)

return nil
}
2 changes: 1 addition & 1 deletion app/startup.go
@@ -58,6 +58,7 @@ func (app *App) startup(ctx context.Context) error {
return err
})

app.initStartup(ctx, "Startup.River", app.initRiver)
app.initStartup(ctx, "Startup.DBStores", app.initStores)
ctx = app.ConfigStore.Config().Context(ctx)

@@ -67,7 +68,6 @@

app.initStartup(ctx, "Startup.Slack", app.initSlack)

app.initStartup(ctx, "Startup.River", app.initRiver)
app.initStartup(ctx, "Startup.Engine", app.initEngine)
app.initStartup(ctx, "Startup.Auth", app.initAuth)
app.initStartup(ctx, "Startup.GraphQL", app.initGraphQL)
1 change: 1 addition & 0 deletions engine/config.go
@@ -35,6 +35,7 @@ type Config struct {
SlackStore *slack.ChannelSender
DestRegistry *nfydest.Registry
River *river.Client[pgx.Tx]
RiverWorkers *river.Workers

ConfigSource config.Source

77 changes: 56 additions & 21 deletions engine/engine.go
@@ -33,11 +33,6 @@ import (
"github.com/target/goalert/util/sqlutil"
)

type updater interface {
Name() string
UpdateAll(context.Context) error
}

// Engine handles automatic escalation of unacknowledged(triggered) alerts, as well as
// passing to-be-sent notifications to the notification.Sender.
//
@@ -53,12 +48,15 @@ type Engine struct {
triggerCh chan struct{}
runLoopExit chan struct{}

modules []updater
modules []processinglock.Module
msg *message.DB

a *alert.Store
cfg *Config

// needed to re-start river during switchover
runCtx context.Context

triggerPauseCh chan *pauseReq
}

@@ -137,7 +135,7 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) {
return nil, errors.Wrap(err, "compatibility backend")
}

p.modules = []updater{
p.modules = []processinglock.Module{
compatMgr,
rotMgr,
schedMgr,
@@ -168,6 +166,21 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) {
return nil, errors.Wrap(err, "init backend")
}

args := processinglock.SetupArgs{
DB: db,
River: c.River,
Workers: c.RiverWorkers,
ConfigSource: c.ConfigSource,
}
for _, m := range p.modules {
if s, ok := m.(processinglock.Setupable); ok {
err = s.Setup(ctx, args)
if err != nil {
return nil, errors.Wrap(err, "setup module")
}
}
}

return p, nil
}

@@ -178,7 +191,7 @@ func (p *Engine) AuthLinkURL(ctx context.Context, providerID, subjectID string,
return url, err
}

func (p *Engine) processModule(ctx context.Context, m updater) {
func (p *Engine) processModule(ctx context.Context, m processinglock.Updatable) {
defer recoverPanic(ctx, m.Name())
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
@@ -241,6 +254,11 @@ func (p *Engine) Pause(ctx context.Context) error {
func (p *Engine) _pause(ctx context.Context) error {
ch := make(chan error, 1)

err := p.cfg.River.Stop(ctx)
if err != nil {
return errors.Wrap(err, "pause river")
}

select {
case <-p.shutdownCh:
return errors.New("shutting down")
@@ -263,8 +281,17 @@ func (p *Engine) Resume(ctx context.Context) error {
}

func (p *Engine) _resume(ctx context.Context) error {
// nothing to be done `p.mgr.IsPaused` will already
// return false
if p.cfg.DisableCycle {
return nil
}

err := p.cfg.River.QueueResume(ctx, "*", nil)
if err != nil {
return errors.Wrap(err, "resume river")
}

go p.startRiver()

return nil
}

@@ -284,9 +311,9 @@ func (p *Engine) Shutdown(ctx context.Context) error {

func (p *Engine) _shutdown(ctx context.Context) (err error) {
close(p.shutdownCh)
// if !p.cfg.DisableCycle {
// err = p.cfg.River.Stop(ctx)
// }
if !p.cfg.DisableCycle {
err = p.cfg.River.Stop(ctx)
}
<-p.runLoopExit
return err
}
@@ -454,8 +481,13 @@ func (p *Engine) processAll(ctx context.Context) bool {
return true
}

up, ok := m.(processinglock.Updatable)
if !ok {
continue
}

start := time.Now()
p.processModule(ctx, m)
p.processModule(ctx, up)
metricModuleDuration.WithLabelValues(m.Name()).Observe(time.Since(start).Seconds())
}
return false
@@ -528,6 +560,14 @@ func (p *Engine) handlePause(ctx context.Context, respCh chan error) {
respCh <- nil
}

func (p *Engine) startRiver() {
ctx := p.runCtx
err := p.cfg.River.Start(ctx)
if err != nil {
log.Log(ctx, errors.Wrap(err, "start river"))
}
}

func (p *Engine) _run(ctx context.Context) error {
defer close(p.runLoopExit)
ctx = permission.SystemContext(ctx, "Engine")
@@ -549,13 +589,8 @@ func (p *Engine) _run(ctx context.Context) error {
}
}

// Disabled until we have our first worker (next PR)
// go func() {
// err := p.cfg.River.Start(ctx)
// if err != nil {
// log.Log(ctx, errors.Wrap(err, "start river"))
// }
// }()
p.runCtx = ctx
go p.startRiver()

dur := p.cfg.CycleTime
if dur == 0 {
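A rough sketch of how the new pause/resume path fits into switchover: Pause now stops the River client along with engine cycles, and Resume re-enables all queues ("*") and restarts River on the engine's stored run context. The caller below is hypothetical; only Engine.Pause and Engine.Resume come from this PR:

    package switchover

    import (
        "context"
        "fmt"

        "github.com/target/goalert/engine"
    )

    // switchOver is an illustrative driver for the pause/resume cycle.
    func switchOver(ctx context.Context, eng *engine.Engine) error {
        // Stop engine cycles and River job execution before the database switch.
        if err := eng.Pause(ctx); err != nil {
            return fmt.Errorf("pause engine: %w", err)
        }

        // ... perform the database switchover here ...

        // Resume re-enables all River queues and restarts the client in the
        // background via startRiver, using the engine's long-lived run context.
        if err := eng.Resume(ctx); err != nil {
            return fmt.Errorf("resume engine: %w", err)
        }
        return nil
    }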
35 changes: 35 additions & 0 deletions engine/processinglock/module.go
@@ -0,0 +1,35 @@
package processinglock

import (
"context"
"database/sql"

"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/target/goalert/config"
)

// Module is a processing lock module.
type Module interface {
Name() string
}

// Updatable is an interface for types that can be updated.
type Updatable interface {
Module
UpdateAll(context.Context) error
}

// SetupArgs is a struct that contains the arguments for the setup function.
type SetupArgs struct {
DB *sql.DB
River *river.Client[pgx.Tx]
Workers *river.Workers
ConfigSource config.Source
}

// Setupable is an interface for types that can be set up using the job queue system.
type Setupable interface {
Module
Setup(context.Context, SetupArgs) error
}
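For illustration, a sketch of how a manager could opt in to these interfaces: the engine's NewEngine loop calls Setup on any module that also implements Setupable, handing it the shared River client and workers. The manager and args types below are hypothetical; only the processinglock interfaces and SetupArgs fields are from this PR:

    package examplemgr

    import (
        "context"

        "github.com/riverqueue/river"
        "github.com/target/goalert/engine/processinglock"
    )

    type args struct{}

    func (args) Kind() string { return "example-mgr" }

    // DB is a hypothetical manager; Name satisfies processinglock.Module.
    type DB struct{}

    func (db *DB) Name() string { return "ExampleManager" }

    // Setup satisfies processinglock.Setupable: it registers this manager's
    // worker against the workers set supplied by the engine.
    func (db *DB) Setup(ctx context.Context, s processinglock.SetupArgs) error {
        river.AddWorker(s.Workers, river.WorkFunc(func(ctx context.Context, j *river.Job[args]) error {
            // do the manager's work here
            return nil
        }))
        return nil
    }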
18 changes: 18 additions & 0 deletions engine/signalmgr/queries.sql
@@ -8,6 +8,8 @@ FROM
pending_signals
WHERE
message_id IS NULL
AND (sqlc.narg(service_id)::uuid IS NULL
OR service_id = @service_id)
FOR UPDATE
SKIP LOCKED
LIMIT 100;
@@ -32,3 +34,19 @@ DELETE FROM pending_signals
WHERE message_id IS NULL
AND created_at < NOW() - INTERVAL '1 hour';

-- name: SignalMgrGetScheduled :many
SELECT
count(*),
service_id,
channel_id
FROM
outgoing_messages
WHERE
message_type = 'signal_message'
AND last_status = 'pending'
AND (sqlc.narg(service_id)::uuid IS NULL
OR service_id = @service_id)
GROUP BY
service_id,
channel_id;
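The sqlc.narg(service_id) pattern surfaces as a nullable parameter in the generated Go code, so one query serves both the global sweep and per-service triggers. A small sketch of the calling convention, assuming the sqlc/pgx codegen used elsewhere in this PR (the helper name is illustrative):

    package signalmgr

    import (
        "context"
        "database/sql"

        "github.com/google/uuid"
        "github.com/target/goalert/gadb"
    )

    // pendingFor shows the two calling modes: an invalid NullUUID hits the
    // IS NULL branch (all services), a valid one restricts rows to that service.
    func pendingFor(ctx context.Context, tx *sql.Tx, svc uuid.NullUUID) error {
        q := gadb.New(tx)

        // Global sweep: the narg is NULL, so the service filter is skipped.
        if _, err := q.SignalMgrGetPending(ctx, uuid.NullUUID{}); err != nil {
            return err
        }

        // Per-service trigger: only rows for svc are locked and returned.
        _, err := q.SignalMgrGetPending(ctx, svc)
        return err
    }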

81 changes: 81 additions & 0 deletions engine/signalmgr/schedulemessages.go
@@ -0,0 +1,81 @@
package signalmgr

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/google/uuid"
"github.com/riverqueue/river"
"github.com/target/goalert/gadb"
)

func (db *DB) scheduleMessages(ctx context.Context, serviceID uuid.NullUUID) error {
var didWork bool
err := db.lock.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
q := gadb.New(tx)

messages, err := q.SignalMgrGetPending(ctx, serviceID)
if err != nil {
return fmt.Errorf("get pending signals: %w", err)
}

type dest struct {
ServiceID uuid.UUID
ChannelID uuid.UUID
}

res, err := q.SignalMgrGetScheduled(ctx, serviceID)
if err != nil {
return fmt.Errorf("get scheduled signals: %w", err)
}
alreadyScheduled := make(map[dest]struct{}, len(res))
for _, r := range res {
alreadyScheduled[dest{ServiceID: r.ServiceID.UUID, ChannelID: r.ChannelID.UUID}] = struct{}{}
}

for _, m := range messages {
if _, ok := alreadyScheduled[dest{ServiceID: m.ServiceID, ChannelID: m.DestID}]; ok {
// Only allow one message per destination, per service, to be scheduled at a time.
continue
}
didWork = true
alreadyScheduled[dest{ServiceID: m.ServiceID, ChannelID: m.DestID}] = struct{}{}
id := uuid.New()
err = q.SignalMgrInsertMessage(ctx, gadb.SignalMgrInsertMessageParams{
ID: id,
ServiceID: uuid.NullUUID{Valid: true, UUID: m.ServiceID},
ChannelID: uuid.NullUUID{Valid: true, UUID: m.DestID},
})
if err != nil {
return fmt.Errorf("insert message: %w", err)
}

err = q.SignalMgrUpdateSignal(ctx, gadb.SignalMgrUpdateSignalParams{
ID: m.ID,
MessageID: uuid.NullUUID{Valid: true, UUID: id},
})
if err != nil {
return fmt.Errorf("update signal: %w", err)
}
}

return nil
})
if err != nil {
return err
}

if serviceID.Valid {
// only try once for per-service/on-demand updates
return nil
}

if didWork {
// for global update, we want to keep checking for work until nothing is left.
return river.JobSnooze(time.Second)
}

return nil
}
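scheduleMessages returns river.JobSnooze(time.Second) when the global pass found work, so the driving job re-queues itself until no pending signals remain. A sketch of how that return value is consumed by a worker; the args type and wiring below are illustrative, not the PR's actual worker:

    package signalmgr

    import (
        "context"

        "github.com/google/uuid"
        "github.com/riverqueue/river"
    )

    // scheduleArgs is a hypothetical job-args type for driving scheduleMessages.
    type scheduleArgs struct {
        ServiceID uuid.NullUUID `json:"service_id"`
    }

    func (scheduleArgs) Kind() string { return "signal-mgr-schedule" }

    func registerScheduleWorker(w *river.Workers, db *DB) {
        river.AddWorker(w, river.WorkFunc(func(ctx context.Context, j *river.Job[scheduleArgs]) error {
            // A river.JobSnooze error returned here tells River to re-run the
            // job after the delay instead of marking it completed or errored.
            return db.scheduleMessages(ctx, j.Args.ServiceID)
        }))
    }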