diff --git a/package-lock.json b/package-lock.json index 6f7cbb8..bfff86e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "Apache-2.0", "dependencies": { "@emotion/css": "11.10.6", + "@grafana/async-query-data": "^0.4.2", "@grafana/data": "^11.5.3", "@grafana/runtime": "^11.5.3", "@grafana/schema": "^11.5.3", @@ -1150,6 +1151,22 @@ "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", "license": "0BSD" }, + "node_modules/@grafana/async-query-data": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/@grafana/async-query-data/-/async-query-data-0.4.2.tgz", + "integrity": "sha512-cNcOnH4+yshXA+Igjig/v/MQhpWoNo1vGb/Xthgu+ihIsZ2P6rVM/tCuM7/LquE3qFnXRSc35HXenjr0JRfBvQ==", + "license": "Apache-2.0", + "dependencies": { + "semver": "^7.6.3", + "tslib": "^2.8.0" + } + }, + "node_modules/@grafana/async-query-data/node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" + }, "node_modules/@grafana/data": { "version": "11.6.7", "resolved": "https://registry.npmjs.org/@grafana/data/-/data-11.6.7.tgz", diff --git a/package.json b/package.json index f8a0243..9fab469 100644 --- a/package.json +++ b/package.json @@ -80,6 +80,7 @@ }, "dependencies": { "@emotion/css": "11.10.6", + "@grafana/async-query-data": "^0.4.2", "@grafana/data": "^11.5.3", "@grafana/runtime": "^11.5.3", "@grafana/schema": "^11.5.3", diff --git a/pkg/plugin/async_query.go b/pkg/plugin/async_query.go new file mode 100644 index 0000000..072db11 --- /dev/null +++ b/pkg/plugin/async_query.go @@ -0,0 +1,283 @@ +package plugin + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/google/uuid" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +const ( + asyncJobStatusStarted = "started" + asyncJobStatusRunning = "running" + asyncJobStatusComplete = "complete" + asyncJobStatusError = "error" + + asyncJobTTL = 10 * time.Minute + asyncJobReapTick = 1 * time.Minute + + // debugAsyncDelay adds an artificial delay before executing async queries. + // Set to e.g. 30*time.Second to manually test polling, then set back to 0. + debugAsyncDelay = 5 * time.Second // TODO: SET BACK TO ZERO BEFORE RELEASE + +) + +// asyncJob represents an in-flight or completed async query. +type asyncJob struct { + Status string + Frames []*data.Frame + Error string + CreatedAt time.Time + cancel context.CancelFunc +} + +// asyncJobStore manages async query jobs with TTL-based cleanup. +type asyncJobStore struct { + mu sync.RWMutex + jobs map[string]*asyncJob + done chan struct{} +} + +func newAsyncJobStore() *asyncJobStore { + s := &asyncJobStore{ + jobs: make(map[string]*asyncJob), + done: make(chan struct{}), + } + go s.reapLoop() + return s +} + +func (s *asyncJobStore) reapLoop() { + ticker := time.NewTicker(asyncJobReapTick) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.mu.Lock() + now := time.Now() + for id, job := range s.jobs { + if now.Sub(job.CreatedAt) > asyncJobTTL { + if job.cancel != nil { + job.cancel() + } + delete(s.jobs, id) + log.DefaultLogger.Debug("async job reaped", "id", id) + } + } + s.mu.Unlock() + case <-s.done: + return + } + } +} + +func (s *asyncJobStore) stop() { + close(s.done) +} + +func (s *asyncJobStore) create(cancel context.CancelFunc) string { + id := uuid.New().String() + s.mu.Lock() + s.jobs[id] = &asyncJob{ + Status: asyncJobStatusStarted, + CreatedAt: time.Now(), + cancel: cancel, + } + s.mu.Unlock() + return id +} + +func (s *asyncJobStore) get(id string) (*asyncJob, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + job, ok := s.jobs[id] + return job, ok +} + +func (s *asyncJobStore) complete(id string, frames []*data.Frame) { + s.mu.Lock() + defer s.mu.Unlock() + if job, ok := s.jobs[id]; ok { + job.Status = asyncJobStatusComplete + job.Frames = frames + } +} + +func (s *asyncJobStore) fail(id string, errMsg string) { + s.mu.Lock() + defer s.mu.Unlock() + if job, ok := s.jobs[id]; ok { + job.Status = asyncJobStatusError + job.Error = errMsg + } +} + +func (s *asyncJobStore) cancel(id string) bool { + s.mu.Lock() + defer s.mu.Unlock() + job, ok := s.jobs[id] + if !ok { + return false + } + if job.cancel != nil { + job.cancel() + } + delete(s.jobs, id) + return true +} + +// asyncCustomMeta is the frame metadata format expected by @grafana/async-query-data. +// The library checks for meta.custom.queryID and meta.custom.status on each frame. +type asyncCustomMeta struct { + QueryID string `json:"queryID"` + Status string `json:"status"` +} + +// asyncStatusFrame builds an empty frame with the async status metadata that +// DatasourceWithAsyncBackend's requestLooper inspects to decide whether to keep polling. +func asyncStatusFrame(refID string, queryID string, status string) *data.Frame { + frame := data.NewFrame("async-status") + frame.RefID = refID + frame.Meta = &data.FrameMeta{ + Custom: asyncCustomMeta{ + QueryID: queryID, + Status: status, + }, + } + return frame +} + +// handleAsyncQuery is called from QueryData. It inspects the query JSON for a +// "queryID" field to determine whether this is an initial request or a poll. +// +// Initial request (no queryID): starts the query in a goroutine, returns a frame +// with status "started". +// +// Poll request (has queryID): checks job status and returns: +// - status "running" frame if still in progress +// - the actual data frames if complete +// - an error response if the job failed or was not found +func (d *SiftDatasource) handleAsyncQuery(pCtx backend.PluginContext, q backend.DataQuery, fqm queryModel) backend.DataResponse { + // Check if this query has a queryID (i.e. it's a poll request from the requestLooper) + var queryMeta struct { + QueryID string `json:"queryID"` + } + _ = json.Unmarshal(q.JSON, &queryMeta) + + if queryMeta.QueryID != "" { + return d.pollAsyncQuery(q.RefID, queryMeta.QueryID) + } + + return d.startAsyncQuery(pCtx, q, fqm) +} + +// startAsyncQuery kicks off the query in a background goroutine and immediately +// returns a frame with status "started" and the new queryID. +func (d *SiftDatasource) startAsyncQuery(pCtx backend.PluginContext, q backend.DataQuery, fqm queryModel) backend.DataResponse { + jobCtx, jobCancel := context.WithCancel(context.Background()) + queryID := d.asyncJobs.create(jobCancel) + + log.DefaultLogger.Debug("async query started", "queryId", queryID, "refId", q.RefID) + + go func() { + defer func() { + if r := recover(); r != nil { + d.asyncJobs.fail(queryID, fmt.Sprintf("panic: %v", r)) + log.DefaultLogger.Error("async query panic", "queryId", queryID, "error", r) + } + }() + + // Check if cancelled before starting + select { + case <-jobCtx.Done(): + d.asyncJobs.fail(queryID, "query cancelled") + return + default: + } + + // Apply debug delay if configured (for manual testing of async polling) + if debugAsyncDelay > 0 { + log.DefaultLogger.Debug("async query sleeping for debug delay", "queryId", queryID, "delay", debugAsyncDelay) + select { + case <-time.After(debugAsyncDelay): + case <-jobCtx.Done(): + d.asyncJobs.fail(queryID, "query cancelled during debug delay") + return + } + } + + res := d.query(pCtx, q, fqm) + if res.Error != nil { + d.asyncJobs.fail(queryID, res.Error.Error()) + return + } + + for _, frame := range res.Frames { + frame.RefID = q.RefID + } + + d.asyncJobs.complete(queryID, res.Frames) + log.DefaultLogger.Debug("async query complete", "queryId", queryID, "frames", len(res.Frames)) + }() + + return backend.DataResponse{ + Frames: data.Frames{asyncStatusFrame(q.RefID, queryID, asyncJobStatusStarted)}, + } +} + +// pollAsyncQuery checks the status of a running async job and returns the +// appropriate response for the requestLooper. +func (d *SiftDatasource) pollAsyncQuery(refID string, queryID string) backend.DataResponse { + job, ok := d.asyncJobs.get(queryID) + if !ok { + return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("async query not found: %s", queryID)) + } + + switch job.Status { + case asyncJobStatusComplete: + return backend.DataResponse{ + Frames: job.Frames, + } + case asyncJobStatusError: + return backend.ErrDataResponse(backend.StatusInternal, job.Error) + default: + // Still running — return a status frame so the requestLooper keeps polling + return backend.DataResponse{ + Frames: data.Frames{asyncStatusFrame(refID, queryID, asyncJobStatusRunning)}, + } + } +} + +// callAsyncQueryCancel handles the "cancel" CallResource endpoint. +// DatasourceWithAsyncBackend calls postResource('cancel', { queryId }) on unsubscribe. +func (d *SiftDatasource) callAsyncQueryCancel(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + var body struct { + QueryID string `json:"queryId"` + } + if err := json.Unmarshal(req.Body, &body); err != nil || body.QueryID == "" { + return sender.Send(&backend.CallResourceResponse{ + Status: http.StatusBadRequest, + Body: []byte(`{"error":"missing queryId"}`), + }) + } + + if d.asyncJobs.cancel(body.QueryID) { + log.DefaultLogger.Debug("async query cancelled", "queryId", body.QueryID) + return sender.Send(&backend.CallResourceResponse{ + Status: http.StatusOK, + Body: []byte(`{"status":"cancelled"}`), + }) + } + + return sender.Send(&backend.CallResourceResponse{ + Status: http.StatusNotFound, + Body: []byte(`{"error":"query not found"}`), + }) +} diff --git a/pkg/plugin/async_query_test.go b/pkg/plugin/async_query_test.go new file mode 100644 index 0000000..a93312d --- /dev/null +++ b/pkg/plugin/async_query_test.go @@ -0,0 +1,506 @@ +package plugin + +import ( + "context" + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// asyncJobStore unit tests +// --------------------------------------------------------------------------- + +func TestAsyncJobStore_CreateAndGet(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + require.NotEmpty(t, id) + + job, ok := store.get(id) + require.True(t, ok) + assert.Equal(t, asyncJobStatusStarted, job.Status) + assert.WithinDuration(t, time.Now(), job.CreatedAt, 2*time.Second) +} + +func TestAsyncJobStore_GetNonExistent(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + _, ok := store.get("does-not-exist") + assert.False(t, ok) +} + +func TestAsyncJobStore_Complete(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + frames := []*data.Frame{data.NewFrame("test-frame")} + store.complete(id, frames) + + job, ok := store.get(id) + require.True(t, ok) + assert.Equal(t, asyncJobStatusComplete, job.Status) + require.Len(t, job.Frames, 1) + assert.Equal(t, "test-frame", job.Frames[0].Name) +} + +func TestAsyncJobStore_Fail(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + store.fail(id, "something went wrong") + + job, ok := store.get(id) + require.True(t, ok) + assert.Equal(t, asyncJobStatusError, job.Status) + assert.Equal(t, "something went wrong", job.Error) +} + +func TestAsyncJobStore_Cancel(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ctx, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + // Cancel should return true and remove the job + ok := store.cancel(id) + assert.True(t, ok) + + // Context should be cancelled + select { + case <-ctx.Done(): + // expected + default: + t.Fatal("expected context to be cancelled") + } + + // Job should be gone + _, found := store.get(id) + assert.False(t, found) +} + +func TestAsyncJobStore_CancelNonExistent(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ok := store.cancel("does-not-exist") + assert.False(t, ok) +} + +func TestAsyncJobStore_CompleteNonExistent(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + // Should not panic + store.complete("does-not-exist", nil) + store.fail("does-not-exist", "error") +} + +// --------------------------------------------------------------------------- +// asyncStatusFrame tests +// --------------------------------------------------------------------------- + +func TestAsyncStatusFrame(t *testing.T) { + frame := asyncStatusFrame("A", "query-123", "started") + + assert.Equal(t, "async-status", frame.Name) + assert.Equal(t, "A", frame.RefID) + require.NotNil(t, frame.Meta) + + custom, ok := frame.Meta.Custom.(asyncCustomMeta) + require.True(t, ok) + assert.Equal(t, "query-123", custom.QueryID) + assert.Equal(t, "started", custom.Status) +} + +func TestAsyncStatusFrame_JSONSerialization(t *testing.T) { + frame := asyncStatusFrame("B", "q-456", "running") + + customBytes, err := json.Marshal(frame.Meta.Custom) + require.NoError(t, err) + + var parsed map[string]string + err = json.Unmarshal(customBytes, &parsed) + require.NoError(t, err) + + assert.Equal(t, "q-456", parsed["queryID"]) + assert.Equal(t, "running", parsed["status"]) +} + +// --------------------------------------------------------------------------- +// pollAsyncQuery tests +// --------------------------------------------------------------------------- + +func TestPollAsyncQuery_Running(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + resp := ds.pollAsyncQuery("A", id) + require.Nil(t, resp.Error) + require.Len(t, resp.Frames, 1) + + custom, ok := resp.Frames[0].Meta.Custom.(asyncCustomMeta) + require.True(t, ok) + assert.Equal(t, asyncJobStatusRunning, custom.Status) + assert.Equal(t, id, custom.QueryID) +} + +func TestPollAsyncQuery_Complete(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + resultFrame := data.NewFrame("result") + resultFrame.Fields = append(resultFrame.Fields, data.NewField("time", nil, []time.Time{time.Now()})) + store.complete(id, []*data.Frame{resultFrame}) + + resp := ds.pollAsyncQuery("A", id) + require.Nil(t, resp.Error) + require.Len(t, resp.Frames, 1) + assert.Equal(t, "result", resp.Frames[0].Name) +} + +func TestPollAsyncQuery_Error(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + store.fail(id, "query timeout") + + resp := ds.pollAsyncQuery("A", id) + require.NotNil(t, resp.Error) + assert.Contains(t, resp.Error.Error(), "query timeout") +} + +func TestPollAsyncQuery_NotFound(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + resp := ds.pollAsyncQuery("A", "nonexistent-id") + require.NotNil(t, resp.Error) + assert.Contains(t, resp.Error.Error(), "async query not found") +} + +// --------------------------------------------------------------------------- +// handleAsyncQuery routing tests +// --------------------------------------------------------------------------- + +func TestHandleAsyncQuery_RoutesToPoll_WhenQueryIDPresent(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + _, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + store.complete(id, []*data.Frame{data.NewFrame("done")}) + + queryJSON, _ := json.Marshal(map[string]interface{}{ + "queryID": id, + "queryVersion": "2.1", + "refId": "A", + }) + + q := backend.DataQuery{ + RefID: "A", + JSON: queryJSON, + } + + resp := ds.handleAsyncQuery(backend.PluginContext{}, q, queryModel{}) + require.Nil(t, resp.Error) + require.Len(t, resp.Frames, 1) + assert.Equal(t, "done", resp.Frames[0].Name) +} + +func TestHandleAsyncQuery_RoutesToStart_WhenNoQueryID(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + queryJSON, _ := json.Marshal(map[string]interface{}{ + "queryVersion": "2.1", + "refId": "A", + }) + + q := backend.DataQuery{ + RefID: "A", + JSON: queryJSON, + } + + resp := ds.handleAsyncQuery(backend.PluginContext{}, q, queryModel{}) + + // Should return a "started" status frame + require.Nil(t, resp.Error) + require.Len(t, resp.Frames, 1) + + custom, ok := resp.Frames[0].Meta.Custom.(asyncCustomMeta) + require.True(t, ok) + assert.Equal(t, asyncJobStatusStarted, custom.Status) + assert.NotEmpty(t, custom.QueryID) + assert.Equal(t, "A", resp.Frames[0].RefID) +} + +// --------------------------------------------------------------------------- +// callAsyncQueryCancel CallResource handler tests +// --------------------------------------------------------------------------- + +func TestCallAsyncQueryCancel_Success(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + ctx, cancel := context.WithCancel(context.Background()) + id := store.create(cancel) + + body, _ := json.Marshal(map[string]string{"queryId": id}) + req := &backend.CallResourceRequest{ + PluginContext: backend.PluginContext{}, + Path: "cancel", + Method: http.MethodPost, + Body: body, + } + + sender := &mockCallResourceResponseSender{} + err := ds.callAsyncQueryCancel(context.Background(), req, sender) + + require.NoError(t, err) + assert.Equal(t, http.StatusOK, sender.status) + assert.Contains(t, string(sender.body), "cancelled") + + // Context should be cancelled + select { + case <-ctx.Done(): + // expected + default: + t.Fatal("expected context to be cancelled after cancel call") + } + + // Job should no longer exist + _, found := store.get(id) + assert.False(t, found) +} + +func TestCallAsyncQueryCancel_NotFound(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + body, _ := json.Marshal(map[string]string{"queryId": "nonexistent"}) + req := &backend.CallResourceRequest{ + PluginContext: backend.PluginContext{}, + Path: "cancel", + Method: http.MethodPost, + Body: body, + } + + sender := &mockCallResourceResponseSender{} + err := ds.callAsyncQueryCancel(context.Background(), req, sender) + + require.NoError(t, err) + assert.Equal(t, http.StatusNotFound, sender.status) +} + +func TestCallAsyncQueryCancel_MissingQueryId(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + req := &backend.CallResourceRequest{ + PluginContext: backend.PluginContext{}, + Path: "cancel", + Method: http.MethodPost, + Body: []byte(`{}`), + } + + sender := &mockCallResourceResponseSender{} + err := ds.callAsyncQueryCancel(context.Background(), req, sender) + + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, sender.status) +} + +func TestCallAsyncQueryCancel_InvalidBody(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + req := &backend.CallResourceRequest{ + PluginContext: backend.PluginContext{}, + Path: "cancel", + Method: http.MethodPost, + Body: []byte(`not json`), + } + + sender := &mockCallResourceResponseSender{} + err := ds.callAsyncQueryCancel(context.Background(), req, sender) + + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, sender.status) +} + +// --------------------------------------------------------------------------- +// QueryData integration: async routing (all non-annotation queries are async) +// --------------------------------------------------------------------------- + +func TestQueryData_StandardQuery_RoutesToAsync(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + // A standard data query (no annotationType) should go through async path + queryJSON, _ := json.Marshal(map[string]interface{}{ + "queryVersion": "2.1", + "refId": "A", + "channelDataQueries": []interface{}{}, + }) + + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{}, + Queries: []backend.DataQuery{ + { + RefID: "A", + JSON: queryJSON, + TimeRange: backend.TimeRange{ + From: time.Now().Add(-1 * time.Hour), + To: time.Now(), + }, + }, + }, + } + + resp, err := ds.QueryData(context.Background(), req) + require.NoError(t, err) + + result, ok := resp.Responses["A"] + require.True(t, ok) + require.Nil(t, result.Error) + require.Len(t, result.Frames, 1) + + // Verify the response is an async status frame with "started" + custom, ok := result.Frames[0].Meta.Custom.(asyncCustomMeta) + require.True(t, ok) + assert.Equal(t, asyncJobStatusStarted, custom.Status) + assert.NotEmpty(t, custom.QueryID) +} + +func TestQueryData_PollReturnsComplete(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + // Pre-create a completed job + _, cancel := context.WithCancel(context.Background()) + jobID := store.create(cancel) + resultFrame := data.NewFrame("my-result") + resultFrame.RefID = "A" + store.complete(jobID, []*data.Frame{resultFrame}) + + // Build a poll query with queryID set + queryJSON, _ := json.Marshal(map[string]interface{}{ + "queryVersion": "2.1", + "refId": "A", + "queryID": jobID, + "channelDataQueries": []interface{}{}, + }) + + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{}, + Queries: []backend.DataQuery{ + { + RefID: "A", + JSON: queryJSON, + TimeRange: backend.TimeRange{ + From: time.Now().Add(-1 * time.Hour), + To: time.Now(), + }, + }, + }, + } + + resp, err := ds.QueryData(context.Background(), req) + require.NoError(t, err) + + result, ok := resp.Responses["A"] + require.True(t, ok) + require.Nil(t, result.Error) + require.Len(t, result.Frames, 1) + assert.Equal(t, "my-result", result.Frames[0].Name) +} + +func TestQueryData_AnnotationQuery_RunsSync(t *testing.T) { + store := newAsyncJobStore() + defer store.stop() + + ds := &SiftDatasource{asyncJobs: store} + + // An annotation query should NOT go through the async path + queryJSON, _ := json.Marshal(map[string]interface{}{ + "queryVersion": "2.1", + "refId": "A", + "annotationType": "annotationsQuery", + "channelDataQueries": []interface{}{}, + }) + + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{}, + Queries: []backend.DataQuery{ + { + RefID: "A", + JSON: queryJSON, + TimeRange: backend.TimeRange{ + From: time.Now().Add(-1 * time.Hour), + To: time.Now(), + }, + }, + }, + } + + resp, err := ds.QueryData(context.Background(), req) + require.NoError(t, err) + + result, ok := resp.Responses["A"] + require.True(t, ok) + + // Annotation queries should not produce async-status frames + for _, frame := range result.Frames { + assert.NotEqual(t, "async-status", frame.Name, "annotation query should not produce async-status frames") + } +} diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 6412ba2..3305c8f 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -112,6 +112,7 @@ func NewSiftDatasource(ctx context.Context, s backend.DataSourceInstanceSettings channelsIdSearchCache: channelIdsCache, channelsNameSearchCache: channelNameCache, channelsRegexSearchCache: channelRegexCache, + asyncJobs: newAsyncJobStore(), }, nil } @@ -129,13 +130,16 @@ type SiftDatasource struct { // channel caches use loader to avoid duplicate API calls at the same time channelsNameSearchCache *TypedCacheWithLoader[channelSearchKey, []Channel, string] channelsRegexSearchCache *TypedCacheWithLoader[channelSearchKey, []Channel, string] + asyncJobs *asyncJobStore } // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance // created. As soon as datasource settings change detected by SDK old datasource instance will // be disposed and a new one will be created using NewSampleDatasource factory function. func (d *SiftDatasource) Dispose() { - // Clean up datasource instance resources. + if d.asyncJobs != nil { + d.asyncJobs.stop() + } } func (d *SiftDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { @@ -158,6 +162,9 @@ func (d *SiftDatasource) CallResource(ctx context.Context, req *backend.CallReso case "resolve-query-to-sift-metadata": return d.resolveQueryToSiftMetadata(ctx, req, sender) + case "cancel": + return d.callAsyncQueryCancel(ctx, req, sender) + default: return sender.Send(&backend.CallResourceResponse{ Status: http.StatusNotFound, @@ -186,7 +193,14 @@ func (d *SiftDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe continue } - res := d.query(req.PluginContext, q, *fqm) + // Annotation queries run synchronously; all other data queries go through + // the async path so DatasourceWithAsyncBackend can poll for results. + var res backend.DataResponse + if fqm.AnnotationType != "" { + res = d.query(req.PluginContext, q, *fqm) + } else { + res = d.handleAsyncQuery(req.PluginContext, q, *fqm) + } // save the response in a hashmap // based on with RefID as identifier response.Responses[q.RefID] = res diff --git a/src/datasource.test.ts b/src/datasource.test.ts index 7fa590b..879113b 100644 --- a/src/datasource.test.ts +++ b/src/datasource.test.ts @@ -11,6 +11,15 @@ jest.mock('@grafana/runtime', () => ({ }, })); +jest.mock('@grafana/async-query-data', () => ({ + DatasourceWithAsyncBackend: class { + constructor() {} + postResource = jest.fn(); + getResource = jest.fn(); + getRef = jest.fn().mockReturnValue({ uid: 'test-uid', type: 'sift-datasource' }); + }, +})); + describe('SiftDataSource', () => { let datasource: SiftDataSource; let mockTemplateSrv: any; diff --git a/src/datasource.ts b/src/datasource.ts index 0b54b92..d5c9052 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -6,7 +6,7 @@ import { DataSourceInstanceSettings, ScopedVars, } from '@grafana/data'; -import { DataSourceWithBackend } from '@grafana/runtime'; +import { DatasourceWithAsyncBackend } from '@grafana/async-query-data'; import { Observable, from } from 'rxjs'; import { switchMap } from 'rxjs/operators'; import { SiftVariableSupport } from 'variables'; @@ -15,7 +15,7 @@ import { DEFAULT_QUERY, SiftDataSourceOptions, SiftQuery, QUERY_VERSION } from ' import { ensureQueryDefaults, filterQueryBeforeRequest, replaceTemplateVariablesInQuery } from './utils'; import { AnnotationQueryEditor } from './components/AnnotationQueryEditor'; -export class SiftDataSource extends DataSourceWithBackend { +export class SiftDataSource extends DatasourceWithAsyncBackend { annotations: AnnotationSupport = { QueryEditor: AnnotationQueryEditor, }; diff --git a/src/datasourceCache.ts b/src/datasourceCache.ts index 9e6d165..1c63569 100644 --- a/src/datasourceCache.ts +++ b/src/datasourceCache.ts @@ -10,7 +10,7 @@ import { Labels, FieldConfig, } from '@grafana/data'; -import { Observable, firstValueFrom } from 'rxjs'; +import { Observable, lastValueFrom } from 'rxjs'; import { SiftQuery } from './types'; import { replaceTemplateVariablesInQuery } from './utils'; @@ -83,7 +83,7 @@ export class SiftDataSourceCache { newIntervalMs !== cacheEntry.fetchedIntervalMs || // new resolution/sample frequency requested liveLookbackTime <= newFrom // all data is liveish ) { - const fullData = await firstValueFrom(fetchCallback(request)); + const fullData = await lastValueFrom(fetchCallback(request)); // Store in cache this.cache.set(panelId, { @@ -152,7 +152,7 @@ export class SiftDataSourceCache { }, }; - const subResp = await firstValueFrom(fetchCallback(subReq)); + const subResp = await lastValueFrom(fetchCallback(subReq)); if (!subResp.errors && subResp.data.length > 0) { newFrames.push(subResp.data); } else { @@ -214,7 +214,7 @@ export class SiftDataSourceCache { return result; } catch (e) { console.error(`Panel ${panelId} - Failed to handle cache`, e); - return await firstValueFrom(fetchCallback(request)); + return await lastValueFrom(fetchCallback(request)); } } }