diff --git a/.gitignore b/.gitignore index a3227f21..3dedac2a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ goguerrilla.conf goguerrilla.conf.json /guerrillad vendor -go-guerrilla.wiki \ No newline at end of file +go-guerrilla.wiki +statik diff --git a/.travis.yml b/.travis.yml index 69cb9ad4..1fa63a53 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,8 +7,16 @@ go: - 1.10.x - master +before_install: + - if [[ `npm -v` != 3* ]]; then npm i -g npm@3; fi + - nvm install 6 && nvm use 6 + install: - export GO15VENDOREXPERIMENT=1 + - go get github.com/rakyll/statik + - go install github.com/rakyll/statik + - cd dashboard/js && npm install && npm run build && cd ../.. + - statik -src=dashboard/js/build -dest=dashboard - go get github.com/Masterminds/glide - go install github.com/Masterminds/glide - glide up diff --git a/Makefile b/Makefile index 1a0ebc2c..aca28265 100644 --- a/Makefile +++ b/Makefile @@ -16,10 +16,17 @@ help: clean: rm -f guerrillad + rm -rf dashboard/js/node_modules + rm -rf dashboard/js/build dependencies: $(GO_VARS) $(GO) list -f='{{ join .Deps "\n" }}' $(ROOT)/cmd/guerrillad | grep -v $(ROOT) | tr '\n' ' ' | $(GO_VARS) xargs $(GO) get -u -v $(GO_VARS) $(GO) list -f='{{ join .Deps "\n" }}' $(ROOT)/cmd/guerrillad | grep -v $(ROOT) | tr '\n' ' ' | $(GO_VARS) xargs $(GO) install -v + cd dashboard/js && npm install && cd ../.. + +dashboard: dashboard/*.go */*/*/*.js */*/*/*/*.js + cd dashboard/js && npm run build && cd ../.. + statik -src=dashboard/js/build -dest=dashboard guerrillad: *.go */*.go */*/*.go $(GO_VARS) $(GO) build -o="guerrillad" -ldflags="$(LD_FLAGS)" $(ROOT)/cmd/guerrillad @@ -41,4 +48,4 @@ testrace: *.go */*.go */*/*.go $(GO_VARS) $(GO) test -v ./tests -race $(GO_VARS) $(GO) test -v ./cmd/guerrillad -race $(GO_VARS) $(GO) test -v ./response -race - $(GO_VARS) $(GO) test -v ./backends -race \ No newline at end of file + $(GO_VARS) $(GO) test -v ./backends -race diff --git a/backends/gateway.go b/backends/gateway.go index 5d65ca75..89f5dc94 100644 --- a/backends/gateway.go +++ b/backends/gateway.go @@ -7,11 +7,12 @@ import ( "sync" "time" + "runtime/debug" + "strings" + "github.com/flashmob/go-guerrilla/log" "github.com/flashmob/go-guerrilla/mail" "github.com/flashmob/go-guerrilla/response" - "runtime/debug" - "strings" ) var ErrProcessorNotFound error diff --git a/build.sh b/build.sh new file mode 100755 index 00000000..5ddd3eb0 --- /dev/null +++ b/build.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# Build frontend to `dashboard/js/build` +cd dashboard/js && npm install && cd ../../ +cd dashboard/js && npm run build && cd ../../ +# Build statik file system in `dashboard/statik` +statik -src=dashboard/js/build -dest=dashboard diff --git a/cmd/guerrillad/backend_test.go.no b/cmd/guerrillad/backend_test.go.no new file mode 100644 index 00000000..d6ab07c9 --- /dev/null +++ b/cmd/guerrillad/backend_test.go.no @@ -0,0 +1,91 @@ +package main + +import ( + "testing" + "os" + "time" + "io/ioutil" + "github.com/flashmob/go-guerrilla/tests/testcert" + "github.com/flashmob/go-guerrilla/log" + "runtime" + "github.com/spf13/cobra" + "sync" + "strings" + "fmt" +) + +func TestBadBackendReload2(t *testing.T) { + + testcert.GenerateCert("mail2.guerrillamail.com", "", 365*24*time.Hour, false, 2048, "P256", "../../tests/") + os.Truncate("../../tests/testlog", 0) + //mainlog, _ = log.GetLogger("../../tests/testlog") + mainlog, _ = log.GetLogger("stdout") + mainlog.SetLevel("debug") + mainlog.Info("are u sure") + mainlog.Info("not another word") + + select { + + case <-time.After(10 * time.Second): + mainlog.Info("paabix") + stacktrace := make([]byte, 8192) + length := runtime.Stack(stacktrace, true) + _ = length + fmt.Fprintf(ioutil.Discard, (string(stacktrace[:length]))) + + panic("timed out") + } + + mainlog.Info("not another word") + sigKill() + ioutil.WriteFile("configJsonA.json", []byte(configJsonA), 0644) + cmd := &cobra.Command{} + configPath = "configJsonA.json" + var serveWG sync.WaitGroup + serveWG.Add(1) + go func() { + mainlog.Info("start serve") + serve(cmd, []string{}) + serveWG.Done() + }() + mainlog.Info("after start") + time.Sleep(testPauseDuration) + + // change the config file to the one with a broken backend + ioutil.WriteFile("configJsonA.json", []byte(configJsonE), 0644) + + // test SIGHUP via the kill command + // Would not work on windows as kill is not available. + // TODO: Implement an alternative test for windows. + if runtime.GOOS != "windows" { + sigHup() + time.Sleep(testPauseDuration) // allow sighup to do its job + // did the pidfile change as expected? + if _, err := os.Stat("./pidfile2.pid"); os.IsNotExist(err) { + t.Error("pidfile not changed after sighup SIGHUP", err) + } + } + + // send kill signal and wait for exit + sigKill() + serveWG.Wait() + //time.Sleep(time.Second * 3) + // did backend started as expected? + fd, err := os.Open("../../tests/testlog") + if err != nil { + t.Error(err) + } + if read, err := ioutil.ReadAll(fd); err == nil { + logOutput := string(read) + if i := strings.Index(logOutput, "reverted to old backend config"); i < 0 { + t.Error("did not revert to old backend config") + } + } + + // cleanup + //os.Truncate("../../tests/testlog", 0) + os.Remove("configJsonA.json") + os.Remove("./pidfile.pid") + os.Remove("./pidfile2.pid") + +} \ No newline at end of file diff --git a/config.go b/config.go index 50399892..fda4cff6 100644 --- a/config.go +++ b/config.go @@ -5,12 +5,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/flashmob/go-guerrilla/backends" - "github.com/flashmob/go-guerrilla/log" "os" "reflect" "strings" "time" + + "github.com/flashmob/go-guerrilla/backends" + "github.com/flashmob/go-guerrilla/dashboard" + "github.com/flashmob/go-guerrilla/log" ) // AppConfig is the holder of the configuration of the app @@ -30,6 +32,8 @@ type AppConfig struct { LogLevel string `json:"log_level,omitempty"` // BackendConfig configures the email envelope processing backend BackendConfig backends.BackendConfig `json:"backend_config"` + // Dashboard config configures how analytics are gathered and displayed + Dashboard dashboard.Config `json:"dashboard"` } // ServerConfig specifies config options for a single server diff --git a/dashboard/README.md b/dashboard/README.md new file mode 100644 index 00000000..fe253e63 --- /dev/null +++ b/dashboard/README.md @@ -0,0 +1,82 @@ +# About the Dashboard + +The dashboard package gathers data about Guerrilla while it is running +and provides an analytics web dashboard. To activate the dashboard, checkout +the dashboard branch, then build it, then edit your configuration +file as specified in the example configuration. + +tl/dr + +``` +$ git checkout dashboard +$ cd dashboard/js +$ npm install +$ npm run build +$ cd .. +$ statik -src=./js/build +$ cd .. +$ make guerrillad +``` + +Then see the Config section below how to enable it! + +## Screenshot + + + +## The Backend + +The backend is a Go package that collects and stores data from guerrillad, +serves the dashboard to web clients, and updates clients with new analytics data +over WebSockets. + +The backend uses [statik](https://github.com/rakyll/statik) to convert the `build` +folder into a http-servable Go package. When deploying, the frontend should be +built first, then the `statik` package should be created. +An example of this process is in the `.travis.yml`. + +`To build the statik Go package, cd to the `dashboard` dir, then run + + `statik -src=./js/build` + +## The Frontend + +The front-end is written in React and uses WebSockets to accept data +from the backend and [Victory](https://formidable.com/open-source/victory/) to render charts. +The `js` directory is an NPM module that contains all frontend code. +All commands below should be run within the `js` directory. + +To install frontend dependencies: +`npm install` + +To build the frontend code: +`npm run build` + +To run the HMR development server (serves frontend on port 3000 rather than through `dashboard` package): +`npm start` + +## Config + +Add `dashboard` to your goguerrilla.json config file + +``` +"dashboard": { + "is_enabled": true, + "listen_interface": ":8081", + "tick_interval": "5s", + "max_window": "24h", + "ranking_aggregation_interval": "6h" + } +``` + +## Security considerations + +Warning: The dashboard does not have any authentication. It is also served over HTTP. + +Assuming that the host will open the dashboard http port only to the local network or VPN. +However, if you need to access the dashboard securely from a remote connection and +don't have a VPN, then maybe an SSH tunnel could do: + +`ssh you@example.com -L 8081:127.0.0.1:8081 -N` + +Then point your browser to http://127.0.0.1:8081 \ No newline at end of file diff --git a/dashboard/conn_record.go b/dashboard/conn_record.go new file mode 100644 index 00000000..a1a9bcc2 --- /dev/null +++ b/dashboard/conn_record.go @@ -0,0 +1,51 @@ +package dashboard + +import "container/heap" + +// Records ranking of one unique connection in domain/ip/helo rankings. +type connRecord struct { + // Number of records of this type + count int + // Name of the record, either the domain, IP, or helo + value string +} + +// Contains all ranking of a particular type (domain/ip/helo). +// Tracks ranking ordering by implementing heap.Interface +type connRecordHeap []connRecord + +func (crh connRecordHeap) Len() int { + return len(crh) +} + +func (crh connRecordHeap) Less(i, j int) bool { + return crh[i].count > crh[j].count +} + +func (crh connRecordHeap) Swap(i, j int) { + crh[i], crh[j] = crh[j], crh[i] +} + +func (crh *connRecordHeap) Push(x interface{}) { + *crh = append(*crh, x.(connRecord)) +} + +func (crh *connRecordHeap) Pop() interface{} { + old := *crh + l := len(old) + toPop := old[l-1] + *crh = old[:l-1] + return toPop +} + +// Gets N records with the greatest counts, maintaining the state of the heap +func (crh *connRecordHeap) GetN(n int) []connRecord { + nHighest := make([]connRecord, n) + for i := 0; i < n; i++ { + nHighest[i] = heap.Pop(crh).(connRecord) + } + for _, cr := range nHighest { + heap.Push(crh, cr) + } + return nHighest +} diff --git a/dashboard/conn_record_test.go b/dashboard/conn_record_test.go new file mode 100644 index 00000000..b8c876bb --- /dev/null +++ b/dashboard/conn_record_test.go @@ -0,0 +1,28 @@ +package dashboard + +import ( + "container/heap" + "math/rand" + "testing" +) + +const ( + nConnRecords = 100 +) + +func TestConnRecord(t *testing.T) { + var crHeap connRecordHeap = make([]connRecord, nConnRecords) + var max connRecord + for i := 0; i < nConnRecords; i++ { + crHeap[i] = connRecord{rand.Int(), "abc"} + if crHeap[i].count > max.count { + max = crHeap[i] + } + } + + heap.Init(&crHeap) + + if max.count != heap.Pop(&crHeap).(connRecord).count { + t.Error("Pop did not return maximum value") + } +} diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go new file mode 100644 index 00000000..d071fcd9 --- /dev/null +++ b/dashboard/dashboard.go @@ -0,0 +1,214 @@ +package dashboard + +import ( + "fmt" + "math/rand" + "net/http" + "time" + + "sync" + + _ "github.com/flashmob/go-guerrilla/dashboard/statik" + "github.com/flashmob/go-guerrilla/log" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/rakyll/statik/fs" + "sync/atomic" +) + +var ( + config *Config + sessions map[string]*session + + stopRankingManager chan bool = make(chan bool) + stopDataListener chan bool = make(chan bool) + stopHttp chan bool = make(chan bool) + + wg sync.WaitGroup + started sync.WaitGroup + + s state + + mainlogStore atomic.Value +) + +type state int + +const ( + stateStopped state = iota + stateRunning +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // TODO below for testing w/ webpack only, change before merging + CheckOrigin: func(r *http.Request) bool { return true }, +} + +type Config struct { + Enabled bool `json:"is_enabled"` + ListenInterface string `json:"listen_interface"` + // Interval at which we send measure and send dataframe to frontend + TickInterval string `json:"tick_interval"` + // Maximum interval for which we store data + MaxWindow string `json:"max_window"` + // Granularity for which rankings are aggregated + RankingUpdateInterval string `json:"ranking_aggregation_interval"` + // Determines at which ratio of unique HELOs to unique connections we + // will stop collecting data to prevent memory exhaustion attack. + // Number between 0-1, set to >1 if you never want to stop collecting data. + // Default is 0.8 + UniqueHeloRatioMax float64 `json:"unique_helo_ratio"` +} + +// Begin collecting data and listening for dashboard clients +func Run(c *Config, l log.Logger) { + mainlogStore.Store(l) + statikFS, _ := fs.New() + //store = newDataStore() + applyConfig(c) + sessions = map[string]*session{} + + r := mux.NewRouter() + r.HandleFunc("/ws", webSocketHandler) + r.PathPrefix("/").Handler(http.FileServer(statikFS)) + + rand.Seed(time.Now().UnixNano()) + + started.Add(1) + defer func() { + s = stateStopped + + }() + + closer, err := ListenAndServeWithClose(c.ListenInterface, r) + if err != nil { + mainlog().WithError(err).Error("Dashboard server failed to start") + started.Done() + return + } + mainlog().Infof("started dashboard, listening on http [%s]", c.ListenInterface) + wg.Add(1) + + go func() { + wg.Add(1) + dataListener(tickInterval) + wg.Done() + }() + go func() { + wg.Add(1) + store.rankingManager() + wg.Done() + }() + + s = stateRunning + started.Done() + + select { + case <-stopHttp: + _ = closer.Close() + wg.Done() + return + } +} + +func Stop() { + started.Wait() + if s == stateRunning { + stopDataListener <- true + stopRankingManager <- true + stopHttp <- true + wg.Wait() + + } + +} + +func mainlog() log.Logger { + if v, ok := mainlogStore.Load().(log.Logger); ok { + return v + } + l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String()) + return l +} + +// Parses options in config and applies to global variables +func applyConfig(c *Config) { + config = c + + if len(config.MaxWindow) > 0 { + mw, err := time.ParseDuration(config.MaxWindow) + if err == nil { + maxWindow = mw + } + } + if len(config.RankingUpdateInterval) > 0 { + rui, err := time.ParseDuration(config.RankingUpdateInterval) + if err == nil { + rankingUpdateInterval = rui + } + } + if len(config.TickInterval) > 0 { + ti, err := time.ParseDuration(config.TickInterval) + if err == nil { + tickInterval = ti + } + } + if config.UniqueHeloRatioMax > 0 { + uniqueHeloRatioMax = config.UniqueHeloRatioMax + } + + maxTicks = int(maxWindow * tickInterval) + nRankingBuffers = int(maxWindow / rankingUpdateInterval) +} + +func webSocketHandler(w http.ResponseWriter, r *http.Request) { + var sess *session + cookie, err := r.Cookie("SID") + fmt.Println("cookie", cookie, err.Error()) + if err != nil { + // Haven't set this cookie yet. + sess = startSession(w, r) + } else { + var sidExists bool + sess, sidExists = sessions[cookie.Value] + if !sidExists { + // No SID cookie in our store, start a new session + sess = startSession(w, r) + } + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + sess.ws = conn + c := make(chan *message) + sess.send = c + + store.subscribe(sess.id, c) + go sess.receive() + go sess.transmit() + go store.initSession(sess) +} + +func startSession(w http.ResponseWriter, r *http.Request) *session { + sessionID := newSessionID() + + cookie := &http.Cookie{ + Name: "SID", + Value: sessionID, + Path: "/", + // Secure: true, // TODO re-add this when TLS is set up + } + + sess := &session{ + id: sessionID, + } + + http.SetCookie(w, cookie) + sessions[sessionID] = sess + return sess +} diff --git a/dashboard/dashboard_test.go b/dashboard/dashboard_test.go new file mode 100644 index 00000000..02d5b296 --- /dev/null +++ b/dashboard/dashboard_test.go @@ -0,0 +1,260 @@ +package dashboard + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/flashmob/go-guerrilla/log" + "github.com/gorilla/websocket" + "net/url" + "os" + "regexp" + "strings" + "sync" + "testing" + "time" +) + +var testlog log.Logger + +func init() { + testlog, _ = log.GetLogger(log.OutputOff.String(), log.InfoLevel.String()) +} + +func TestRunStop(t *testing.T) { + config := &Config{ + Enabled: true, + ListenInterface: ":8082", + TickInterval: "5s", + MaxWindow: "24h", + RankingUpdateInterval: "6h", + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + Run(config, testlog) + wg.Done() + }() + // give Run some time to start + time.Sleep(time.Second) + Stop() + // Wait for Run() to exit + wg.Wait() + +} + +// Test if starting with a bad interface address +func TestRunStopBadAddress(t *testing.T) { + config := &Config{ + Enabled: true, + ListenInterface: "1.1.1.1:0", + TickInterval: "5s", + MaxWindow: "24h", + RankingUpdateInterval: "6h", + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + Run(config, testlog) + wg.Done() + }() + time.Sleep(time.Second * 2) + Stop() + // Wait for Run() to exit + wg.Wait() +} + +// Run a simulation from an already captured log +func TestSimulationRun(t *testing.T) { + config := &Config{ + Enabled: true, + ListenInterface: ":8082", + TickInterval: "1s", + MaxWindow: "24h", + RankingUpdateInterval: "6h", + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + Run(config, testlog) + wg.Done() + }() + // give Run some time to start + time.Sleep(time.Second) + // run test + simulateEvents(t) + Stop() + // Wait for Run() to exit + wg.Wait() +} + +func simulateEvents(t *testing.T) { + file, err := os.OpenFile("simulation.log", os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + panic(err.Error()) + } + defer func() { + if err := file.Close(); err != nil { + t.Error(err) + } + if err := os.Remove("simulation.log"); err != nil { + t.Error(err) + } + }() + reader := bufio.NewReader(file) + scanner := bufio.NewScanner(reader) + scanner.Split(bufio.ScanLines) + testlog.AddHook(LogHook) + // match with quotes or without, ie. time="..." or level= + r := regexp.MustCompile(`(.+?)=("[^"]*"|\S*)\s*`) + simStart := time.Now() + var start time.Time + for scanner.Scan() { + fields := map[string]interface{}{} + line := scanner.Text() + items := r.FindAllString(line, -1) + msg := "" + var logElapsed time.Duration + for i := range items { + key, val := parseItem(items[i]) + //fmt.Println(key, val) + if key != "time" && key != "level" && key != "msg" { + fields[key] = val + } + if key == "msg" { + msg = val + } + if key == "time" { + tv, err := time.Parse(time.RFC3339, val) + if err != nil { + t.Error("invalid time", tv) + } + if start.IsZero() { + start = tv + } + fields["start"] = start + logElapsed = tv.Sub(start) + } + + } + diff := time.Now().Sub(simStart) - logElapsed + time.Sleep(diff) // wait so that we don't go too fast + simStart = simStart.Add(diff) // catch up + testlog.WithFields(fields).Info(msg) + } +} + +// parseItem parses a log item, eg time="2017-03-24T11:55:44+11:00" will be: +// key = time and val will be 2017-03-24T11:55:44+11:00 +func parseItem(item string) (key string, val string) { + arr := strings.Split(item, "=") + if len(arr) == 2 { + key = arr[0] + if arr[1][0:1] == "\"" { + pos := len(arr[1]) - 2 + val = arr[1][1:pos] + } else { + val = arr[1] + } + } + val = strings.TrimSpace(val) + return +} + +// Run a simulation from an already captured log +// Then open a websocket and validate that we are getting some data from it +func TestWebsocket(t *testing.T) { + + config := &Config{ + Enabled: true, + ListenInterface: "127.0.0.1:8082", + TickInterval: "1s", + MaxWindow: "24h", + RankingUpdateInterval: "6h", + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + Run(config, testlog) + wg.Done() + }() + + var simWg sync.WaitGroup + go func() { + simWg.Add(1) + simulateEvents(t) + simWg.Done() + }() + + time.Sleep(time.Second) + + // lets talk to the websocket + u := url.URL{Scheme: "ws", Host: "127.0.0.1:8082", Path: "/ws"} + + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + t.Error("cant connect':", err) + return + } + + simWg.Add(1) + go func() { + defer func() { + simWg.Done() + }() + i := 0 + for { + if err := c.SetReadDeadline(time.Now().Add(time.Second + 5)); err != nil { + t.Error(err) + } + _, msg, err := c.ReadMessage() + s := string(msg) + _ = s + if err != nil { + fmt.Println("socket err:", err) + t.Error("websocket failed to connect") + return + } + var objmap map[string]*json.RawMessage + if err := json.Unmarshal(msg, &objmap); err != nil { + t.Error(err) + } + + if pl, ok := objmap["payload"]; ok { + if i == 0 { + ifr := &initFrame{} + if err := json.Unmarshal(*pl, &ifr); err != nil { + t.Error(err, i) + } + + // initial data frame + } else { + df := &dataFrame{} + if err := json.Unmarshal(*pl, &df); err != nil { + t.Error(err, i) + } + if df.NClients.Y > 10 && len(df.TopHelo) > 10 && len(df.TopDomain) > 10 && len(df.TopIP) > 10 { + return + } + } + } + fmt.Println("recv:", string(msg)) + i++ + if i > 2 { + //t.Error("websocket did get find expected result", i) + return + } + } + + }() + simWg.Wait() // wait for sim to exit, wait for websocket to finish reading + Stop() + // Wait for Run() to exit + wg.Wait() + if err := c.Close(); err != nil { + t.Error(err) + } + +} diff --git a/dashboard/datastore.go b/dashboard/datastore.go new file mode 100644 index 00000000..75f54042 --- /dev/null +++ b/dashboard/datastore.go @@ -0,0 +1,266 @@ +package dashboard + +import ( + "go.uber.org/atomic" + "runtime" + "sync" + "time" +) + +const ( + // Number of entries to show in top N charts + topClientsSize = 5 + // Redux action type names + initMessageType = "INIT" + tickMessageType = "TICK" +) + +var ( + tickInterval = time.Second * 5 + maxWindow = time.Hour * 24 + rankingUpdateInterval = time.Hour * 6 + uniqueHeloRatioMax = 0.8 + maxTicks = int(maxWindow / tickInterval) + nRankingBuffers = int(maxWindow / rankingUpdateInterval) + LogHook = logHook(1) + store = newDataStore() +) + +// Keeps track of connection data that is buffered in the topClients +// so the data can be removed after `maxWindow` interval has occurred. +type conn struct { + helo, domain, ip string +} + +type dataStore struct { + lock sync.Mutex + // List of samples of RAM usage + ramTicks []point + // List of samples of number of connected clients + nClientTicks []point + // Up-to-date number of clients + nClients atomic.Uint64 + // Total number of clients in the current aggregation buffer + nClientsInBuffer uint64 + topDomain bufferedRanking + topHelo bufferedRanking + topIP bufferedRanking + // For notifying the store about new connections + newConns chan conn + subs map[string]chan<- *message +} + +func newDataStore() *dataStore { + newConns := make(chan conn, 64) + subs := make(map[string]chan<- *message) + ds := &dataStore{ + ramTicks: make([]point, 0, maxTicks), + nClientTicks: make([]point, 0, maxTicks), + topDomain: newBufferedRanking(nRankingBuffers), + topHelo: newBufferedRanking(nRankingBuffers), + topIP: newBufferedRanking(nRankingBuffers), + newConns: newConns, + subs: subs, + } + + return ds +} + +// Keeps track of top domain/helo/ip rankings, but buffered into multiple +// maps so that old records can be efficiently kept track of and quickly removed +type bufferedRanking []map[string]int + +func newBufferedRanking(nBuffers int) bufferedRanking { + br := make([]map[string]int, nBuffers) + for i := 0; i < nBuffers; i++ { + br[i] = make(map[string]int) + } + return br +} + +// Manages the list of top clients by domain, helo, and IP by updating buffered +// record maps. At each `rankingUpdateInterval` we shift the maps and remove the +// oldest, so rankings are always at most as old as `maxWindow` +func (ds *dataStore) rankingManager() { + ticker := time.NewTicker(rankingUpdateInterval) + + for { + select { + case c := <-ds.newConns: + nHelos := len(ds.topHelo) + if nHelos > 5 && + float64(nHelos)/float64(ds.nClientsInBuffer) > uniqueHeloRatioMax { + // If too many unique HELO messages are detected as a ratio to the total + // number of clients, quit collecting data until we roll over into the next + // aggregation buffer. + continue + } + ds.lock.Lock() + ds.nClientsInBuffer++ + ds.topDomain[0][c.domain]++ + ds.topHelo[0][c.helo]++ + ds.topIP[0][c.ip]++ + ds.lock.Unlock() + + case <-ticker.C: + ds.lock.Lock() + // Add empty map at index 0 and shift other maps one down + ds.nClientsInBuffer = 0 + ds.topDomain = append( + []map[string]int{{}}, + ds.topDomain[:len(ds.topDomain)-1]...) + ds.topHelo = append( + []map[string]int{{}}, + ds.topHelo[:len(ds.topHelo)-1]...) + ds.topIP = append( + []map[string]int{{}}, + ds.topHelo[:len(ds.topIP)-1]...) + ds.lock.Unlock() + + case <-stopRankingManager: + return + } + } +} + +// Aggregates the rankings from the ranking buffer into a single map +// for each of domain, helo, ip. This is what we send to the frontend. +func (ds *dataStore) aggregateRankings() ranking { + ds.lock.Lock() + defer ds.lock.Unlock() + + topDomain := make(map[string]int, len(ds.topDomain[0])) + topHelo := make(map[string]int, len(ds.topHelo[0])) + topIP := make(map[string]int, len(ds.topIP[0])) + + // Aggregate buffers + for i := 0; i < nRankingBuffers; i++ { + if len(ds.topDomain) > i { + for domain, count := range ds.topDomain[i] { + topDomain[domain] += count + } + } + if len(ds.topHelo) > i { + for helo, count := range ds.topHelo[i] { + topHelo[helo] += count + } + } + if len(ds.topIP) > i { + for ip, count := range ds.topIP[i] { + topIP[ip] += count + } + } + } + + return ranking{ + TopDomain: topDomain, + TopHelo: topHelo, + TopIP: topIP, + } +} + +// Adds a new ram point, removing old points if necessary +func (ds *dataStore) addRAMPoint(p point) { + if len(ds.ramTicks) == int(maxTicks) { + ds.ramTicks = append(ds.ramTicks[1:], p) + } else { + ds.ramTicks = append(ds.ramTicks, p) + } +} + +// Adds a new nClients point, removing old points if necessary +func (ds *dataStore) addNClientPoint(p point) { + if len(ds.nClientTicks) == int(maxTicks) { + ds.nClientTicks = append(ds.nClientTicks[1:], p) + } else { + ds.nClientTicks = append(ds.nClientTicks, p) + } +} + +func (ds *dataStore) subscribe(id string, c chan<- *message) { + ds.subs[id] = c +} + +func (ds *dataStore) unsubscribe(id string) { + delete(ds.subs, id) +} + +func (ds *dataStore) notify(m *message) { + // Prevent concurrent read/write to maps in the store + ds.lock.Lock() + defer ds.lock.Unlock() + for _, c := range ds.subs { + select { + case c <- m: + default: + } + } +} + +// Initiates a session with all historic data in the store +func (ds *dataStore) initSession(sess *session) { + store.subs[sess.id] <- &message{initMessageType, initFrame{ + Ram: store.ramTicks, + NClients: store.nClientTicks, + }} +} + +type point struct { + X time.Time `json:"x"` + Y uint64 `json:"y"` +} + +// Measures RAM and number of connected clients and sends a tick +// message to all connected clients on the given interval +func dataListener(interval time.Duration) { + ticker := time.Tick(interval) + memStats := &runtime.MemStats{} + + for { + select { + case t := <-ticker: + runtime.ReadMemStats(memStats) + ramPoint := point{t, memStats.Alloc} + nClientPoint := point{t, store.nClients.Load()} + mainlog().WithFields(map[string]interface{}{ + "ram": ramPoint.Y, + "clients": nClientPoint.Y, + }).Info("Logging analytics data") + + store.addRAMPoint(ramPoint) + store.addNClientPoint(nClientPoint) + store.notify(&message{tickMessageType, dataFrame{ + Ram: ramPoint, + NClients: nClientPoint, + ranking: store.aggregateRankings(), + }}) + case <-stopDataListener: + return + } + } +} + +// Keeps track of top clients by helo, ip, and domain +type ranking struct { + TopHelo map[string]int `json:"topHelo"` + TopIP map[string]int `json:"topIP"` + TopDomain map[string]int `json:"topDomain"` +} + +type dataFrame struct { + Ram point `json:"ram"` + NClients point `json:"nClients"` + ranking +} + +type initFrame struct { + Ram []point `json:"ram"` + NClients []point `json:"nClients"` + ranking +} + +// Format of messages to be sent over WebSocket +type message struct { + Type string `json:"type"` + Payload interface{} `json:"payload"` +} diff --git a/dashboard/hook.go b/dashboard/hook.go new file mode 100644 index 00000000..de70a219 --- /dev/null +++ b/dashboard/hook.go @@ -0,0 +1,52 @@ +package dashboard + +import ( + log "github.com/sirupsen/logrus" +) + +type logHook int + +func (h logHook) Levels() []log.Level { + return log.AllLevels +} + +// Checks fired logs for information that is relevant to the dashboard +func (h logHook) Fire(e *log.Entry) error { + event, ok := e.Data["event"].(string) + if !ok { + return nil + } + + var helo, ip, domain string + if event == "mailfrom" { + helo, ok = e.Data["helo"].(string) + if !ok { + return nil + } + if len(helo) > 16 { + helo = helo[:16] + } + ip, ok = e.Data["address"].(string) + if !ok { + return nil + } + domain, ok = e.Data["domain"].(string) + if !ok { + return nil + } + } + + switch event { + case "connect": + store.nClients.Add(1) + case "mailfrom": + store.newConns <- conn{ + domain: domain, + helo: helo, + ip: ip, + } + case "disconnect": + store.nClients.Sub(1) + } + return nil +} diff --git a/dashboard/http.go b/dashboard/http.go new file mode 100644 index 00000000..bbdbe923 --- /dev/null +++ b/dashboard/http.go @@ -0,0 +1,57 @@ +package dashboard + +import ( + "io" + "net" + "net/http" + "time" +) + +// ListenAndServeWithClose is a non-blocking listen and serve returning a closer +func ListenAndServeWithClose(addr string, handler http.Handler) (io.Closer, error) { + + var ( + listener net.Listener + srvCloser io.Closer + err error + ) + + srv := &http.Server{Addr: addr, Handler: handler} + + if addr == "" { + addr = ":http" + } + + listener, err = net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + go func() { + err := srv.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)}) + if err != nil { + mainlog().Error("HTTP Server Error - ", err) + } + }() + + srvCloser = listener + return srvCloser, nil +} + +type tcpKeepAliveListener struct { + *net.TCPListener +} + +func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + if err = tc.SetKeepAlive(true); err != nil { + return + } + if err = tc.SetKeepAlivePeriod(3 * time.Minute); err != nil { + return + } + return tc, nil +} diff --git a/dashboard/js/.gitignore b/dashboard/js/.gitignore new file mode 100644 index 00000000..6c96c5cf --- /dev/null +++ b/dashboard/js/.gitignore @@ -0,0 +1,15 @@ +# See http://help.github.com/ignore-files/ for more about ignoring files. + +# dependencies +node_modules + +# testing +coverage + +# production +build + +# misc +.DS_Store +.env +npm-debug.log diff --git a/dashboard/js/package.json b/dashboard/js/package.json new file mode 100644 index 00000000..eede669e --- /dev/null +++ b/dashboard/js/package.json @@ -0,0 +1,25 @@ +{ + "name": "guerrilla-dashboard", + "version": "0.1.0", + "private": true, + "devDependencies": { + "react-scripts": "0.8.5" + }, + "dependencies": { + "immutable": "^3.8.1", + "moment": "^2.17.1", + "react": "^15.4.2", + "react-dom": "^15.4.2", + "react-redux": "^5.0.2", + "redux": "^3.6.0", + "redux-logger": "^2.7.4", + "simplify-js": "^1.2.1", + "victory": "^0.15.0" + }, + "scripts": { + "start": "react-scripts start", + "build": "react-scripts build", + "test": "react-scripts test --env=jsdom", + "eject": "react-scripts eject" + } +} diff --git a/dashboard/js/public/favicon.ico b/dashboard/js/public/favicon.ico new file mode 100644 index 00000000..8676dbbe Binary files /dev/null and b/dashboard/js/public/favicon.ico differ diff --git a/dashboard/js/public/index.html b/dashboard/js/public/index.html new file mode 100644 index 00000000..0c5e0deb --- /dev/null +++ b/dashboard/js/public/index.html @@ -0,0 +1,31 @@ + + +
+ + + + +| Rank | +{rankType} | +# Clients | +
|---|---|---|
| {i + 1} | +{record.value} | +{record.count} | +