Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a32263d
test(jepsen): add ZSet safety workload with model-based checker
bootjp Apr 19, 2026
2693a30
ci(jepsen): run ZSet safety workload in per-push and scheduled jobs
bootjp Apr 19, 2026
94be1bd
fix(jepsen-zset-safety): plug checker false positives + add unit tests
bootjp Apr 19, 2026
9bfcc13
fix(jepsen-zset-safety): no-op-ZREM-only member must not trigger :sco…
bootjp Apr 20, 2026
6d0b4c3
fix(jepsen-zset-safety): address CodeRabbit checker soundness issues
bootjp Apr 20, 2026
86d23dd
Merge branch 'main' into feat/jepsen-zset-safety
bootjp Apr 21, 2026
ecb3983
fix(jepsen): correct ZSet checker for infinity, stale reads, and :inf…
bootjp Apr 22, 2026
2a194a4
fix(jepsen): accept linearization of concurrent ops and uncertain mut…
bootjp Apr 22, 2026
49c5e0c
Merge branch 'main' into feat/jepsen-zset-safety
bootjp Apr 22, 2026
0c0efc4
fix(jepsen): keep strict score check when concurrent ZINCRBY score is…
bootjp Apr 22, 2026
e5dcc34
fix(jepsen): exclude :fail completions from concurrent mutation uncer…
bootjp Apr 22, 2026
02a8adf
fix(jepsen): restrict committed ZINCRBY candidates to linearization-c…
bootjp Apr 22, 2026
da85560
fix(jepsen): restrict unknown-score? to :info zincrby, not any concur…
bootjp Apr 23, 2026
502f64a
fix(jepsen): guard setup! and ZINCRBY response parsing against nil/mi…
bootjp Apr 23, 2026
33d59c5
fix(jepsen): restrict can-be-present? existence evidence to ZADD/ZINCRBY
bootjp Apr 23, 2026
9535ff3
fix(jepsen): correct clojure.tools.logging/warn call style in zset wo…
bootjp Apr 23, 2026
fde116c
fix(jepsen): decode Redis ZSET member bytes as UTF-8 explicitly
bootjp Apr 23, 2026
0c948a2
fix(jepsen-zset): hard-fail setup! when :conn-spec is missing
bootjp Apr 23, 2026
84989f1
fix(jepsen-zset): prepend test subcommand only when absent or an option
bootjp Apr 23, 2026
7a7a218
fix(jepsen-zset): document why :final-generator is overridden to nil
bootjp Apr 23, 2026
6219831
fix(jepsen-zset): guard ZREM nil reply to avoid NPE in invoke!
bootjp Apr 23, 2026
e67d29f
fix(jepsen-zset): return :valid? :unknown when no successful reads
bootjp Apr 23, 2026
623d5c2
fix(jepsen-zset): hard-fail setup! when cleanup DEL errors
bootjp Apr 23, 2026
22e41e1
fix(jepsen-zset): guard nil .getMessage on exception :error fields
bootjp Apr 23, 2026
9f5e958
docs(jepsen-zset): strip LLM reviewer artifact markers from comments
bootjp Apr 23, 2026
d0c8a03
fix(jepsen-zset): let inner workload's :final-generator pass through
bootjp Apr 23, 2026
559e83d
fix(jepsen-zset): reject odd-length WITHSCORES replies
bootjp Apr 23, 2026
1a9370f
fix(jepsen-zset): coerce ZREM count across Long / string / bytes
bootjp Apr 23, 2026
ad9079c
fix(jepsen-zset): include full :allowed set in missing-member-range
bootjp Apr 23, 2026
69db24e
fix(jepsen-zset): catch Throwable in invoke! so Errors don't crash wo…
bootjp Apr 23, 2026
d03672e
Merge branch 'main' into feat/jepsen-zset-safety
bootjp Apr 25, 2026
29e62ca
Merge branch 'main' into feat/jepsen-zset-safety
bootjp Apr 25, 2026
03ce992
jepsen: tighten zset safety checker
bootjp Jun 23, 2026
d31d8b9
jepsen: tighten zset safety checker
bootjp Jun 23, 2026
d094a85
jepsen: tighten zset safety linearization
bootjp Jun 23, 2026
c514791
jepsen: fix zset safety regressions
bootjp Jun 23, 2026
a2b57b8
jepsen: tighten zset prefix anchors
bootjp Jun 23, 2026
31f156c
Merge remote-tracking branch 'origin/main' into HEAD
bootjp Jun 25, 2026
d74cfed
adapter: fix zset range tie ordering
bootjp Jun 25, 2026
72e4c53
test: stabilize CI race checks
bootjp Jun 25, 2026
c3edbfb
adapter: fallback on capped zset range scans
bootjp Jun 25, 2026
a03c2cf
jepsen: fix bounded zset prefix checker
bootjp Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/jepsen-test-scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ jobs:
--max-txn-length ${{ inputs.max-txn-length || '4' }} \
--ports 63791,63792,63793 \
--host 127.0.0.1
- name: Run Redis ZSet safety Jepsen workload against elastickv
working-directory: jepsen
timeout-minutes: 10
run: |
timeout 480 ~/lein run -m elastickv.redis-zset-safety-workload \
--time-limit ${{ inputs.time-limit || '150' }} \
--rate ${{ inputs.rate || '10' }} \
--concurrency ${{ inputs.concurrency || '8' }} \
--ports 63791,63792,63793 \
--host 127.0.0.1
- name: Run DynamoDB Jepsen workload against elastickv
working-directory: jepsen
timeout-minutes: 10
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/jepsen-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ jobs:
timeout-minutes: 3
run: |
timeout 120 ~/lein run -m elastickv.redis-workload --time-limit 5 --rate 5 --concurrency 5 --ports 63791,63792,63793 --host 127.0.0.1
- name: Run Redis ZSet safety Jepsen workload against elastickv
working-directory: jepsen
timeout-minutes: 3
run: |
timeout 120 ~/lein run -m elastickv.redis-zset-safety-workload --time-limit 5 --rate 5 --concurrency 5 --ports 63791,63792,63793 --host 127.0.0.1
- name: Run DynamoDB Jepsen workload against elastickv
working-directory: jepsen
timeout-minutes: 3
Expand Down
9 changes: 5 additions & 4 deletions adapter/redis_compat_commands_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,21 @@ func TestRedis_StreamXReadLatencyIsConstant(t *testing.T) {
total = 10_000
probes = 100
)
lastID := ""
for i := range total {
_, err := rdb.XAdd(ctx, &redis.XAddArgs{
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "stream-lat",
ID: fmt.Sprintf("%d-0", 1_000_000+i),
ID: "*",
Values: []string{"i", fmt.Sprint(i)},
}).Result()
require.NoError(t, err)
lastID = id
}

