Skip to content

Commit 6429043

Browse files
authored
moved main to cmd/root.go, refact (#67)
* moved main to cmd/root.go * fix rpm build test * refact HandleSignals() * cyclo/ extracted method
1 parent 05c0813 commit 6429043

File tree

4 files changed

+266
-250
lines changed

4 files changed

+266
-250
lines changed

cmd/root.go

+262
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
package cmd
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"flag"
7+
"fmt"
8+
"net"
9+
"net/http"
10+
"os"
11+
"os/exec"
12+
"os/signal"
13+
"syscall"
14+
"time"
15+
16+
"github.com/coreos/go-systemd/v22/daemon"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promhttp"
19+
log "github.com/sirupsen/logrus"
20+
"golang.org/x/sync/errgroup"
21+
22+
"github.com/crowdsecurity/crowdsec/pkg/models"
23+
csbouncer "github.com/crowdsecurity/go-cs-bouncer"
24+
25+
"github.com/crowdsecurity/cs-custom-bouncer/pkg/cfg"
26+
"github.com/crowdsecurity/cs-custom-bouncer/pkg/custom"
27+
"github.com/crowdsecurity/cs-custom-bouncer/pkg/version"
28+
)
29+
30+
const (
31+
name = "crowdsec-custom-bouncer"
32+
)
33+
34+
func bouncerShutdown(custom *custom.CustomBouncer) {
35+
log.Info("shutting down custom-bouncer service")
36+
if err := custom.ShutDown(); err != nil {
37+
log.Errorf("while shutting down custom-bouncer service: %s", err)
38+
}
39+
}
40+
41+
func HandleSignals(ctx context.Context) error {
42+
signalChan := make(chan os.Signal, 1)
43+
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
44+
45+
select {
46+
case s := <-signalChan:
47+
switch s {
48+
case syscall.SIGTERM:
49+
return fmt.Errorf("received SIGTERM")
50+
case syscall.SIGINT:
51+
return fmt.Errorf("received SIGINT")
52+
}
53+
case <-ctx.Done():
54+
return ctx.Err()
55+
}
56+
return nil
57+
}
58+
59+
func deleteDecisions(custom *custom.CustomBouncer, decisions []*models.Decision) {
60+
if len(decisions) == 1 {
61+
log.Infof("deleting 1 decision")
62+
} else {
63+
log.Infof("deleting %d decisions", len(decisions))
64+
}
65+
for _, d := range decisions {
66+
if err := custom.Delete(d); err != nil {
67+
log.Errorf("unable to delete decision for '%s': %s", *d.Value, err)
68+
continue
69+
}
70+
log.Debugf("deleted '%s'", *d.Value)
71+
}
72+
}
73+
74+
func addDecisions(custom *custom.CustomBouncer, decisions []*models.Decision) {
75+
if len(decisions) == 1 {
76+
log.Infof("adding 1 decision")
77+
} else {
78+
log.Infof("adding %d decisions", len(decisions))
79+
}
80+
for _, d := range decisions {
81+
if err := custom.Add(d); err != nil {
82+
log.Errorf("unable to insert decision for '%s': %s", *d.Value, err)
83+
continue
84+
}
85+
log.Debugf("Adding '%s' for '%s'", *d.Value, *d.Duration)
86+
}
87+
}
88+
89+
func feedViaStdin(ctx context.Context, custom *custom.CustomBouncer, config *cfg.BouncerConfig) error {
90+
f := func() error {
91+
log.Debugf("Starting binary %s %s", config.BinPath, config.BinArgs)
92+
c := exec.CommandContext(ctx, config.BinPath, config.BinArgs...)
93+
s, err := c.StdinPipe()
94+
if err != nil {
95+
return err
96+
}
97+
custom.BinaryStdin = s
98+
if err := c.Start(); err != nil {
99+
return err
100+
}
101+
102+
return c.Wait()
103+
}
104+
var err error
105+
if config.TotalRetries == -1 {
106+
for {
107+
err = f()
108+
log.Errorf("Binary exited: %s", err)
109+
}
110+
} else {
111+
for i := 1; i <= config.TotalRetries; i++ {
112+
err = f()
113+
log.Errorf("Binary exited (retry %d/%d): %s", i, config.TotalRetries, err)
114+
}
115+
}
116+
return fmt.Errorf("maximum retries exceeded for binary. Exiting")
117+
}
118+
119+
func Execute() error {
120+
var err error
121+
var promServer *http.Server
122+
configPath := flag.String("c", "", "path to crowdsec-custom-bouncer.yaml")
123+
verbose := flag.Bool("v", false, "set verbose mode")
124+
bouncerVersion := flag.Bool("version", false, "display version and exit")
125+
testConfig := flag.Bool("t", false, "test config and exit")
126+
showConfig := flag.Bool("T", false, "show full config (.yaml + .yaml.local) and exit")
127+
128+
flag.Parse()
129+
130+
if *bouncerVersion {
131+
fmt.Print(version.ShowStr())
132+
return nil
133+
}
134+
135+
if configPath == nil || *configPath == "" {
136+
return fmt.Errorf("configuration file is required")
137+
}
138+
139+
configBytes, err := cfg.MergedConfig(*configPath)
140+
if err != nil {
141+
return fmt.Errorf("unable to read config file: %w", err)
142+
}
143+
144+
if *showConfig {
145+
fmt.Println(string(configBytes))
146+
return nil
147+
}
148+
149+
config, err := cfg.NewConfig(bytes.NewReader(configBytes))
150+
if err != nil {
151+
return fmt.Errorf("unable to load configuration: %w", err)
152+
}
153+
154+
if *verbose {
155+
log.SetLevel(log.DebugLevel)
156+
}
157+
158+
custom, err := custom.NewCustomBouncer(config)
159+
if err != nil {
160+
return err
161+
}
162+
163+
if *testConfig {
164+
log.Info("config is valid")
165+
return nil
166+
}
167+
168+
if err = custom.Init(); err != nil {
169+
return err
170+
}
171+
172+
defer bouncerShutdown(custom)
173+
174+
bouncer := &csbouncer.StreamBouncer{}
175+
bouncer.UserAgent = fmt.Sprintf("%s/%s", name, version.VersionStr())
176+
177+
err = bouncer.ConfigReader(bytes.NewReader(configBytes))
178+
if err != nil {
179+
return fmt.Errorf("unable to configure bouncer: %w", err)
180+
}
181+
182+
if err := bouncer.Init(); err != nil {
183+
return err
184+
}
185+
cacheResetTicker := time.NewTicker(config.CacheRetentionDuration)
186+
187+
g, ctx := errgroup.WithContext(context.Background())
188+
189+
g.Go(func() error {
190+
bouncer.Run(ctx)
191+
return fmt.Errorf("stream init failed")
192+
})
193+
194+
if config.PrometheusConfig.Enabled {
195+
listenOn := net.JoinHostPort(
196+
config.PrometheusConfig.ListenAddress,
197+
config.PrometheusConfig.ListenPort,
198+
)
199+
muxer := http.NewServeMux()
200+
promServer = &http.Server{
201+
Addr: net.JoinHostPort(
202+
config.PrometheusConfig.ListenAddress,
203+
config.PrometheusConfig.ListenPort,
204+
),
205+
Handler: muxer,
206+
}
207+
muxer.Handle("/metrics", promhttp.Handler())
208+
prometheus.MustRegister(csbouncer.TotalLAPICalls, csbouncer.TotalLAPIError)
209+
go func() {
210+
log.Infof("Serving metrics at %s", listenOn+"/metrics")
211+
log.Error(promServer.ListenAndServe())
212+
// don't need to cancel context here, prometheus is not critical
213+
}()
214+
}
215+
216+
if config.FeedViaStdin {
217+
g.Go(func() error {
218+
return feedViaStdin(ctx, custom, config)
219+
})
220+
}
221+
222+
g.Go(func() error {
223+
log.Infof("Processing new and deleted decisions . . .")
224+
for {
225+
select {
226+
case <-ctx.Done():
227+
log.Infoln("terminating bouncer process")
228+
if config.PrometheusConfig.Enabled {
229+
log.Infoln("terminating prometheus server")
230+
if err := promServer.Shutdown(context.Background()); err != nil {
231+
log.Errorf("unable to shutdown prometheus server: %s", err)
232+
}
233+
}
234+
return nil
235+
case decisions := <-bouncer.Stream:
236+
if decisions == nil {
237+
continue
238+
}
239+
deleteDecisions(custom, decisions.Deleted)
240+
addDecisions(custom, decisions.New)
241+
case <-cacheResetTicker.C:
242+
custom.ResetCache()
243+
}
244+
}
245+
})
246+
247+
if config.Daemon {
248+
sent, err := daemon.SdNotify(false, "READY=1")
249+
if !sent && err != nil {
250+
log.Errorf("Failed to notify: %v", err)
251+
}
252+
g.Go(func() error {
253+
return HandleSignals(ctx)
254+
})
255+
}
256+
257+
if err := g.Wait(); err != nil {
258+
return fmt.Errorf("process terminated with error: %w", err)
259+
}
260+
261+
return nil
262+
}

0 commit comments

Comments
 (0)