Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
f5afbf6
feat(ilp): QWiP store-and-forward client buffer
bluestreak01 Apr 26, 2026
efde7bc
fix(ilp): harden SF disk-full retry, reconnect, and CRC paths
bluestreak01 Apr 26, 2026
efbd8e1
Rebuild CXX libraries
Apr 26, 2026
cf3152d
fix(ilp): fail fast on fatal SF errors instead of looping
bluestreak01 Apr 26, 2026
86b6e6f
perf(ilp): replace O(N²) SF segment sort at open
bluestreak01 Apr 26, 2026
acb32b9
fix(ilp): plug SF rotate-OOM double-free and on-disk leak
bluestreak01 Apr 26, 2026
519f5e4
fix(ilp): trim acked frames from active SF segment
bluestreak01 Apr 26, 2026
91669d2
fix(ilp): pre-compute CRC32C table to remove lazy-init memory race
bluestreak01 Apr 26, 2026
e38b4d5
test(ilp): regression guards for SF disk-cap recovery
bluestreak01 Apr 26, 2026
03afa61
feat(ilp): opt-in sf_fsync_on_flush, fix sf_fsync default doc lie
bluestreak01 Apr 26, 2026
0304df8
fix(ilp): close schema-reset race window with connection-generation tag
bluestreak01 Apr 26, 2026
0ad83b9
Rebuild CXX libraries
Apr 26, 2026
07c20e0
fix(ilp): five SF correctness fixes from PR-17 review
bluestreak01 Apr 26, 2026
af58d7d
test(ilp): SegmentLog.append latency benchmark
bluestreak01 Apr 26, 2026
83bb368
perf(ilp): slice-by-8 CRC32C cuts SF append p50 ~4x
bluestreak01 Apr 26, 2026
87320e5
fix(ilp): three SF recovery correctness fixes from PR-17 review
bluestreak01 Apr 26, 2026
f58f766
test(ilp): JMH ingress latency benchmark for QWP Sender
bluestreak01 Apr 26, 2026
49f1683
feat(ilp): cursor SF engine primitives (mmap segments, ring, manager)
bluestreak01 Apr 26, 2026
cc4a68f
feat(ilp): cursor SF -- on-disk recovery + maxTotalBytes cap
bluestreak01 Apr 26, 2026
889c46c
feat(ilp): cursor SF -- happy-path WebSocket send loop
bluestreak01 Apr 26, 2026
e17c12d
feat(ilp): wire cursor SF as the only async path; refactor connect st…
bluestreak01 Apr 26, 2026
9781771
refactor(ilp): strip QwpWebSocketSender of legacy SF / sync paths
bluestreak01 Apr 26, 2026
36263c4
fix(ilp): cursor recovery — derive next FSN from on-disk segments
bluestreak01 Apr 26, 2026
4f4e1e5
docs: cursor SF — durability & reconnect spec
bluestreak01 Apr 27, 2026
3caa2d3
feat(ilp): close() drain — bounded ACK wait via close_flush_timeout_m…
bluestreak01 Apr 27, 2026
71afa21
feat(ilp): connectionGeneration foundation + encode-mid-reconnect retry
bluestreak01 Apr 27, 2026
0ec66f3
feat(ilp): cursor I/O loop reconnect + replay
bluestreak01 Apr 27, 2026
8828038
feat(ilp): cursor reconnect policy — backoff cap + auth-terminal
bluestreak01 Apr 27, 2026
f152583
feat(ilp): slot directory model — sender_id + advisory exclusive .lock
bluestreak01 Apr 27, 2026
40f9742
feat(ilp): initial-connect retry opt-in + replay/attempt counters
bluestreak01 Apr 27, 2026
b9b6e2f
feat(ilp): orphan-slot scanner + .failed sentinel + drain_orphans knob
bluestreak01 Apr 27, 2026
520231c
feat(ilp): cursor frames are self-sufficient — full schemas, full dict
bluestreak01 Apr 27, 2026
fa5c838
fix(ilp): recovery replays sealed segments from baseSeq, not active
bluestreak01 Apr 27, 2026
c25773f
feat(ilp): background drainer pool — adopt orphan slots and replay them
bluestreak01 Apr 27, 2026
267b380
docs(ilp): TODO for cursor SF — multi-host failover, deferred items
bluestreak01 Apr 27, 2026
923dcb4
fix(ilp): cursor SF review fixes — perf, cap, torn-tail, races
bluestreak01 Apr 27, 2026
07b930a
fix(ilp): cursor SF correctness — concurrency, lifecycle, findFirst
bluestreak01 Apr 28, 2026
05c3829
test(ilp): wrap SF cursor tests in assertMemoryLeak; PR-17 regression…
bluestreak01 Apr 28, 2026
36b0839
error handling
bluestreak01 Apr 28, 2026
41ae975
fix(ilp): cursor SF — apply PR-17 review critical and moderate findings
bluestreak01 Apr 28, 2026
052f6ee
Make close() rethrow latched terminal errors
bluestreak01 Apr 29, 2026
fc8d8b3
Rebuild CXX libraries
Apr 29, 2026
12049d8
Add async initial connect for cursor SF sender
bluestreak01 Apr 29, 2026
41b9ec0
ci: auto-detect matching questdb branch
bluestreak01 Apr 29, 2026
ce92148
test(ilp): align cursor SF tests with close() rethrow + drainer changes
bluestreak01 Apr 29, 2026
13ea8a2
Harden close() error preservation and SegmentManager lifecycle
bluestreak01 Apr 29, 2026
16a3eb6
Merge branch 'vi_sf' of https://github.com/questdb/java-questdb-clien…
bluestreak01 Apr 29, 2026
9e298e7
test fix
bluestreak01 Apr 29, 2026
9be35cb
bugfix
bluestreak01 Apr 30, 2026
a6b45c3
fix(ilp): cross-platform drainer/slot-lock and quieter PARSE_ERROR logs
bluestreak01 Apr 30, 2026
21d885b
Defer cursor SF trim to durable-ack when opted in
bluestreak01 May 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 112 additions & 10 deletions .claude/skills/review-pr/SKILL.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ core/questdb/client/bin-local
core/cmake-build-debug
core/cmake-build-debug-coverage
core/cmake-build-release
core/build_native
core/CMakeCache.txt
**/.project
**/.settings
Expand Down
11 changes: 10 additions & 1 deletion ci/run_tests_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,16 @@ stages:
maven | "$(Agent.OS)"
path: $(HOME)/.m2/repository
displayName: "Cache Maven repository"
- script: git clone --depth 1 https://github.com/questdb/questdb.git ./questdb
- bash: |
BRANCH="${SYSTEM_PULLREQUEST_SOURCEBRANCH:-$BUILD_SOURCEBRANCHNAME}"
BRANCH="${BRANCH#refs/heads/}"
if git ls-remote --exit-code --heads https://github.com/questdb/questdb.git "$BRANCH" >/dev/null 2>&1; then
echo "Cloning matching questdb branch: $BRANCH"
git clone --depth 1 --branch "$BRANCH" https://github.com/questdb/questdb.git ./questdb
else
echo "No matching questdb branch '$BRANCH', falling back to master"
git clone --depth 1 https://github.com/questdb/questdb.git ./questdb
fi
displayName: git clone questdb
- task: Maven@3
displayName: "Update client version"
Expand Down
1 change: 1 addition & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(
src/main/c/share/cpprt_overrides.cpp
src/main/c/share/byte_sink.cpp
src/main/c/share/byte_sink.h
src/main/c/share/crc32c.c
)

