Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ document.addEventListener('DOMContentLoaded', function() {
}

func NewApplicationHandler(a *types.APIOptions) (*ApplicationHandler, error) {
if a == nil {
return nil, fmt.Errorf("api options is required")
}
ensureAPIRepositories(a)

appHandler := &ApplicationHandler{A: a}

cfg, err := config.Get()
Expand Down Expand Up @@ -806,8 +811,21 @@ func (a *ApplicationHandler) mountControlPlaneRoutes(router chi.Router, handler

if a.A.Licenser.AsynqMonitoring() {
router.Route("/queue", func(asynqRouter chi.Router) {
asynqRouter.Use(middleware.RequireAuth(handler.A.Logger))
asynqRouter.Handle("/monitoring/*", a.A.Queue.(*redisqueue.RedisQueue).Monitor())
asynqRouter.Group(func(sessionRouter chi.Router) {
sessionRouter.Use(middleware.RequireAuth(handler.A.Logger))
sessionRouter.Post("/monitoring/session", handler.CreateQueueMonitoringSession)
sessionRouter.Delete("/monitoring/session", handler.RevokeQueueMonitoringSession)
})

rq := a.A.Queue.(*redisqueue.RedisQueue)
asynqRouter.Group(func(embedRouter chi.Router) {
embedRouter.Use(middleware.RequireQueueSessionCookie(handlers.ValidateQueueSessionCookie(handler.A.Cache)))
embedRouter.Handle("/monitoring/embed/*", rq.MonitorWithRootPath("/queue/monitoring/embed"))
})
asynqRouter.Group(func(monitorRouter chi.Router) {
monitorRouter.Use(middleware.RequireAuth(handler.A.Logger))
monitorRouter.Handle("/monitoring/*", rq.Monitor())
})
})
}

Expand Down
25 changes: 25 additions & 0 deletions api/api_options_repositories.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package api

import (
"github.com/frain-dev/convoy/api/types"
"github.com/frain-dev/convoy/internal/organisation_members"
"github.com/frain-dev/convoy/internal/organisations"
"github.com/frain-dev/convoy/internal/projects"
)

// ensureAPIRepositories wires organisation and project repositories when the caller
// omitted them but provided DB and Logger (e.g. cmd/server or dataplane startup).
func ensureAPIRepositories(a *types.APIOptions) {
if a == nil || a.DB == nil || a.Logger == nil {
return
}
if a.OrgRepo == nil {
a.OrgRepo = organisations.New(a.Logger, a.DB)
}
if a.OrgMemberRepo == nil {
a.OrgMemberRepo = organisation_members.New(a.Logger, a.DB)
}
if a.ProjectRepo == nil {
a.ProjectRepo = projects.New(a.Logger, a.DB)
}
}
66 changes: 66 additions & 0 deletions api/api_options_repositories_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package api

import (
"log/slog"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/frain-dev/convoy/api/types"
"github.com/frain-dev/convoy/mocks"
"github.com/frain-dev/convoy/pkg/logger"
)

func TestEnsureAPIRepositories_setsReposWhenNil(t *testing.T) {
ctrl := gomock.NewController(t)
db := mocks.NewMockDatabase(ctrl)
db.EXPECT().GetConn().Return(nil).AnyTimes()
db.EXPECT().GetHook().Return(nil).AnyTimes()

lo := logger.New("test", slog.LevelError)

a := &types.APIOptions{
DB: db,
Logger: lo,
}

ensureAPIRepositories(a)

require.NotNil(t, a.OrgRepo, "OrgRepo must be set when DB and Logger are present")
require.NotNil(t, a.OrgMemberRepo, "OrgMemberRepo must be set when DB and Logger are present")
require.NotNil(t, a.ProjectRepo, "ProjectRepo must be set when DB and Logger are present")
}

func TestEnsureAPIRepositories_preservesExplicitRepos(t *testing.T) {
ctrl := gomock.NewController(t)
db := mocks.NewMockDatabase(ctrl)
db.EXPECT().GetConn().Return(nil).AnyTimes()
db.EXPECT().GetHook().Return(nil).AnyTimes()

explicitOrg := mocks.NewMockOrganisationRepository(ctrl)
lo := logger.New("test", slog.LevelError)

a := &types.APIOptions{
DB: db,
Logger: lo,
OrgRepo: explicitOrg,
}

ensureAPIRepositories(a)

require.Same(t, explicitOrg, a.OrgRepo)
require.NotNil(t, a.OrgMemberRepo)
require.NotNil(t, a.ProjectRepo)
}

func TestEnsureAPIRepositories_noOpWithoutDB(t *testing.T) {
lo := logger.New("test", slog.LevelError)
a := &types.APIOptions{Logger: lo}

ensureAPIRepositories(a)

require.Nil(t, a.OrgRepo)
require.Nil(t, a.OrgMemberRepo)
require.Nil(t, a.ProjectRepo)
}
172 changes: 172 additions & 0 deletions api/handlers/queue_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package handlers

import (
"context"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/go-chi/render"

"github.com/frain-dev/convoy/cache"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/util"
)

const (
queueMonitoringCookieName = "convoy_queue_monitoring_session"
// Cookie must include /queue/monitoring/session so revoke can receive and invalidate it server-side.
queueMonitoringCookiePath = "/queue/monitoring"
queueMonitoringCookieTTL = 15 * time.Minute
revokedKeyPrefix = "convoy:queue_session:revoked:"
)

var (
cookieSigningKey []byte
cookieSigningKeyOnce sync.Once
)

func shouldSetSecureCookie(r *http.Request) bool {
if r.TLS != nil {
return true
}

cfg, err := config.Get()
if err != nil {
return false
}

return !strings.EqualFold(cfg.Environment, "development")
}

func getCookieSigningKey() []byte {
cookieSigningKeyOnce.Do(func() {
cookieSigningKey = make([]byte, 32)
if _, err := rand.Read(cookieSigningKey); err != nil {
panic(fmt.Sprintf("failed to generate cookie signing key: %v", err))
}
})
return cookieSigningKey
}

func signCookieValue(expiry time.Time) string {
payload := fmt.Sprintf("%d", expiry.Unix())
mac := hmac.New(sha256.New, getCookieSigningKey())
mac.Write([]byte(payload))
sig := hex.EncodeToString(mac.Sum(nil))
return fmt.Sprintf("%s.%s", payload, sig)
}

func parseAndVerifyCookie(cookieValue string) (expiryUnix int64, ok bool) {
dotIdx := -1
for i, c := range cookieValue {
if c == '.' {
dotIdx = i
break
}
}
if dotIdx < 0 {
return 0, false
}
expiryStr := cookieValue[:dotIdx]
sig := cookieValue[dotIdx+1:]

mac := hmac.New(sha256.New, getCookieSigningKey())
mac.Write([]byte(expiryStr))
expectedSig := hex.EncodeToString(mac.Sum(nil))
if !hmac.Equal([]byte(sig), []byte(expectedSig)) {
return 0, false
}

if _, err := fmt.Sscanf(expiryStr, "%d", &expiryUnix); err != nil {
return 0, false
}
if time.Now().Unix() >= expiryUnix {
return 0, false
}
return expiryUnix, true
}

// ValidateQueueSessionCookie checks HMAC signature, expiry, and Redis revocation list.
func ValidateQueueSessionCookie(c cache.Cache) func(string) bool {
return func(cookieValue string) bool {
if _, ok := parseAndVerifyCookie(cookieValue); !ok {
return false
}

var revoked bool
key := revokedKeyPrefix + cookieValue
if err := c.Get(context.Background(), key, &revoked); err == nil && revoked {
return false
}

return true
}
}

func (h *Handler) requireInstanceAdmin(w http.ResponseWriter, r *http.Request) bool {
user, err := h.retrieveUser(r)
if err != nil {
_ = render.Render(w, r, util.NewErrorResponse("unauthorized", http.StatusUnauthorized))
return false
}

isAdmin, err := h.A.OrgMemberRepo.HasInstanceAdminAccess(r.Context(), user.UID)
if err != nil || !isAdmin {
_ = render.Render(w, r, util.NewErrorResponse("instance admin access required", http.StatusForbidden))
return false
}
return true
}

func (h *Handler) CreateQueueMonitoringSession(w http.ResponseWriter, r *http.Request) {
if !h.requireInstanceAdmin(w, r) {
return
}

expiry := time.Now().Add(queueMonitoringCookieTTL)
value := signCookieValue(expiry)

http.SetCookie(w, &http.Cookie{
Name: queueMonitoringCookieName,
Value: value,
Path: queueMonitoringCookiePath,
Expires: expiry,
MaxAge: int(queueMonitoringCookieTTL.Seconds()),
HttpOnly: true,
Secure: shouldSetSecureCookie(r),
SameSite: http.SameSiteLaxMode,
})

Check warning

Code scanning / CodeQL

Cookie 'Secure' attribute is not set to true Medium

Cookie does not set Secure attribute to true.
Comment on lines +136 to +145

_ = render.Render(w, r, util.NewServerResponse("queue monitoring session created", nil, http.StatusOK))
}

func (h *Handler) RevokeQueueMonitoringSession(w http.ResponseWriter, r *http.Request) {
if cookie, err := r.Cookie(queueMonitoringCookieName); err == nil {
if expiryUnix, ok := parseAndVerifyCookie(cookie.Value); ok {
remaining := time.Until(time.Unix(expiryUnix, 0))
if remaining > 0 {
key := revokedKeyPrefix + cookie.Value
_ = h.A.Cache.Set(r.Context(), key, true, remaining)
}
}
}

http.SetCookie(w, &http.Cookie{
Name: queueMonitoringCookieName,
Value: "",
Path: queueMonitoringCookiePath,
MaxAge: -1,
HttpOnly: true,
Secure: shouldSetSecureCookie(r),
SameSite: http.SameSiteLaxMode,
})

Check warning

Code scanning / CodeQL

Cookie 'Secure' attribute is not set to true Medium

Cookie does not set Secure attribute to true.
Comment on lines +161 to +169

_ = render.Render(w, r, util.NewServerResponse("queue monitoring session revoked", nil, http.StatusOK))
}
4 changes: 2 additions & 2 deletions api/ui/build/index.html

Large diffs are not rendered by default.

Loading
Loading