afterID := fmt.Sprintf("%d-0", 1_000_000+total-1)
measure := func() time.Duration {
start := time.Now()
streams, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"stream-lat", afterID},
Streams: []string{"stream-lat", lastID},
Count: 10,
Block: 10 * time.Millisecond,
}).Result()
Expand Down
5 changes: 4 additions & 1 deletion adapter/redis_lua_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2623,7 +2623,10 @@ func (c *luaScriptContext) cmdZRangeByScoreSlow(key []byte, options luaZRangeByS
// the whole offset+limit budget on those filtered-out rows and miss
// the real matches at score > value.
//
// The score index is lex-sorted by (userKey, sortableScore, member).
// The score index is grouped by (userKey, sortableScore). Member bytes
// follow the score, but the MVCC timestamp suffix means equal-score
// member ordering is normalized by zsetRangeByScoreFast rather than
// trusted directly from physical scan order.
// Conventions:
//
// minBound = -Inf -> startKey = ZSetScoreScanPrefix(key)
Expand Down
44 changes: 44 additions & 0 deletions adapter/redis_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,50 @@ func TestRedisExecLuaCompatRetriesWriteConflict(t *testing.T) {
require.Equal(t, 1.0, zset.Entries[0].Score)
}

// TestRedisZRemWideColumnRemovesMemberAndScoreIndex seeds a zset with two
// members through ZADD, removes one of them through ZREM, and then verifies
// at a fresh snapshot timestamp that:
//   - only the surviving member is returned by loadZSetAt, and
//   - neither the member key nor the score-index key of the removed member
//     is still visible in the store.
func TestRedisZRemWideColumnRemovesMemberAndScoreIndex(t *testing.T) {
	t.Parallel()

	mvcc := store.NewMVCCStore()
	retryCoord := newRetryOnceCoordinator(mvcc)
	server := &RedisServer{
		store:       mvcc,
		coordinator: retryCoord,
		scriptCache: map[string]string{},
	}

	ctx := context.Background()
	zsetKey := []byte("retry:zrem-wide")

	// Seed: m8 -> 55, m6 -> -49. Both are new, so ZADD must report 2.
	addReply := &recordingConn{}
	server.zadd(addReply, redcon.Command{Args: [][]byte{
		[]byte(cmdZAdd),
		zsetKey,
		[]byte("55"),
		[]byte("m8"),
		[]byte("-49"),
		[]byte("m6"),
	}})
	require.Empty(t, addReply.err)
	require.Equal(t, int64(2), addReply.int)

	// Remove m8; exactly one member must be reported as removed.
	remReply := &recordingConn{}
	server.zrem(remReply, redcon.Command{Args: [][]byte{
		[]byte(cmdZRem),
		zsetKey,
		[]byte("m8"),
	}})
	require.Empty(t, remReply.err)
	require.Equal(t, int64(1), remReply.int)

	// Every check below reads at the same post-ZREM snapshot.
	snapshot := snapshotTS(retryCoord.clock, mvcc)

	loaded, found, loadErr := server.loadZSetAt(ctx, zsetKey, snapshot)
	require.NoError(t, loadErr)
	require.True(t, found)
	require.Equal(t, []redisZSetEntry{{Member: "m6", Score: -49}}, loaded.Entries)

	// The removed member must leave no member row behind...
	memberVisible, memberErr := mvcc.ExistsAt(ctx, store.ZSetMemberKey(zsetKey, []byte("m8")), snapshot)
	require.NoError(t, memberErr)
	require.False(t, memberVisible)

	// ...and no score-index row for its old score either.
	scoreVisible, scoreErr := mvcc.ExistsAt(ctx, store.ZSetScoreKey(zsetKey, 55, []byte("m8")), snapshot)
	require.NoError(t, scoreErr)
	require.False(t, scoreVisible)
}

func TestRedisEvalRetriesWriteConflict(t *testing.T) {
t.Parallel()

Expand Down
150 changes: 102 additions & 48 deletions adapter/redis_zset_cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,12 @@ func (r *RedisServer) zsetRangeByScoreFast(
hit, reason, err := r.zsetRangeEmptyFastResult(ctx, key, readTS)
return nil, hit, reason, err
}
scanLimit = zsetFastScanLimitWithTieSentinel(scanLimit, limit)
kvs, err := r.zsetScoreScan(ctx, startKey, endKey, scanLimit, reverse, readTS)
if err != nil {
return nil, false, monitoring.LuaFastPathFallbackOther, err
}
return r.finalizeZSetFastRange(ctx, key, kvs, offset, limit, scanLimit, scoreFilter, readTS)
return r.finalizeZSetFastRange(ctx, key, kvs, reverse, offset, limit, scanLimit, scoreFilter, readTS)
}

// finalizeZSetFastRange runs the post-scan priority guard, decodes
Expand All @@ -165,12 +166,16 @@ func (r *RedisServer) zsetRangeByScoreFast(
//
// Takes scanLimit so we can detect a saturated scan: if the scanner
// returned exactly scanLimit rows AND the caller's request is not
// satisfied (unbounded limit, or collected fewer entries than limit),
// there MAY be more entries beyond the scan window. In that case we
// return hit=false so the slow path can produce the authoritative
// answer -- the fast path MUST NOT silently truncate.
// satisfied (unbounded limit, or fewer than offset+limit matching
// entries), there MAY be more entries beyond the scan window. In
// that case we return hit=false so the slow path can produce the
// authoritative answer -- the fast path MUST NOT silently truncate.
// Bounded ranges with duplicate scores also fall back: the MVCC
// timestamp suffix can disturb physical ordering among equal-score
// member keys, so the slow full-load path owns exact LIMIT semantics
// for those ties.
func (r *RedisServer) finalizeZSetFastRange(
ctx context.Context, key []byte, kvs []*store.KVPair,
ctx context.Context, key []byte, kvs []*store.KVPair, reverse bool,
offset, limit, scanLimit int, scoreFilter func(float64) bool, readTS uint64,
) ([]redisZSetEntry, bool, monitoring.LuaFastPathFallbackReason, error) {
// Priority guard runs after a candidate hit (mirrors post-PR #565
Expand All @@ -183,13 +188,21 @@ func (r *RedisServer) finalizeZSetFastRange(
return nil, false, monitoring.LuaFastPathFallbackWrongType, nil
}
}
entries := decodeZSetScoreRange(key, kvs, offset, limit, scoreFilter)
entries := decodeZSetScoreRange(key, kvs, scoreFilter)
// Truncation guard: the raw scanner hit its cap AND the caller did
// not get a satisfied result. Entries beyond the window may
// exist; defer to the slow path for correctness.
if zsetFastPathTruncated(len(kvs), scanLimit, len(entries), limit) {
if zsetFastPathTruncated(len(kvs), scanLimit, len(entries), offset, limit) {
return nil, false, monitoring.LuaFastPathFallbackTruncated, nil
}
if zsetFastPathNeedsTieFallback(entries, limit) {
return nil, false, monitoring.LuaFastPathFallbackTruncated, nil
Comment on lines +198 to +199

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid routing duplicate +inf ranges through the slow filter

When a bounded ZRANGEBYSCORE key +inf +inf LIMIT ... scan sees at least two +inf members, this new tie fallback sends the command to cmdZRangeByScoreSlow; that slow path uses scoreInRange, which does not enforce zBoundPosInf/zBoundNegInf bounds, so a zset that also has finite scores can return the lowest finite member instead of a +inf member. This regresses the wide-column fast path specifically for duplicate infinite scores; either keep this case on the fast path or make the slow score filter handle infinite bounds before falling back.

Useful? React with 👍 / 👎.

}
sortZSetEntries(entries)
if reverse {
reverseZSetEntries(entries)
}
entries = applyZRangeLimit(entries, offset, limit)
if len(entries) == 0 {
hit, reason, err := r.zsetRangeEmptyFastResult(ctx, key, readTS)
return nil, hit, reason, err
Expand All @@ -208,17 +221,21 @@ func (r *RedisServer) finalizeZSetFastRange(
// may have dropped entries that the caller's request would otherwise
// include. Returns true when the scanner returned the full quota
// (scannedRows == scanLimit) AND the caller's request is still
// unsatisfied (unbounded limit or collectedEntries < limit). In that
// case the caller must fall back to the slow full-load path to get
// the authoritative result.
func zsetFastPathTruncated(scannedRows, scanLimit, collectedEntries, limit int) bool {
// unsatisfied (unbounded limit or fewer than offset+limit matching
// entries). In that case the caller must fall back to the slow
// full-load path to get the authoritative result.
func zsetFastPathTruncated(scannedRows, scanLimit, matchingEntries, offset, limit int) bool {
if scannedRows < scanLimit {
return false
}
if limit < 0 {
return true
}
return collectedEntries < limit
needed := offset + limit
if needed < offset || needed > maxWideScanLimit {
return true
}
return matchingEntries < needed
}

// zsetFastPathEligible returns false (without error) when a legacy-
Expand Down Expand Up @@ -263,6 +280,13 @@ func zsetFastScanLimit(offset, limit int) int {
return offset + limit
}

// zsetFastScanLimitWithTieSentinel returns the row budget to hand to the
// score-index scan. When the caller's limit is positive and scanLimit is
// still below maxWideScanLimit, the budget is widened by exactly one row;
// in every other case scanLimit is returned unchanged, so the result never
// grows past maxWideScanLimit from a value that was below it.
//
// NOTE(review): the extra row is presumably the "tie sentinel" that lets
// the caller see one entry past the requested window -- confirm against
// finalizeZSetFastRange.
func zsetFastScanLimitWithTieSentinel(scanLimit, limit int) int {
	bounded := limit > 0
	hasHeadroom := scanLimit < maxWideScanLimit
	if bounded && hasHeadroom {
		return scanLimit + 1
	}
	return scanLimit
}

// zsetScoreScan picks Forward / Reverse ScanAt based on direction.
func (r *RedisServer) zsetScoreScan(
ctx context.Context, startKey, endKey []byte, scanLimit int, reverse bool, readTS uint64,
Expand All @@ -275,32 +299,14 @@ func (r *RedisServer) zsetScoreScan(
return kvs, cockerrors.WithStack(err)
}

// zsetDecodeAllocSize computes how many entries a decode pass can collect
// at most from kvLen scanned rows: the rows left after skipping offset,
// clamped to zero when offset exceeds kvLen, and capped by limit when limit
// is non-negative. A negative limit means "no cap". Sizing the result slice
// with this value instead of kvLen avoids reserving capacity that a small
// window at a large offset could never fill.
func zsetDecodeAllocSize(kvLen, offset, limit int) int {
	remaining := kvLen - offset
	switch {
	case remaining < 0:
		// Offset runs past the scanned rows: nothing can be collected.
		return 0
	case limit < 0, limit >= remaining:
		// Unbounded, or the limit is not the binding constraint.
		return remaining
	default:
		return limit
	}
}

// decodeZSetScoreRange decodes score-index scan results into
// redisZSetEntry, applying the post-scan score filter (exclusive
// bound edges) and the offset / limit pagination. Entries that fail
// to decode are silently dropped -- they can only appear under data
// corruption.
// redisZSetEntry, applying the post-scan score filter for exclusive
// bound edges. Entries that fail to decode are silently dropped --
// they can only appear under data corruption.
func decodeZSetScoreRange(
key []byte, kvs []*store.KVPair, offset, limit int, scoreFilter func(float64) bool,
key []byte, kvs []*store.KVPair, scoreFilter func(float64) bool,
) []redisZSetEntry {
entries := make([]redisZSetEntry, 0, zsetDecodeAllocSize(len(kvs), offset, limit))
skipped := 0
entries := make([]redisZSetEntry, 0, len(kvs))
for _, kv := range kvs {
score, member, ok := store.ExtractZSetScoreAndMember(kv.Key, key)
if !ok {
Expand All @@ -309,23 +315,29 @@ func decodeZSetScoreRange(
if scoreFilter != nil && !scoreFilter(score) {
continue
}
// Check limit saturation BEFORE the offset skip so a small
// limit with a large offset exits immediately instead of
// burning offset iterations on the skip branch. Correct for
// any (offset, limit): once len(entries) >= limit we are done
// regardless of remaining skip budget.
if limit >= 0 && len(entries) >= limit {
break
}
if skipped < offset {
skipped++
continue
}
entries = append(entries, redisZSetEntry{Member: string(member), Score: score})
}
return entries
}

// zsetFastPathNeedsTieFallback reports whether a bounded request
// (limit >= 0) saw at least two entries with the same score. A negative
// limit always yields false. Scores are compared by their IEEE-754 bit
// pattern, with -0 folded into +0 so the two zeros count as a tie.
func zsetFastPathNeedsTieFallback(entries []redisZSetEntry, limit int) bool {
	if limit < 0 {
		return false
	}
	distinct := make(map[uint64]struct{}, len(entries))
	for i := range entries {
		scoreBits := math.Float64bits(entries[i].Score)
		if entries[i].Score == 0 {
			// Both signed zeros compare equal to 0; use the +0 pattern.
			scoreBits = 0
		}
		distinct[scoreBits] = struct{}{}
		// With no repeats so far the set holds i+1 scores; anything
		// smaller means this entry's score was already present.
		if len(distinct) <= i {
			return true
		}
	}
	return false
}

// zsetRangeEmptyFastResult is the empty-result tail: either the
// score range is genuinely empty on a live zset (return empty +
// hit=true) or the zset does not exist in wide-column form (return
Expand Down Expand Up @@ -800,6 +812,48 @@ func (r *RedisServer) persistZSetEntriesTxn(ctx context.Context, key []byte, rea
}
return r.dispatchElems(ctx, true, readTS, elems)
}

memberPrefix := store.ZSetMemberScanPrefix(key)
memberEnd := store.PrefixScanEnd(memberPrefix)
probeKVs, probeErr := r.store.ScanAt(ctx, memberPrefix, memberEnd, 1, readTS)
if probeErr != nil {
return cockerrors.WithStack(probeErr)
}
if len(probeKVs) > 0 {
current, _, err := r.loadZSetAt(ctx, key, readTS)
if err != nil {
return err
}
st := &zsetTxnState{
members: zsetEntriesToMap(entries),
origMembers: zsetEntriesToMap(current.Entries),
isWide: true,
exists: true,
dirty: true,
}
elems, lenDelta := buildZSetWideElems(key, st)
if lenDelta != 0 {
commitTS, err := r.coordinator.Clock().NextFenced()
if err != nil {
return cockerrors.Wrap(err, "persistZSetEntriesTxn: allocate commitTS")
}
deltaVal := store.MarshalZSetMetaDelta(store.ZSetMetaDelta{LenDelta: lenDelta})
elems = append(elems, &kv.Elem[kv.OP]{
Op: kv.Put,
Key: store.ZSetMetaDeltaKey(key, commitTS, 0),
Value: deltaVal,
})
_, dispatchErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: normalizeStartTS(readTS),
CommitTS: commitTS,
Elems: elems,
})
return cockerrors.WithStack(dispatchErr)
}
return r.dispatchElems(ctx, true, readTS, elems)
}

payload, err := marshalZSetValue(redisZSetValue{Entries: entries})
if err != nil {
return err
Expand Down
Loading
Loading