Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*.pem
*.generated
cmd/wfx-viewer/wfx-viewer
cmd/wfx-loadtest/wfx-loadtest
/public
/result
/dist
Expand Down
2 changes: 1 addition & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ public
test/test_helper/**/*
hugo/**/*.html
**/build
ui/priv/**
ui/dist/**
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Breaking

- Omit pagination metadata from responses by default. To restore the previous behavior, append the query parameter `pagination=true` to the `/jobs` and `/workflows` endpoints.

### Added

- Log all SQL queries when `--log-level` is set to `trace`
- wfx-loadtest: add `populate` sub-command to seed a database with sample data

### Removed

- Retry mechanism for storage initialization
- Retry loop when creating network listeners

### Changed

- Distro-compliant package naming
- Querying jobs by tags is about 3x faster (e.g. ~1s instead of ~3s for 1 million jobs)

### Fixed

Expand Down
7 changes: 7 additions & 0 deletions api/wfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (server WfxServer) GetJobs(ctx context.Context, request api.GetJobsRequestO
if request.Params.ParamLimit != nil {
pagination.Limit = *request.Params.ParamLimit
}
if request.Params.ParamPagination != nil {
pagination.ComputeTotal = *request.Params.ParamPagination
}

jobs, err := job.QueryJobs(ctx, server.storage, filter, pagination, (*string)(request.Params.ParamSort))
if err != nil {
Expand Down Expand Up @@ -402,6 +405,10 @@ func (server WfxServer) GetWorkflows(ctx context.Context, request api.GetWorkflo
limit = *request.Params.ParamLimit
}
pagination := persistence.PaginationParams{Offset: offset, Limit: limit}
if request.Params.ParamPagination != nil {
pagination.ComputeTotal = *request.Params.ParamPagination
}

log := logging.LoggerFromCtx(ctx)
workflows, err := workflow.QueryWorkflows(ctx, server.storage, pagination, (*string)(request.Params.ParamSort))
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions cmd/wfx-loadtest/cmd/populate/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package populate

import (
"fmt"
"runtime"
"slices"
"strings"

"github.com/Southclaws/fault"
"github.com/knadh/koanf/v2"
"github.com/siemens/wfx/cmd/wfx/cmd/config"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
)

var (
flagCount = "count"
flagWorkers = "workers"
)

func NewCommand(k *koanf.Koanf) *cobra.Command {
cmd := &cobra.Command{
Use: "populate",
Short: "Populate database with jobs",
RunE: func(cmd *cobra.Command, args []string) error {
return fault.Wrap(run(cmd, k))
},
}

f := cmd.Flags()
f.Int(flagCount, 1000, "number of jobs to create")
f.Int(flagWorkers, runtime.NumCPU(), "number of concurrent workers")

supportedStorages := persistence.Storages()
defaultStorage := supportedStorages[0]
if slices.Index(supportedStorages, config.PreferedStorage) != -1 {
defaultStorage = config.PreferedStorage
}
f.String(config.StorageFlag, defaultStorage, fmt.Sprintf("persistence storage. one of: [%s]", strings.Join(supportedStorages, ", ")))

var storageOpts string
if defaultStorage == config.PreferedStorage {
storageOpts = config.SqliteDefaultOpts
}
f.String(config.StorageOptFlag, storageOpts, "storage options")

return cmd
}
84 changes: 84 additions & 0 deletions cmd/wfx-loadtest/cmd/populate/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package populate

import (
"fmt"
"math/rand"
"time"

"github.com/Southclaws/fault"
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/cmd/config"
"github.com/siemens/wfx/generated/api"
"github.com/siemens/wfx/internal/handler/job/definition"
"github.com/siemens/wfx/workflow/dau"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

func run(cmd *cobra.Command, k *koanf.Koanf) error {
ctx := cmd.Context()

appConfig, err := config.NewAppConfig(cmd.Flags())
if err != nil {
return fault.Wrap(err)
}

storage, err := appConfig.InitStorage()
if err != nil {
return fault.Wrap(err)
}

wf := dau.DirectWorkflow()
wfExisting, err := storage.GetWorkflow(ctx, wf.Name)
if err != nil {
return fault.Wrap(err)
}
if wfExisting == nil {
if _, err := storage.CreateWorkflow(ctx, wf); err != nil {
return fault.Wrap(err)
}
log.Info().Str("name", wf.Name).Msg("Created workflow")
} else {
log.Info().Str("name", wf.Name).Msg("Worfklow already exists")
}

count := k.Int(flagCount)
workers := k.Int(flagWorkers)
log.Info().Int("count", count).Int("workers", workers).Msg("Populating storage with jobs")

tags := []string{"EUROPE_WEST", "loadtest"}

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(workers)
for i := range count {
g.Go(func() error {
now := time.Now()
clientID := fmt.Sprintf("LOAD-%d", i)
state := wf.States[rand.Intn(len(wf.States))] // pick random state
job := api.Job{
ClientID: clientID,
Workflow: wf,
Mtime: &now,
Stime: &now,
Status: &api.JobStatus{
ClientID: clientID,
State: state.Name,
},
Definition: map[string]any{},
Tags: &tags,
History: &[]api.History{},
}
job.Status.DefinitionHash = definition.Hash(&job)
_, err := storage.CreateJob(ctx, &job)
return fault.Wrap(err)
})
}
if err := g.Wait(); err != nil {
return fault.Wrap(err)
}

log.Info().Int("count", count).Msg("Finished populating storage with jobs")

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx-loadtest/wfx"
"github.com/siemens/wfx/cmd/wfxctl/flags"
"github.com/siemens/wfx/generated/api"
"github.com/siemens/wfx/workflow/dau"
vegeta "github.com/tsenart/vegeta/v12/lib"
Expand All @@ -30,10 +31,6 @@ const (
// Threshold of data points above which series are downsampled.
threshold = 4000

HostFlag = "host"
PortFlag = "port"
MgmtHostFlag = "mgmt-host"
MgmtPortFlag = "mgmt-port"
ReadFreqFlag = "read-freq"
WriteFreqFlag = "write-freq"
DurationFlag = "duration"
Expand All @@ -53,10 +50,10 @@ var (
)

func Run(k *koanf.Koanf) error {
host = k.String(HostFlag)
port = k.Int(PortFlag)
mgmtHost = k.String(MgmtHostFlag)
mgmtPort = k.Int(MgmtPortFlag)
host = k.String(flags.ClientHostFlag)
port = k.Int(flags.ClientPortFlag)
mgmtHost = k.String(flags.MgmtHostFlag)
mgmtPort = k.Int(flags.MgmtPortFlag)

if host == "" || mgmtHost == "" {
return errors.New("host or mgmtHost not set")
Expand Down Expand Up @@ -84,9 +81,7 @@ func Run(k *koanf.Koanf) error {

readerResultChan := make(chan vegeta.Result)
readerDoneChan := make(chan any)
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
readTargeter := vegeta.NewStaticTargeter(
vegeta.Target{
Method: http.MethodGet,
Expand All @@ -103,14 +98,11 @@ func Run(k *koanf.Koanf) error {
readerResultChan <- *res
}
readerDoneChan <- nil
}()
})

writerResultChan := make(chan vegeta.Result)
writerDoneChan := make(chan any)
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
attacker := newAttacker()
for res := range attacker.Attack(writeTargeter, writeRate, duration, "Generate and update jobs") {
// forward result to reporter
Expand Down Expand Up @@ -151,7 +143,7 @@ func Run(k *koanf.Koanf) error {

}
writerDoneChan <- nil
}()
})

var metrics vegeta.Metrics
p := plot.New(
Expand All @@ -160,10 +152,8 @@ func Run(k *koanf.Koanf) error {
plot.Label(plot.ErrorLabeler),
)

wg.Add(1)
go func() {
wg.Go(func() {
// collect results
defer wg.Done()

doneCounter := 0
for doneCounter < 2 {
Expand All @@ -182,7 +172,7 @@ func Run(k *koanf.Koanf) error {
}
metrics.Close()
p.Close()
}()
})

wg.Wait()
if err := dumpResults(&metrics, p); err != nil {
Expand Down
12 changes: 7 additions & 5 deletions cmd/wfx-loadtest/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx-loadtest/cmd/populate"
"github.com/siemens/wfx/cmd/wfx-loadtest/loadtest"
"github.com/siemens/wfx/cmd/wfxctl/flags"
"github.com/siemens/wfx/internal/cmd/man"
Expand All @@ -32,7 +33,7 @@ func NewCommand() *cobra.Command {
Use: "wfx-loadtest",
Short: "Run a loadtest against wfx",
Example: "wfx-loadtest --duration 10s",
PreRun: func(cmd *cobra.Command, _ []string) {
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
envProvider := env.Provider(".", env.Opt{
Prefix: "WFX_",
TransformFunc: func(k string, v string) (string, any) {
Expand Down Expand Up @@ -62,12 +63,13 @@ func NewCommand() *cobra.Command {
},
}
cmd.AddCommand(man.NewCommand())
cmd.AddCommand(populate.NewCommand(k))
f := cmd.PersistentFlags()

f.String(loadtest.HostFlag, "localhost", "host")
f.Int(loadtest.PortFlag, 8080, "port")
f.String(loadtest.MgmtHostFlag, "localhost", "management host")
f.Int(loadtest.MgmtPortFlag, 8081, "management port")
f.String(flags.ClientHostFlag, "localhost", "host")
f.Int(flags.ClientPortFlag, 8080, "port")
f.String(flags.MgmtHostFlag, "localhost", "management host")
f.Int(flags.MgmtPortFlag, 8081, "management port")

f.String(flags.LogLevelFlag, "info", fmt.Sprintf("set log level. one of: %s,%s,%s,%s,%s,%s,%s",
zerolog.TraceLevel.String(),
Expand Down
22 changes: 20 additions & 2 deletions cmd/wfx/cmd/config/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/persistence"
"github.com/spf13/pflag"
)

Expand Down Expand Up @@ -137,7 +138,7 @@ func NewAppConfig(flags *pflag.FlagSet) (*AppConfig, error) {

// start watching config
for _, fp := range fileProviders {
if err := fp.Watch(func(_ interface{}, err error) {
if err := fp.Watch(func(_ any, err error) {
if err != nil {
return
}
Expand Down Expand Up @@ -187,7 +188,7 @@ func (cfg *AppConfig) StorageOptions() string {
storageOpt := cfg.flags.Lookup(StorageOptFlag)
changed := storageOpt != nil && storageOpt.Changed
// do not return SQLite options for non-SQLite backends
if name != preferedStorage && (!changed || cfg.storageOpts == sqliteDefaultOpts) {
if name != PreferedStorage && (!changed || cfg.storageOpts == SqliteDefaultOpts) {
return ""
}
return cfg.storageOpts
Expand Down Expand Up @@ -418,3 +419,20 @@ func (cfg *AppConfig) SSEGraceInterval() time.Duration {
defer cfg.mutex.RUnlock()
return cfg.sseGraceInterval
}

func (cfg *AppConfig) InitStorage() (persistence.Storage, error) {
name, options := cfg.Storage(), cfg.StorageOptions()
log.Debug().Str("name", name).Str("options", options).Msg("Setting up persistent storage")

// note: storage is shared between north- and southbound API
storage := persistence.GetStorage(name)
if storage == nil {
return nil, fmt.Errorf("unknown storage %s", name)
Comment thread
michaeladler marked this conversation as resolved.
}
log.Debug().Str("name", name).Msg("Initializing storage")
if err := storage.Initialize(options); err != nil {
return nil, fault.Wrap(err)
}
log.Info().Str("name", name).Msg("Initialized storage")
return storage, nil
}
10 changes: 7 additions & 3 deletions cmd/wfx/cmd/config/appconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ import (

func TestNewAppConfig(t *testing.T) {
cfg, err := NewAppConfig(NewFlagset())
defer cfg.Stop()
require.NoError(t, err)
t.Cleanup(cfg.Stop)

// call all methods which do not accept arguments
structValue := reflect.ValueOf(cfg)
for i := 0; i < structValue.NumMethod(); i++ {
method := structValue.Method(i)
methodType := method.Type()
methodName := structValue.Type().Method(i).Name
if methodType.NumIn() == 0 {
t.Run(methodType.Name(), func(*testing.T) {
t.Run(methodName, func(*testing.T) {
if methodName == "InitStorage" {
return
}
_ = method.Call([]reflect.Value{})
})
}
Expand Down Expand Up @@ -70,7 +74,7 @@ func TestReload(t *testing.T) {
_, _ = cfgFile.Write([]byte("log-level: error"))
}

for i := 0; i < 500; i++ {
for range 500 {
if zerolog.GlobalLevel() == zerolog.ErrorLevel {
break
}
Expand Down
Loading
Loading