Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions dune/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,21 @@ func (e *execution) GetResultsCSV() (io.Reader, error) {
}

func (e *execution) WaitGetResults(pollInterval time.Duration, maxRetries int) (*models.ResultsResponse, error) {
errCount := 0
errAttempts := 0
for {
resultsResp, err := e.client.QueryResultsV2(e.ID, models.ResultOptions{})
if err != nil {
if maxRetries != 0 && errCount > maxRetries {
errAttempts++
if maxRetries != 0 && errAttempts >= maxRetries {
return nil, fmt.Errorf("%w. %s", ErrorRetriesExhausted, err.Error())
}
fmt.Fprintln(os.Stderr, "failed to retrieve results. Retrying...\n", err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the print here?

errCount += 1
} else if resultsResp.IsExecutionFinished {
sleep := defaultRetryPolicy.NextBackoff(errAttempts)
time.Sleep(sleep)
continue
}
errAttempts = 0
if resultsResp.IsExecutionFinished {
return resultsResp, nil
}
time.Sleep(pollInterval)
Expand Down
163 changes: 151 additions & 12 deletions dune/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"time"
)

var ErrorReqUnsuccessful = errors.New("request was not successful")
Expand All @@ -13,6 +16,59 @@ type ErrorResponse struct {
Error string `json:"error"`
}

type RateLimit struct {
Limit int
Remaining int
Reset int64
}

type APIError struct {
StatusCode int
StatusText string
BodySnippet string
RateLimit *RateLimit
RetryAfter time.Duration
}

func (e *APIError) Error() string {
if e.BodySnippet != "" {
return fmt.Sprintf("http %d %s: %s", e.StatusCode, e.StatusText, e.BodySnippet)
}
return fmt.Sprintf("http %d %s", e.StatusCode, e.StatusText)
}


func parseRateLimitHeaders(h http.Header) *RateLimit {
limStr := h.Get("X-RateLimit-Limit")
remStr := h.Get("X-RateLimit-Remaining")
resetStr := h.Get("X-RateLimit-Reset")

var limit, remaining int
var reset int64

if limStr != "" {
if v, err := strconv.Atoi(limStr); err == nil {
limit = v
}
}
if remStr != "" {
if v, err := strconv.Atoi(remStr); err == nil {
remaining = v
}
}
if resetStr != "" {
if v, err := strconv.ParseInt(resetStr, 10, 64); err == nil {
reset = v
}
}

if limit == 0 && remaining == 0 && reset == 0 {
return nil
}
return &RateLimit{Limit: limit, Remaining: remaining, Reset: reset}
}


func decodeBody(resp *http.Response, dest interface{}) error {
defer resp.Body.Close()
err := json.NewDecoder(resp.Body).Decode(dest)
Expand All @@ -24,20 +80,103 @@ func decodeBody(resp *http.Response, dest interface{}) error {

func httpRequest(apiKey string, req *http.Request) (*http.Response, error) {
req.Header.Add("X-DUNE-API-KEY", apiKey)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
p := defaultRetryPolicy
attempt := 1
for {
resp, err := http.DefaultClient.Do(req)
if err != nil {
if attempt >= p.MaxAttempts {
return nil, fmt.Errorf("failed to send request: %w", err)
}
time.Sleep(p.NextBackoff(attempt))
attempt++
continue
}

if resp.StatusCode == 200 {
return resp, nil
}

if resp.StatusCode != 200 {
defer resp.Body.Close()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Defer in loop accumulates unclosed response bodies

The defer resp.Body.Close() statement is inside the retry loop. Each time the loop iterates with a non-200 status code, a new defer is added without the previous response body being closed immediately. This keeps multiple response bodies and their associated resources (TCP connections, file descriptors) open until the function returns, which could exhaust connection pool slots or cause "too many open files" errors during extended retry sequences.

Fix in Cursor Fix in Web

var errorResponse ErrorResponse
err := json.NewDecoder(resp.Body).Decode(&errorResponse)
if err != nil {
return nil, fmt.Errorf("failed to read error response body: %w", err)
snippetBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
var errorResp ErrorResponse
if err := json.Unmarshal(snippetBytes, &errorResp); err == nil && errorResp.Error != "" {
msg := errorResp.Error
rl := parseRateLimitHeaders(resp.Header)
retryAfter := time.Duration(0)
if ra := resp.Header.Get("Retry-After"); ra != "" {
if secs, err := strconv.Atoi(ra); err == nil {
retryAfter = time.Duration(secs) * time.Second
}
}
apiErr := &APIError{StatusCode: resp.StatusCode, StatusText: resp.Status, BodySnippet: msg, RateLimit: rl, RetryAfter: retryAfter}
retryable := false
for _, code := range p.RetryableStatusCodes {
if resp.StatusCode == code {
retryable = true
break
}
}
if retryable && attempt < p.MaxAttempts {
sleep := p.NextBackoff(attempt)
if apiErr.RetryAfter > 0 && apiErr.RetryAfter > sleep {
sleep = apiErr.RetryAfter
}
time.Sleep(sleep)
attempt++
continue
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: POST request body empty on retry

When retrying a request after receiving a retryable status code (429, 5xx), the code reuses the same http.Request object without resetting its body. The request body (req.Body) is consumed by http.DefaultClient.Do() on the first attempt. On subsequent retry attempts, the body is empty because it has already been read. POST requests with JSON payloads (like QueryExecute, SQLExecute, QueryPipelineExecute) will send empty bodies on retry, causing silent failures. The fix would require calling req.GetBody() to regenerate the body before each retry.

Additional Locations (1)

Fix in Cursor Fix in Web

}
return nil, fmt.Errorf("%w: %v", ErrorReqUnsuccessful, apiErr)
} else {
msg := string(snippetBytes)
rl := parseRateLimitHeaders(resp.Header)
retryAfter := time.Duration(0)
if ra := resp.Header.Get("Retry-After"); ra != "" {
if secs, err := strconv.Atoi(ra); err == nil {
retryAfter = time.Duration(secs) * time.Second
}
}
apiErr := &APIError{StatusCode: resp.StatusCode, StatusText: resp.Status, BodySnippet: msg, RateLimit: rl, RetryAfter: retryAfter}
retryable := false
for _, code := range p.RetryableStatusCodes {
if resp.StatusCode == code {
retryable = true
break
}
}
if retryable && attempt < p.MaxAttempts {
sleep := p.NextBackoff(attempt)
if apiErr.RetryAfter > 0 && apiErr.RetryAfter > sleep {
sleep = apiErr.RetryAfter
}
time.Sleep(sleep)
attempt++
continue
}
return nil, fmt.Errorf("%w: %v", ErrorReqUnsuccessful, apiErr)
}
return resp, fmt.Errorf("%w [%d]: %s", ErrorReqUnsuccessful, resp.StatusCode, errorResponse.Error)
rl := parseRateLimitHeaders(resp.Header)
retryAfter := time.Duration(0)
if ra := resp.Header.Get("Retry-After"); ra != "" {
if secs, err := strconv.Atoi(ra); err == nil {
retryAfter = time.Duration(secs) * time.Second
}
}
apiErr := &APIError{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
BodySnippet: msg,
RateLimit: rl,
RetryAfter: retryAfter,
}
retryable := false
for _, code := range p.RetryableStatusCodes {
if resp.StatusCode == code {
retryable = true
break
}
}
// unreachable due to early returns above; kept for clarity
return nil, fmt.Errorf("%w: unexpected error state", ErrorReqUnsuccessful)
}

return resp, nil
}
34 changes: 34 additions & 0 deletions dune/retries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dune

import "time"

type RetryPolicy struct {
MaxAttempts int
InitialBackoff time.Duration
MaxBackoff time.Duration
Jitter time.Duration
RetryableStatusCodes []int
}

var defaultRetryPolicy = RetryPolicy{
MaxAttempts: 5,
InitialBackoff: 2 * time.Second,
MaxBackoff: 60 * time.Second,
Jitter: 250 * time.Millisecond,
RetryableStatusCodes: []int{429, 500, 502, 503, 504},
}

func (p RetryPolicy) NextBackoff(attempt int) time.Duration {
b := p.InitialBackoff
for i := 1; i < attempt; i++ {
b *= 2
if b > p.MaxBackoff {
b = p.MaxBackoff
break
}
}
if p.Jitter > 0 {
b += p.Jitter
}
return b
}