# libzstd is included via a git submodule at src/main/c/share/zstd (pinned to
Expand Down
20 changes: 20 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<argLine>-ea -Dfile.encoding=UTF-8 -XX:+UseParallelGC -Dslf4j.provider=ch.qos.logback.classic.spi.LogbackServiceProvider</argLine>
<test.exclude>None</test.exclude>
<test.include>%regex[.*[^o].class]</test.include><!-- exclude module-info.class-->
<jmh.version>1.37</jmh.version>
</properties>

<version>1.2.1-SNAPSHOT</version>
Expand Down Expand Up @@ -88,6 +89,13 @@
<excludes>
<exclude>${excludeTestPattern1}</exclude>
</excludes>
<annotationProcessorPaths>
<path>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -434,5 +442,17 @@
<version>1.5.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
404 changes: 404 additions & 0 deletions core/src/main/c/share/crc32c.c

Large diffs are not rendered by default.

286 changes: 286 additions & 0 deletions core/src/main/c/share/files.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,296 @@
*
******************************************************************************/

#define _GNU_SOURCE

#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <dirent.h>
#include <stdint.h>

#include "files.h"

/* Mirror of io.questdb.client.std.Files.MAP_RO / MAP_RW. Hard-coded rather
* than #include'd from a javah-generated header because this file does not
* pull in any generated symbols (the rest of the file works the same way). */
#define QDB_MAP_RO 1
#define QDB_MAP_RW 2

