From 4cb928d3e63cdca5995b9dd5510e9994baa12e24 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Sat, 28 Mar 2026 13:40:40 -0500 Subject: [PATCH] verbs: implement flush all command --- e2e_test.go | 30 +++++++++++++++++++++++++++ verbs.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/e2e_test.go b/e2e_test.go index 54eb063..ebbdc4d 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -508,6 +508,36 @@ func TestE2E_StatsItems(t *testing.T) { must.Positive(t, data[0].MemRequested) } +func TestE2E_Flush(t *testing.T) { + t.Parallel() + + address, done := memctest.LaunchTCP(t, nil) + t.Cleanup(done) + + c := New([]string{address}) + defer ignore.Close(c) + + t.Run("success", func(t *testing.T) { + err := Set(c, "key1", "value1") + must.NoError(t, err) + + v, verr := Get[string](c, "key1") + must.NoError(t, verr) + must.Eq(t, "value1", v) + + err = Flush(c, 0) + must.NoError(t, err) + + _, err = Get[string](c, "key1") + must.ErrorIs(t, err, ErrCacheMiss) + }) + + t.Run("empty cache", func(t *testing.T) { + err := Flush(c, 0) + must.NoError(t, err) + }) +} + func TestE2E_CAS(t *testing.T) { t.Parallel() diff --git a/verbs.go b/verbs.go index 07e4ffe..00b823f 100644 --- a/verbs.go +++ b/verbs.go @@ -689,6 +689,48 @@ func getPayloadWithCAS(r *bufio.Reader) ([]byte, uint64, error) { return payload, cas, nil } +// Flush will delete all items from memcached. +// +// The timeout parameter is optional. A timeout of 0 means flush right now. +// A non-zero timeout will delay the flush by that many seconds. +// +// Note: this operation is performed on a single memcached server, even when +// the Client is configured with multiple server addresses. This is intentional, +// as flush is typically used by local administration tools that connect to a +// single memcached instance. +func Flush(c *Client, timeout time.Duration) error { + return c.do("", func(conn *iopool.Buffer) error { + expiration, err := c.seconds(timeout) + if err != nil { + return err + } + + if _, err := fmt.Fprintf( + conn, + "flush_all %d\r\n", + expiration, + ); err != nil { + return err + } + + if err := conn.Flush(); err != nil { + return err + } + + line, lerr := conn.ReadSlice('\n') + if lerr != nil { + return lerr + } + + switch string(line) { + case "OK\r\n": + return nil + default: + return unexpected(line) + } + }) +} + // Delete will remove the value associated with key from memcached. // // Uses Client c to connect to a memcached instance, and automatically handles @@ -857,6 +899,12 @@ func Decrement[T Countable](c *Client, key string, delta T) (T, error) { return result, err } +// Stats returns runtime statistics for a single memcached server. +// +// Note: this operation is performed on a single memcached server, even when +// the Client is configured with multiple server addresses. This is intentional, +// as stats is typically used by local monitoring tools that connect to a +// single memcached instance. func Stats(c *Client) (*Statistics, error) { var statistics *Statistics @@ -884,6 +932,12 @@ func Stats(c *Client) (*Statistics, error) { return statistics, err } +// StatsSlabs returns slab statistics for a single memcached server. +// +// Note: this operation is performed on a single memcached server, even when +// the Client is configured with multiple server addresses. This is intentional, +// as stats is typically used by local monitoring tools that connect to a +// single memcached instance. func StatsSlabs(c *Client) (*SlabStatistics, error) { var statistics *SlabStatistics @@ -911,6 +965,12 @@ func StatsSlabs(c *Client) (*SlabStatistics, error) { return statistics, err } +// StatsItems returns item statistics for a single memcached server. +// +// Note: this operation is performed on a single memcached server, even when +// the Client is configured with multiple server addresses. This is intentional, +// as stats is typically used by local monitoring tools that connect to a +// single memcached instance. func StatsItems(c *Client) ([]*ItemStatistics, error) { var statistics []*ItemStatistics