#define RESTARTABLE(_expr_, _rc_) \
do { _rc_ = (_expr_); } while ((_rc_) == -1 && errno == EINTR)

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_close0
(JNIEnv *e, jclass cl, jint fd) {
return close((int) fd);
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_openRO0
(JNIEnv *e, jclass cl, jlong lpszName) {
int fd;
RESTARTABLE(open((const char *) (uintptr_t) lpszName, O_RDONLY), fd);
return (jint) fd;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_openRW0
(JNIEnv *e, jclass cl, jlong lpszName) {
int fd;
RESTARTABLE(open((const char *) (uintptr_t) lpszName, O_CREAT | O_RDWR, 0644), fd);
return (jint) fd;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_openAppend0
(JNIEnv *e, jclass cl, jlong lpszName) {
int fd;
RESTARTABLE(open((const char *) (uintptr_t) lpszName, O_CREAT | O_WRONLY | O_APPEND, 0644), fd);
return (jint) fd;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_openCleanRW0
(JNIEnv *e, jclass cl, jlong lpszName, jlong size) {
int fd;
RESTARTABLE(open((const char *) (uintptr_t) lpszName, O_CREAT | O_TRUNC | O_RDWR, 0644), fd);
if (fd < 0) {
return -1;
}
if (size > 0) {
int rc;
RESTARTABLE(ftruncate(fd, (off_t) size), rc);
if (rc != 0) {
int saved = errno;
close(fd);
errno = saved;
return -1;
}
}
return (jint) fd;
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_read
(JNIEnv *e, jclass cl, jint fd, jlong addr, jlong len, jlong offset) {
// Reject negative len explicitly: jlong is signed but pread takes a
// size_t. Without this guard the cast wraps a small negative value
// into an enormous unsigned read length and the kernel may either
// SEGV on the address space or scribble far past the caller's buffer.
// The Win32 path already does this; matching here.
if (len < 0) {
errno = EINVAL;
return -1;
}
ssize_t res;
RESTARTABLE(pread((int) fd, (void *) (uintptr_t) addr, (size_t) len, (off_t) offset), res);
return (jlong) res;
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_write
(JNIEnv *e, jclass cl, jint fd, jlong addr, jlong len, jlong offset) {
if (len < 0) {
errno = EINVAL;
return -1;
}
ssize_t res;
RESTARTABLE(pwrite((int) fd, (const void *) (uintptr_t) addr, (size_t) len, (off_t) offset), res);
return (jlong) res;
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_append
(JNIEnv *e, jclass cl, jint fd, jlong addr, jlong len) {
if (len < 0) {
errno = EINVAL;
return -1;
}
ssize_t res;
RESTARTABLE(write((int) fd, (const void *) (uintptr_t) addr, (size_t) len), res);
return (jlong) res;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_fsync
(JNIEnv *e, jclass cl, jint fd) {
int res;
RESTARTABLE(fsync((int) fd), res);
return res;
}

JNIEXPORT jboolean JNICALL Java_io_questdb_client_std_Files_truncate
(JNIEnv *e, jclass cl, jint fd, jlong size) {
int res;
RESTARTABLE(ftruncate((int) fd, (off_t) size), res);
return res == 0 ? JNI_TRUE : JNI_FALSE;
}

JNIEXPORT jboolean JNICALL Java_io_questdb_client_std_Files_allocate
(JNIEnv *e, jclass cl, jint fd, jlong size) {
#if defined(__linux__)
int res = posix_fallocate((int) fd, 0, (off_t) size);
if (res == 0) {
return JNI_TRUE;
}
if (res != EINVAL && res != EOPNOTSUPP) {
errno = res;
return JNI_FALSE;
}
/* fall through to ftruncate */
#elif defined(__APPLE__)
fstore_t fst;
fst.fst_flags = F_ALLOCATECONTIG | F_ALLOCATEALL;
fst.fst_posmode = F_PEOFPOSMODE;
fst.fst_offset = 0;
fst.fst_length = (off_t) size;
fst.fst_bytesalloc = 0;
if (fcntl((int) fd, F_PREALLOCATE, &fst) == -1) {
fst.fst_flags = F_ALLOCATEALL;
(void) fcntl((int) fd, F_PREALLOCATE, &fst);
/* if F_PREALLOCATE fails we still try ftruncate to set logical size */
}
#endif
int res2;
RESTARTABLE(ftruncate((int) fd, (off_t) size), res2);
return res2 == 0 ? JNI_TRUE : JNI_FALSE;
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_length
(JNIEnv *e, jclass cl, jint fd) {
struct stat st;
if (fstat((int) fd, &st) != 0) {
return -1;
}
return (jlong) st.st_size;
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_length0
(JNIEnv *e, jclass cl, jlong lpszName) {
struct stat st;
if (stat((const char *) (uintptr_t) lpszName, &st) != 0) {
return -1;
}
return (jlong) st.st_size;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_lock
(JNIEnv *e, jclass cl, jint fd) {
return flock((int) fd, LOCK_EX | LOCK_NB);
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_mkdir0
(JNIEnv *e, jclass cl, jlong lpszPath, jint mode) {
return mkdir((const char *) (uintptr_t) lpszPath, (mode_t) mode);
}

JNIEXPORT jboolean JNICALL Java_io_questdb_client_std_Files_exists0
(JNIEnv *e, jclass cl, jlong lpszPath) {
return access((const char *) (uintptr_t) lpszPath, F_OK) == 0 ? JNI_TRUE : JNI_FALSE;
}

JNIEXPORT jboolean JNICALL Java_io_questdb_client_std_Files_remove0
(JNIEnv *e, jclass cl, jlong lpszPath) {
return remove((const char *) (uintptr_t) lpszPath) == 0 ? JNI_TRUE : JNI_FALSE;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_rename0
(JNIEnv *e, jclass cl, jlong lpszOld, jlong lpszNew) {
return rename((const char *) (uintptr_t) lpszOld, (const char *) (uintptr_t) lpszNew);
}

typedef struct {
DIR *dir;
struct dirent *entry;
} qdb_find_t;

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_findFirst0
(JNIEnv *e, jclass cl, jlong lpszName) {
DIR *dir = opendir((const char *) (uintptr_t) lpszName);
if (!dir) {
return 0;
}
qdb_find_t *find = (qdb_find_t *) malloc(sizeof(qdb_find_t));
if (!find) {
closedir(dir);
return 0;
}
find->dir = dir;
errno = 0;
find->entry = readdir(dir);
if (!find->entry) {
int saved = errno;
closedir(dir);
free(find);
errno = saved;
return 0;
}
return (jlong) (uintptr_t) find;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_findNext
(JNIEnv *e, jclass cl, jlong findPtr) {
qdb_find_t *find = (qdb_find_t *) (uintptr_t) findPtr;
if (!find) {
return -1;
}
errno = 0;
find->entry = readdir(find->dir);
if (find->entry) {
return 1;
}
return errno == 0 ? 0 : -1;
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_findName
(JNIEnv *e, jclass cl, jlong findPtr) {
qdb_find_t *find = (qdb_find_t *) (uintptr_t) findPtr;
if (!find || !find->entry) {
return 0;
}
return (jlong) (uintptr_t) find->entry->d_name;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_findType
(JNIEnv *e, jclass cl, jlong findPtr) {
qdb_find_t *find = (qdb_find_t *) (uintptr_t) findPtr;
if (!find || !find->entry) {
return 0;
}
return (jint) find->entry->d_type;
}

JNIEXPORT void JNICALL Java_io_questdb_client_std_Files_findClose
(JNIEnv *e, jclass cl, jlong findPtr) {
qdb_find_t *find = (qdb_find_t *) (uintptr_t) findPtr;
if (find) {
closedir(find->dir);
free(find);
}
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_getPageSize0
(JNIEnv *e, jclass cl) {
long sz = sysconf(_SC_PAGESIZE);
return (jlong) (sz > 0 ? sz : 4096);
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Files_mmap0
(JNIEnv *e, jclass cl, jint fd, jlong len, jlong offset, jint flags, jlong baseAddress) {
int prot = 0;
if (flags == QDB_MAP_RO) {
prot = PROT_READ;
} else if (flags == QDB_MAP_RW) {
prot = PROT_READ | PROT_WRITE;
}
void *addr = mmap((void *) (uintptr_t) baseAddress, (size_t) len, prot, MAP_SHARED, (int) fd, (off_t) offset);
/* MAP_FAILED is (void *) -1; cast to jlong gives -1 sentinel matching FAILED_MMAP_ADDRESS. */
return (jlong) (intptr_t) addr;
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_munmap0
(JNIEnv *e, jclass cl, jlong address, jlong len) {
return munmap((void *) (uintptr_t) address, (size_t) len);
}

JNIEXPORT jint JNICALL Java_io_questdb_client_std_Files_msync
(JNIEnv *e, jclass cl, jlong addr, jlong len, jboolean async) {
return msync((void *) (uintptr_t) addr, (size_t) len, async ? MS_ASYNC : MS_SYNC);
}
Loading
Loading