diff --git a/.github/workflows/jepsen-test-scheduled.yml b/.github/workflows/jepsen-test-scheduled.yml index 9bc2869c..8bedcb0c 100644 --- a/.github/workflows/jepsen-test-scheduled.yml +++ b/.github/workflows/jepsen-test-scheduled.yml @@ -147,6 +147,16 @@ jobs: --max-txn-length ${{ inputs.max-txn-length || '4' }} \ --ports 63791,63792,63793 \ --host 127.0.0.1 + - name: Run Redis ZSet safety Jepsen workload against elastickv + working-directory: jepsen + timeout-minutes: 10 + run: | + timeout 480 ~/lein run -m elastickv.redis-zset-safety-workload \ + --time-limit ${{ inputs.time-limit || '150' }} \ + --rate ${{ inputs.rate || '10' }} \ + --concurrency ${{ inputs.concurrency || '8' }} \ + --ports 63791,63792,63793 \ + --host 127.0.0.1 - name: Run DynamoDB Jepsen workload against elastickv working-directory: jepsen timeout-minutes: 10 diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index 8f95db7f..38b219fa 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -122,6 +122,11 @@ jobs: timeout-minutes: 3 run: | timeout 120 ~/lein run -m elastickv.redis-workload --time-limit 5 --rate 5 --concurrency 5 --ports 63791,63792,63793 --host 127.0.0.1 + - name: Run Redis ZSet safety Jepsen workload against elastickv + working-directory: jepsen + timeout-minutes: 3 + run: | + timeout 120 ~/lein run -m elastickv.redis-zset-safety-workload --time-limit 5 --rate 5 --concurrency 5 --ports 63791,63792,63793 --host 127.0.0.1 - name: Run DynamoDB Jepsen workload against elastickv working-directory: jepsen timeout-minutes: 3 diff --git a/adapter/redis_compat_commands_stream_test.go b/adapter/redis_compat_commands_stream_test.go index 28fb9825..bc78c846 100644 --- a/adapter/redis_compat_commands_stream_test.go +++ b/adapter/redis_compat_commands_stream_test.go @@ -284,20 +284,21 @@ func TestRedis_StreamXReadLatencyIsConstant(t *testing.T) { total = 10_000 probes = 100 ) + lastID := "" for i := range total { - _, err := 
rdb.XAdd(ctx, &redis.XAddArgs{ + id, err := rdb.XAdd(ctx, &redis.XAddArgs{ Stream: "stream-lat", - ID: fmt.Sprintf("%d-0", 1_000_000+i), + ID: "*", Values: []string{"i", fmt.Sprint(i)}, }).Result() require.NoError(t, err) + lastID = id } - afterID := fmt.Sprintf("%d-0", 1_000_000+total-1) measure := func() time.Duration { start := time.Now() streams, err := rdb.XRead(ctx, &redis.XReadArgs{ - Streams: []string{"stream-lat", afterID}, + Streams: []string{"stream-lat", lastID}, Count: 10, Block: 10 * time.Millisecond, }).Result() diff --git a/adapter/redis_lua_context.go b/adapter/redis_lua_context.go index 50628f11..a1f03e25 100644 --- a/adapter/redis_lua_context.go +++ b/adapter/redis_lua_context.go @@ -2623,7 +2623,10 @@ func (c *luaScriptContext) cmdZRangeByScoreSlow(key []byte, options luaZRangeByS // the whole offset+limit budget on those filtered-out rows and miss // the real matches at score > value. // -// The score index is lex-sorted by (userKey, sortableScore, member). +// The score index is grouped by (userKey, sortableScore). Member bytes +// follow the score, but the MVCC timestamp suffix means equal-score +// member ordering is normalized by zsetRangeByScoreFast rather than +// trusted directly from physical scan order. 
// Conventions: // // minBound = -Inf -> startKey = ZSetScoreScanPrefix(key) diff --git a/adapter/redis_retry_test.go b/adapter/redis_retry_test.go index 77fa079d..3af3d222 100644 --- a/adapter/redis_retry_test.go +++ b/adapter/redis_retry_test.go @@ -219,6 +219,50 @@ func TestRedisExecLuaCompatRetriesWriteConflict(t *testing.T) { require.Equal(t, 1.0, zset.Entries[0].Score) } +func TestRedisZRemWideColumnRemovesMemberAndScoreIndex(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + coord := newRetryOnceCoordinator(st) + + srv := &RedisServer{ + store: st, + coordinator: coord, + scriptCache: map[string]string{}, + } + + key := []byte("retry:zrem-wide") + addConn := &recordingConn{} + srv.zadd(addConn, redcon.Command{Args: [][]byte{ + []byte(cmdZAdd), key, + []byte("55"), []byte("m8"), + []byte("-49"), []byte("m6"), + }}) + require.Empty(t, addConn.err) + require.Equal(t, int64(2), addConn.int) + + remConn := &recordingConn{} + srv.zrem(remConn, redcon.Command{Args: [][]byte{ + []byte(cmdZRem), key, []byte("m8"), + }}) + require.Empty(t, remConn.err) + require.Equal(t, int64(1), remConn.int) + + readTS := snapshotTS(coord.clock, st) + zset, exists, err := srv.loadZSetAt(ctx, key, readTS) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, []redisZSetEntry{{Member: "m6", Score: -49}}, zset.Entries) + + memberExists, err := st.ExistsAt(ctx, store.ZSetMemberKey(key, []byte("m8")), readTS) + require.NoError(t, err) + require.False(t, memberExists) + scoreExists, err := st.ExistsAt(ctx, store.ZSetScoreKey(key, 55, []byte("m8")), readTS) + require.NoError(t, err) + require.False(t, scoreExists) +} + func TestRedisEvalRetriesWriteConflict(t *testing.T) { t.Parallel() diff --git a/adapter/redis_zset_cmds.go b/adapter/redis_zset_cmds.go index e1808471..9ae90abe 100644 --- a/adapter/redis_zset_cmds.go +++ b/adapter/redis_zset_cmds.go @@ -151,11 +151,12 @@ func (r *RedisServer) zsetRangeByScoreFast( hit, reason, err 
:= r.zsetRangeEmptyFastResult(ctx, key, readTS) return nil, hit, reason, err } + scanLimit = zsetFastScanLimitWithTieSentinel(scanLimit, limit) kvs, err := r.zsetScoreScan(ctx, startKey, endKey, scanLimit, reverse, readTS) if err != nil { return nil, false, monitoring.LuaFastPathFallbackOther, err } - return r.finalizeZSetFastRange(ctx, key, kvs, offset, limit, scanLimit, scoreFilter, readTS) + return r.finalizeZSetFastRange(ctx, key, kvs, reverse, offset, limit, scanLimit, scoreFilter, readTS) } // finalizeZSetFastRange runs the post-scan priority guard, decodes @@ -165,12 +166,16 @@ func (r *RedisServer) zsetRangeByScoreFast( // // Takes scanLimit so we can detect a saturated scan: if the scanner // returned exactly scanLimit rows AND the caller's request is not -// satisfied (unbounded limit, or collected fewer entries than limit), -// there MAY be more entries beyond the scan window. In that case we -// return hit=false so the slow path can produce the authoritative -// answer -- the fast path MUST NOT silently truncate. +// satisfied (unbounded limit, or fewer than offset+limit matching +// entries), there MAY be more entries beyond the scan window. In +// that case we return hit=false so the slow path can produce the +// authoritative answer -- the fast path MUST NOT silently truncate. +// Bounded ranges with duplicate scores also fall back: the MVCC +// timestamp suffix can disturb physical ordering among equal-score +// member keys, so the slow full-load path owns exact LIMIT semantics +// for those ties. 
func (r *RedisServer) finalizeZSetFastRange( - ctx context.Context, key []byte, kvs []*store.KVPair, + ctx context.Context, key []byte, kvs []*store.KVPair, reverse bool, offset, limit, scanLimit int, scoreFilter func(float64) bool, readTS uint64, ) ([]redisZSetEntry, bool, monitoring.LuaFastPathFallbackReason, error) { // Priority guard runs after a candidate hit (mirrors post-PR #565 @@ -183,13 +188,21 @@ func (r *RedisServer) finalizeZSetFastRange( return nil, false, monitoring.LuaFastPathFallbackWrongType, nil } } - entries := decodeZSetScoreRange(key, kvs, offset, limit, scoreFilter) + entries := decodeZSetScoreRange(key, kvs, scoreFilter) // Truncation guard: the raw scanner hit its cap AND the caller did // not get a satisfied result. Entries beyond the window may // exist; defer to the slow path for correctness. - if zsetFastPathTruncated(len(kvs), scanLimit, len(entries), limit) { + if zsetFastPathTruncated(len(kvs), scanLimit, len(entries), offset, limit) { return nil, false, monitoring.LuaFastPathFallbackTruncated, nil } + if zsetFastPathNeedsTieFallback(entries, limit) { + return nil, false, monitoring.LuaFastPathFallbackTruncated, nil + } + sortZSetEntries(entries) + if reverse { + reverseZSetEntries(entries) + } + entries = applyZRangeLimit(entries, offset, limit) if len(entries) == 0 { hit, reason, err := r.zsetRangeEmptyFastResult(ctx, key, readTS) return nil, hit, reason, err @@ -208,17 +221,21 @@ func (r *RedisServer) finalizeZSetFastRange( // may have dropped entries that the caller's request would otherwise // include. Returns true when the scanner returned the full quota // (scannedRows == scanLimit) AND the caller's request is still -// unsatisfied (unbounded limit or collectedEntries < limit). In that -// case the caller must fall back to the slow full-load path to get -// the authoritative result. 
-func zsetFastPathTruncated(scannedRows, scanLimit, collectedEntries, limit int) bool { +// unsatisfied (unbounded limit or fewer than offset+limit matching +// entries). In that case the caller must fall back to the slow +// full-load path to get the authoritative result. +func zsetFastPathTruncated(scannedRows, scanLimit, matchingEntries, offset, limit int) bool { if scannedRows < scanLimit { return false } if limit < 0 { return true } - return collectedEntries < limit + needed := offset + limit + if needed < offset || needed > maxWideScanLimit { + return true + } + return matchingEntries < needed } // zsetFastPathEligible returns false (without error) when a legacy- @@ -263,6 +280,13 @@ func zsetFastScanLimit(offset, limit int) int { return offset + limit } +func zsetFastScanLimitWithTieSentinel(scanLimit, limit int) int { + if limit <= 0 || scanLimit >= maxWideScanLimit { + return scanLimit + } + return scanLimit + 1 +} + // zsetScoreScan picks Forward / Reverse ScanAt based on direction. func (r *RedisServer) zsetScoreScan( ctx context.Context, startKey, endKey []byte, scanLimit int, reverse bool, readTS uint64, @@ -275,32 +299,14 @@ func (r *RedisServer) zsetScoreScan( return kvs, cockerrors.WithStack(err) } -// zsetDecodeAllocSize returns a tight upper bound on the collected -// entry count for decodeZSetScoreRange: (kvLen - offset) capped by -// limit, never negative. Avoiding a make([]...len(kvs)) saves up to -// maxWideScanLimit entries of wasted slice capacity when the caller -// asked for a small window at a large offset. -func zsetDecodeAllocSize(kvLen, offset, limit int) int { - allocSize := kvLen - offset - if allocSize < 0 { - return 0 - } - if limit >= 0 && limit < allocSize { - return limit - } - return allocSize -} - // decodeZSetScoreRange decodes score-index scan results into -// redisZSetEntry, applying the post-scan score filter (exclusive -// bound edges) and the offset / limit pagination. 
Entries that fail -// to decode are silently dropped -- they can only appear under data -// corruption. +// redisZSetEntry, applying the post-scan score filter for exclusive +// bound edges. Entries that fail to decode are silently dropped -- +// they can only appear under data corruption. func decodeZSetScoreRange( - key []byte, kvs []*store.KVPair, offset, limit int, scoreFilter func(float64) bool, + key []byte, kvs []*store.KVPair, scoreFilter func(float64) bool, ) []redisZSetEntry { - entries := make([]redisZSetEntry, 0, zsetDecodeAllocSize(len(kvs), offset, limit)) - skipped := 0 + entries := make([]redisZSetEntry, 0, len(kvs)) for _, kv := range kvs { score, member, ok := store.ExtractZSetScoreAndMember(kv.Key, key) if !ok { @@ -309,23 +315,29 @@ func decodeZSetScoreRange( if scoreFilter != nil && !scoreFilter(score) { continue } - // Check limit saturation BEFORE the offset skip so a small - // limit with a large offset exits immediately instead of - // burning offset iterations on the skip branch. Correct for - // any (offset, limit): once len(entries) >= limit we are done - // regardless of remaining skip budget. 
- if limit >= 0 && len(entries) >= limit { - break - } - if skipped < offset { - skipped++ - continue - } entries = append(entries, redisZSetEntry{Member: string(member), Score: score}) } return entries } +func zsetFastPathNeedsTieFallback(entries []redisZSetEntry, limit int) bool { + if limit < 0 { + return false + } + seen := make(map[uint64]struct{}, len(entries)) + for _, entry := range entries { + bits := math.Float64bits(entry.Score) + if entry.Score == 0 { + bits = math.Float64bits(0) + } + if _, ok := seen[bits]; ok { + return true + } + seen[bits] = struct{}{} + } + return false +} + // zsetRangeEmptyFastResult is the empty-result tail: either the // score range is genuinely empty on a live zset (return empty + // hit=true) or the zset does not exist in wide-column form (return @@ -800,6 +812,48 @@ func (r *RedisServer) persistZSetEntriesTxn(ctx context.Context, key []byte, rea } return r.dispatchElems(ctx, true, readTS, elems) } + + memberPrefix := store.ZSetMemberScanPrefix(key) + memberEnd := store.PrefixScanEnd(memberPrefix) + probeKVs, probeErr := r.store.ScanAt(ctx, memberPrefix, memberEnd, 1, readTS) + if probeErr != nil { + return cockerrors.WithStack(probeErr) + } + if len(probeKVs) > 0 { + current, _, err := r.loadZSetAt(ctx, key, readTS) + if err != nil { + return err + } + st := &zsetTxnState{ + members: zsetEntriesToMap(entries), + origMembers: zsetEntriesToMap(current.Entries), + isWide: true, + exists: true, + dirty: true, + } + elems, lenDelta := buildZSetWideElems(key, st) + if lenDelta != 0 { + commitTS, err := r.coordinator.Clock().NextFenced() + if err != nil { + return cockerrors.Wrap(err, "persistZSetEntriesTxn: allocate commitTS") + } + deltaVal := store.MarshalZSetMetaDelta(store.ZSetMetaDelta{LenDelta: lenDelta}) + elems = append(elems, &kv.Elem[kv.OP]{ + Op: kv.Put, + Key: store.ZSetMetaDeltaKey(key, commitTS, 0), + Value: deltaVal, + }) + _, dispatchErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ + IsTxn: true, + 
StartTS: normalizeStartTS(readTS), + CommitTS: commitTS, + Elems: elems, + }) + return cockerrors.WithStack(dispatchErr) + } + return r.dispatchElems(ctx, true, readTS, elems) + } + payload, err := marshalZSetValue(redisZSetValue{Entries: entries}) if err != nil { return err diff --git a/adapter/redis_zset_cmds_test.go b/adapter/redis_zset_cmds_test.go new file mode 100644 index 00000000..c3fbfd65 --- /dev/null +++ b/adapter/redis_zset_cmds_test.go @@ -0,0 +1,138 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/monitoring" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +func seedZSetScoreRowsForTest(t *testing.T, st store.MVCCStore, key []byte, commitTS uint64, entries []redisZSetEntry) { + t.Helper() + ctx := context.Background() + for _, entry := range entries { + member := []byte(entry.Member) + require.NoError(t, st.PutAt(ctx, store.ZSetMemberKey(key, member), store.MarshalZSetScore(entry.Score), commitTS, 0)) + require.NoError(t, st.PutAt(ctx, store.ZSetScoreKey(key, entry.Score, member), []byte{}, commitTS, 0)) + } + require.NoError(t, st.PutAt( + ctx, + store.ZSetMetaKey(key), + store.MarshalZSetMeta(store.ZSetMeta{Len: int64(len(entries))}), + commitTS, + 0, + )) +} + +func TestZSetRangeByScoreFastSortsSameScoreMembers(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + srv := &RedisServer{store: st} + key := []byte("zfast:same-score") + seedZSetScoreRowsForTest(t, st, key, 10, []redisZSetEntry{ + {Member: "m14", Score: -3}, + {Member: "m1", Score: -3}, + {Member: "m6", Score: -3}, + }) + + scorePrefix := store.ZSetScoreRangeScanPrefix(key, -3) + got, hit, reason, err := srv.zsetRangeByScoreFast( + ctx, key, scorePrefix, store.PrefixScanEnd(scorePrefix), + false, 0, -1, nil, 20, + ) + require.NoError(t, err) + require.True(t, hit) + require.Equal(t, monitoring.LuaFastPathFallbackNone, reason) + require.Equal(t, []redisZSetEntry{ + {Member: 
"m1", Score: -3}, + {Member: "m14", Score: -3}, + {Member: "m6", Score: -3}, + }, got) + + got, hit, reason, err = srv.zsetRangeByScoreFast( + ctx, key, scorePrefix, store.PrefixScanEnd(scorePrefix), + true, 0, -1, nil, 20, + ) + require.NoError(t, err) + require.True(t, hit) + require.Equal(t, monitoring.LuaFastPathFallbackNone, reason) + require.Equal(t, []redisZSetEntry{ + {Member: "m6", Score: -3}, + {Member: "m14", Score: -3}, + {Member: "m1", Score: -3}, + }, got) +} + +func TestZSetRangeByScoreFastFallsBackForBoundedScoreTies(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + srv := &RedisServer{store: st} + key := []byte("zfast:bounded-tie") + seedZSetScoreRowsForTest(t, st, key, 10, []redisZSetEntry{ + {Member: "m14", Score: 44}, + {Member: "m1", Score: 44}, + {Member: "m6", Score: 44}, + }) + + scorePrefix := store.ZSetScoreRangeScanPrefix(key, 44) + got, hit, reason, err := srv.zsetRangeByScoreFast( + ctx, key, scorePrefix, store.PrefixScanEnd(scorePrefix), + false, 0, 1, nil, 20, + ) + require.NoError(t, err) + require.False(t, hit) + require.Equal(t, monitoring.LuaFastPathFallbackTruncated, reason) + require.Nil(t, got) +} + +func TestZSetRangeByScoreFastAppliesBoundedWindowForUniqueScores(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + srv := &RedisServer{store: st} + key := []byte("zfast:bounded-unique") + seedZSetScoreRowsForTest(t, st, key, 10, []redisZSetEntry{ + {Member: "m14", Score: 1}, + {Member: "m1", Score: 2}, + {Member: "m6", Score: 3}, + }) + + prefix := store.ZSetScoreScanPrefix(key) + got, hit, reason, err := srv.zsetRangeByScoreFast( + ctx, key, prefix, store.PrefixScanEnd(prefix), + false, 1, 2, nil, 20, + ) + require.NoError(t, err) + require.True(t, hit) + require.Equal(t, monitoring.LuaFastPathFallbackNone, reason) + require.Equal(t, []redisZSetEntry{ + {Member: "m1", Score: 2}, + {Member: "m6", Score: 3}, + }, got) +} + +func 
TestZSetFastPathTruncatedFallsBackWhenRequestedWindowExceedsScanCap(t *testing.T) { + t.Parallel() + + require.True(t, zsetFastPathTruncated( + maxWideScanLimit, + maxWideScanLimit, + maxWideScanLimit, + maxWideScanLimit-10, + 20, + )) + require.False(t, zsetFastPathTruncated( + maxWideScanLimit-1, + maxWideScanLimit, + maxWideScanLimit-1, + maxWideScanLimit-10, + 20, + )) +} diff --git a/cmd/elastickv-admin/main.go b/cmd/elastickv-admin/main.go index e0a2064e..5657c0a2 100644 --- a/cmd/elastickv-admin/main.go +++ b/cmd/elastickv-admin/main.go @@ -662,6 +662,14 @@ func (f *fanout) currentTargets(ctx context.Context) []string { f.mu.Unlock() ch := f.refreshGroup.DoChan("members", func() (any, error) { + f.mu.Lock() + if f.members != nil && time.Since(f.members.fetchedAt) < f.refreshInterval { + addrs := append([]string(nil), f.members.addrs...) + f.mu.Unlock() + return addrs, nil + } + f.mu.Unlock() + bgCtx, cancel := context.WithTimeout(context.Background(), membershipRefreshBudget) defer cancel() return f.refreshMembership(bgCtx), nil diff --git a/jepsen/src/elastickv/jepsen_test.clj b/jepsen/src/elastickv/jepsen_test.clj index 0468a868..9de017df 100644 --- a/jepsen/src/elastickv/jepsen_test.clj +++ b/jepsen/src/elastickv/jepsen_test.clj @@ -1,24 +1,42 @@ (ns elastickv.jepsen-test (:gen-class) (:require [elastickv.redis-workload :as redis-workload] + [elastickv.redis-zset-safety-workload :as zset-safety-workload] [elastickv.dynamodb-workload :as dynamodb-workload] [elastickv.dynamodb-types-workload :as dynamodb-types-workload] [elastickv.s3-workload :as s3-workload] [elastickv.sqs-htfifo-workload :as sqs-htfifo-workload] [jepsen.cli :as cli])) -(defn elastickv-test [] - (redis-workload/elastickv-redis-test {})) +(defn elastickv-test + ([] (elastickv-test {})) + ([opts] (redis-workload/elastickv-redis-test opts))) -(defn elastickv-dynamodb-test [] - (dynamodb-workload/elastickv-dynamodb-test {})) +(defn elastickv-dynamodb-test + ([] (elastickv-dynamodb-test {})) + 
([opts] (dynamodb-workload/elastickv-dynamodb-test opts))) (defn elastickv-dynamodb-types-test ([] (elastickv-dynamodb-types-test {})) ([opts] (dynamodb-types-workload/elastickv-dynamodb-types-test opts))) -(defn elastickv-s3-test [] - (s3-workload/elastickv-s3-test {})) +(defn elastickv-s3-test + ([] (elastickv-s3-test {})) + ([opts] (s3-workload/elastickv-s3-test opts))) + +(defn elastickv-zset-safety-test + ([] (elastickv-zset-safety-test {})) + ([opts] (zset-safety-workload/elastickv-zset-safety-test opts))) + +(def ^:private test-fns + "Map of user-facing test names to their constructor fns. The first + positional CLI arg selects which workload runs; if absent or unknown, + we default to `elastickv-test` for backward compatibility with + pre-existing invocations." + {"elastickv-test" elastickv-test + "elastickv-zset-safety-test" elastickv-zset-safety-test + "elastickv-dynamodb-test" elastickv-dynamodb-test + "elastickv-s3-test" elastickv-s3-test}) (defn elastickv-sqs-htfifo-test "HT-FIFO Jepsen test (PR 7b). Run via the workload's own -main: @@ -29,5 +47,39 @@ ([opts] (sqs-htfifo-workload/elastickv-sqs-htfifo-test opts))) (defn -main + "Dispatch to a named workload. Usage: + + lein run -m elastickv.jepsen-test TEST-NAME [jepsen-subcmd] [jepsen-opts ...] + + Supported TEST-NAME values: elastickv-test, elastickv-zset-safety-test, + elastickv-dynamodb-test, elastickv-s3-test. When the first positional + arg is not a known test name, we default to `elastickv-test` for + backward compatibility and forward ALL args to jepsen.cli/run!. + + The jepsen subcommand (`test` or `analyze`) is auto-prepended when + missing, so `lein run elastickv-zset-safety-test --nodes n1,n2` works + without the user repeating `test`." [& args] - (cli/run! (cli/single-test-cmd {:test-fn elastickv-test}) args)) + (let [[head & tail] args + [selected-fn remaining-args] (if-let [f (get test-fns head)] + [f tail] + [elastickv-test args]) + ;; jepsen.cli/run! 
requires a subcommand ("test" or "analyze") + ;; as the first arg. Insert "test" only when the user clearly + ;; did NOT supply a subcommand: + ;; - remaining-args is empty, OR + ;; - the first token is an option (starts with "-") + ;; If the first token looks like a subcommand (any non-option + ;; word, e.g. "test", "analyze", "serve", or a future jepsen.cli + ;; subcommand we don't hard-code), leave it alone and let + ;; jepsen.cli/run! handle it (including producing a better + ;; error message for unknown subcommands than we could here). + [next-head & _] remaining-args + prepend-test? (or (empty? remaining-args) + (and (string? next-head) + (.startsWith ^String next-head "-"))) + final-args (if prepend-test? + (cons "test" remaining-args) + remaining-args)] + (cli/run! (cli/single-test-cmd {:test-fn selected-fn}) + final-args))) diff --git a/jepsen/src/elastickv/redis_zset_safety_workload.clj b/jepsen/src/elastickv/redis_zset_safety_workload.clj new file mode 100644 index 00000000..26ba6aec --- /dev/null +++ b/jepsen/src/elastickv/redis_zset_safety_workload.clj @@ -0,0 +1,1241 @@ +(ns elastickv.redis-zset-safety-workload + "Jepsen workload verifying stronger safety properties of elastickv's + Redis ZSet (sorted set) implementation under faults. + + Beyond the simple visibility check in redis-zset-workload, this workload + exercises score correctness, ordering, range queries, phantom-member + freedom, and atomicity of compound ZSet mutations by using a custom, + model-based Checker. + + Operations (all target a single well-known key): + + {:f :zadd :value [member score]} ZADD key score member + {:f :zincrby :value [member delta]} ZINCRBY key delta member + {:f :zrem :value member} ZREM key member + {:f :zrange-all} ZRANGE key 0 -1 WITHSCORES + {:f :zrangebyscore :value [lo hi]} ZRANGEBYSCORE key lo hi WITHSCORES + + Semantics checked (see `zset-safety-checker`): + + 1. 
Score correctness: the score of any member observed by a :zrange-all + read must match the model's latest committed score for that member, + OR must match a score written by an operation that is concurrent with + the read (we cannot linearize concurrent writes to the same member, + so any such \"in-flight\" value is permitted). + 2. Order preservation: the result of :zrange-all must be sorted by + (score ascending, member lexicographically ascending). + 3. ZRANGEBYSCORE correctness: every member in a score-range read must + have a latest committed (or concurrent) score within [lo, hi]; and + every model member with a score in [lo, hi] must either be present + or be subject to a concurrent mutation. + 4. No phantom members: every member observed by a read must have been + introduced by some successful (or in-flight) operation. + 5. Atomicity: there is no explicit \"partial\" state to probe from the + client, but the checker treats every :ok operation as atomic — any + visible inconsistency (member present with no matching op, score + disagreeing with any known write, etc.) is reported." + (:require [clojure.string :as str] + [clojure.tools.logging :refer [warn]] + [elastickv.cli :as cli] + [elastickv.db :as ekdb] + [jepsen.db :as jdb] + [jepsen [checker :as checker] + [client :as client] + [generator :as gen] + [net :as net]] + [jepsen.checker.timeline :as timeline] + [jepsen.control :as control] + [jepsen.nemesis :as nemesis] + [jepsen.nemesis.combined :as combined] + [jepsen.os :as os] + [jepsen.os.debian :as debian] + [taoensso.carmine :as car])) + +;; --------------------------------------------------------------------------- +;; Constants +;; --------------------------------------------------------------------------- + +(def ^:private zset-key "jepsen-zset-safety") + +(def default-nodes ["n1" "n2" "n3" "n4" "n5"]) + +;; A small, fixed universe of members keeps contention high and makes the +;; model's state small enough to enumerate. 
+(def ^:private members + (mapv #(str "m" %) (range 16))) + +;; --------------------------------------------------------------------------- +;; Client +;; --------------------------------------------------------------------------- + +(defn- parse-double-safe + "Parse a Redis score string into a Double. Redis serializes infinite + scores as \"inf\" / \"+inf\" / \"-inf\", which Java's Double/parseDouble + does not accept (it expects \"Infinity\" / \"-Infinity\"). Handle both + encodings so the checker doesn't throw on infinite ZSET scores." + [s] + (let [raw (if (bytes? s) (String. ^bytes s "UTF-8") (str s)) + lower (str/lower-case raw)] + (cond + (or (= lower "inf") (= lower "+inf") (= lower "infinity") (= lower "+infinity")) + Double/POSITIVE_INFINITY + + (or (= lower "-inf") (= lower "-infinity")) + Double/NEGATIVE_INFINITY + + :else + (Double/parseDouble raw)))) + +(defn- coerce-zincrby-score + "Carmine's ZINCRBY reply is normally a score string, but under error / + timeout / protocol edge cases it may be nil, a numeric value, or + something else entirely. Stringifying nil produces the empty string, which + parse-double-safe would then hand to Double/parseDouble and throw. + Explicitly classify the response so the invoke! path can record a + non-score reply as :info instead of masking it in a catch-all. + + Returns one of: + [:ok (double score)] + [:nil] ; nil response + [:error message] ; Carmine error reply + [:unexpected response] ; anything else" + [response] + (cond + (nil? response) + [:nil] + + (number? response) + [:ok (double response)] + + (or (string? response) (bytes? response)) + (try + [:ok (parse-double-safe response)] + (catch NumberFormatException _ + [:unexpected response])) + + ;; Carmine surfaces Redis error replies as exceptions by default, + ;; but some codepaths wrap them in an ex-info / Throwable value. + (instance? 
Throwable response) + [:error (or (.getMessage ^Throwable response) (str response))] + + :else + [:unexpected response])) + +(defn- coerce-zrem-count + "Carmine's ZREM reply is normally a Long (count of removed members), + but under protocol edge cases / Carmine versions / RESP2 vs RESP3 + differences it can also arrive as a numeric string (\"1\") or raw + bytes. Blindly calling `(long reply)` on those forms throws + ClassCastException, which would fall through to the general exception + handler and mask the real signal. + + Returns a non-negative long count. Unparseable or unexpected values + are treated as 0 (i.e. \"nothing removed\") so the op still resolves + as :ok -- matching the existing nil-guard behaviour. + " + [response] + (cond + (nil? response) + 0 + + (number? response) + (long response) + + (string? response) + (try + (Long/parseLong ^String response) + (catch NumberFormatException _ 0)) + + (bytes? response) + (try + (Long/parseLong (String. ^bytes response "UTF-8")) + (catch NumberFormatException _ 0)) + + :else + 0)) + +(defn- parse-withscores + "Carmine returns a flat [member score member score ...] vector for + ZRANGE WITHSCORES. Convert to a sorted vector of [member (double score)] + preserving server-returned order (score ascending, then member). + + Throws on nil or odd-length payloads: a nil WITHSCORES reply or a reply + with a dangling member is a protocol violation and this workload is meant + to surface exactly that kind of anomaly, not silently drop evidence." + [flat] + (when (nil? flat) + (throw (ex-info "WITHSCORES reply is nil" + {:payload flat}))) + (when (odd? (count flat)) + (throw (ex-info "WITHSCORES reply has odd element count" + {:count (count flat) + :payload flat}))) + (->> flat + (partition 2) + (mapv (fn [[m s]] + [(if (bytes? m) (String. ^bytes m "UTF-8") (str m)) + (parse-double-safe s)])))) + +(defn- zincrby! + "Executes a ZINCRBY against conn-spec and returns Carmine's raw reply + (normally a score string). 
Extracted so tests can stub the Redis call + without going through the `car/wcar` macro." + [conn-spec key delta member] + (car/wcar conn-spec (car/zincrby key (double delta) member))) + +(defn- zrem! + "Executes a ZREM against conn-spec and returns Carmine's raw reply + (normally an integer count of removed members). Extracted so tests + can stub the Redis call without going through the `car/wcar` macro." + [conn-spec key member] + (car/wcar conn-spec (car/zrem key member))) + +(defrecord ElastickvRedisZSetSafetyClient [node->port conn-spec] + client/Client + + (open! [this test node] + (let [port (get node->port node 6379) + host (or (:redis-host test) (name node))] + (assoc this :conn-spec {:pool {} :spec {:host host + :port port + :timeout-ms 2000}}))) + + (close! [this _test] this) + + (setup! [this _test] + ;; Hard-fail when :conn-spec is missing after open!. Silently (or + ;; even loudly) proceeding would leave stale data from a previous + ;; run under zset-key and risk false-positive checker results from + ;; that dirty state. Better to abort the run and surface the + ;; configuration problem. + (let [cs (or (:conn-spec this) + (throw (ex-info + (str "ZSet safety setup! cannot clear prior state:" + " :conn-spec is missing on client (open! did" + " not populate it). Aborting to avoid running" + " against stale data under " zset-key ".") + {:type ::missing-conn-spec + :zset-key zset-key})))] + ;; The cleanup DEL MUST succeed. If it fails (connection refused, + ;; Redis error reply, timeout, whatever), stale data from a prior + ;; run survives under zset-key and can produce false-positive + ;; safety verdicts in the checker. Log loudly AND re-throw so + ;; Jepsen aborts the run instead of silently running against + ;; dirty state. + (try + (car/wcar cs (car/del zset-key)) + (catch Throwable t + (warn t "ZSet safety setup! DEL failed -- aborting to avoid stale data") + (throw (ex-info + (str "ZSet safety setup! 
failed to clear prior state at " + zset-key ": " (or (.getMessage t) (str t)) + ". Refusing to run against potentially stale data.") + {:type ::cleanup-failed + :zset-key zset-key} + t))))) + this) + + (teardown! [this _test] this) + + (invoke! [_ _test op] + (let [cs conn-spec] + (try + (case (:f op) + :zadd + (let [[member score] (:value op)] + (car/wcar cs (car/zadd zset-key (double score) member)) + (assoc op :type :ok)) + + :zincrby + (let [[member delta] (:value op) + new-score (zincrby! cs zset-key delta member) + [tag v] (coerce-zincrby-score new-score)] + (case tag + :ok (assoc op :type :ok :value [member v]) + :nil (do (warn (str "ZSet safety ZINCRBY returned nil for " member)) + (assoc op :type :info + :error :nil-response)) + :error (do (warn (str "ZSet safety ZINCRBY returned error reply: " v)) + (assoc op :type :info + :error {:kind :error-response + :message v})) + :unexpected (do (warn (str "ZSet safety ZINCRBY returned unexpected reply: " (pr-str v))) + (assoc op :type :info + :error {:kind :unexpected-response + :value (pr-str v)})))) + + :zrem + (let [member (:value op) + ;; Carmine normally returns an integer count. Guard + ;; against nil / missing reply (protocol edge, closed + ;; connection, etc.) AND against non-numeric shapes + ;; (string "1", raw bytes) that some Carmine versions + ;; or RESP3 codepaths surface. A naked `(long reply)` + ;; would NPE on nil and ClassCastException on + ;; string/bytes, falling through to the general + ;; Exception handler and masking the real signal. + removed (zrem! cs zset-key member) + n (coerce-zrem-count removed)] + (assoc op :type :ok :value [member (pos? n)])) + + :zrange-all + (let [flat (car/wcar cs (car/zrange zset-key 0 -1 "WITHSCORES"))] + (try + (assoc op :type :ok :value (parse-withscores flat)) + (catch Throwable t + (warn t "ZSet safety ZRANGE returned malformed WITHSCORES payload") + (assoc op :type :ok + :value {:malformed? 
true + :error (or (.getMessage ^Throwable t) (str t)) + :payload flat})))) + + :zrangebyscore + (let [[lo hi] (:value op) + flat (car/wcar cs (car/zrangebyscore zset-key + (double lo) + (double hi) + "WITHSCORES"))] + (try + (assoc op :type :ok :value {:bounds [lo hi] + :members (parse-withscores flat)}) + (catch Throwable t + (warn t "ZSet safety ZRANGEBYSCORE returned malformed WITHSCORES payload") + (assoc op :type :ok + :value {:bounds [lo hi] + :malformed? true + :error (or (.getMessage ^Throwable t) (str t)) + :payload flat}))))) + (catch Throwable t + (warn t (str "ZSet safety op failed: " (:f op))) + (assoc op :type :info :error (or (.getMessage ^Throwable t) (str t)))))))) + +;; --------------------------------------------------------------------------- +;; Generator +;; --------------------------------------------------------------------------- + +(defn- rand-member [] (rand-nth members)) + +(defn- gen-op [] + (let [roll (rand)] + (cond + (< roll 0.35) + {:f :zadd :value [(rand-member) (double (- (rand-int 200) 100))]} + + (< roll 0.55) + {:f :zincrby :value [(rand-member) + (double (- (rand-int 20) 10))]} + + (< roll 0.65) + {:f :zrem :value (rand-member)} + + (< roll 0.90) + {:f :zrange-all} + + :else + (let [a (- (rand-int 200) 100) + b (- (rand-int 200) 100)] + {:f :zrangebyscore :value [(double (min a b)) (double (max a b))]})))) + +(defn- op-generator [] + (reify gen/Generator + (op [this test ctx] + [(gen/fill-in-op (gen-op) ctx) this]) + (update [this _ _ _] this))) + +;; --------------------------------------------------------------------------- +;; Checker +;; --------------------------------------------------------------------------- + +(defn- sorted-by-score-then-member? + "Validates the zset invariant: (score, member) ascending, strict." + [entries] + (loop [prev nil + es entries] + (cond + (empty? es) true + (nil? 
prev) (recur (first es) (rest es)) + :else + (let [[pm ps] prev + [cm cs] (first es)] + (cond + (< ps cs) (recur (first es) (rest es)) + (> ps cs) false + ;; equal score: members must be strictly lexicographically ordered + (neg? (compare pm cm)) (recur (first es) (rest es)) + :else false))))) + +(defn- index-by-time + "Return a vector of ops sorted by :index." + [ops] + (vec (sort-by :index ops))) + +(defn- pair-invokes-with-completions + "Returns a sequence of {:invoke inv :complete cmp} pairs for each + completed op (ok/fail/info). Invokes without a matching completion are + paired with nil (still in flight at history end)." + [history] + (let [by-process (group-by :process history)] + (mapcat + (fn [[_p ops]] + (let [ops (index-by-time ops)] + (loop [ops ops acc []] + (if (empty? ops) acc + (let [[o & rest-ops] ops] + (cond + (= :invoke (:type o)) + (let [c (first rest-ops)] + (if (and c (#{:ok :fail :info} (:type c))) + (recur (drop 1 rest-ops) (conj acc {:invoke o :complete c})) + (recur rest-ops (conj acc {:invoke o :complete nil})))) + :else (recur rest-ops acc))))))) + by-process))) + +(defn- mutation? + [op] + (#{:zadd :zincrby :zrem} (:f op))) + +(defn- completed-mutation-window + "For each completed mutation, produce + {:member m :score s :zrem? bool? :invoke-idx i :complete-idx j :type t}. + - :zadd: :score is the requested score (always known). + - :zincrby: when :ok, :score is the server-returned final score. When + :info or pending, :score is nil but :delta is still used to enumerate + the possible resulting scores from each admissible prior state. + - :zrem: :removed? is the boolean returned by ZREM (true iff the + member existed). A no-op ZREM (returns 0) does NOT mutate state, so + the model must not treat it as a deletion. + :info / :pending mutations are still emitted so concurrent windows + account for their possible effect." + [pairs] + (keep + (fn [{:keys [invoke complete]}] + (when (and invoke (mutation? 
invoke)) + (let [f (:f invoke) + t (if complete (:type complete) :pending) + inv-idx (:index invoke) + cmp-idx (when complete (:index complete))] + (case f + :zadd + (let [[m s] (:value invoke)] + {:f :zadd :member m :score (double s) + :type t :invoke-idx inv-idx :complete-idx cmp-idx}) + + :zincrby + (let [[m _delta] (:value invoke) + delta (double _delta) + ;; ZINCRBY's resulting score is only knowable from the + ;; server reply. For :info/:pending we don't have it. + ok? (= :ok t) + s (when (and ok? (vector? (:value complete))) + (second (:value complete)))] + {:f :zincrby :member m :delta delta :score (some-> s double) + :type t :invoke-idx inv-idx :complete-idx cmp-idx}) + + :zrem + (let [m (:value invoke) + ;; invoke! returns [member removed?]. For :info we don't + ;; know whether the member was removed. + removed? (cond + (and (= :ok t) + (vector? (:value complete))) + (boolean (second (:value complete))) + ;; pending / info: assume removal could have + ;; happened; the checker treats it as a + ;; possibly-concurrent deletion via the + ;; concurrent window. + :else true)] + {:f :zrem :member m :score nil + :zrem? true :removed? removed? + :type t :invoke-idx inv-idx :complete-idx cmp-idx}))))) + pairs)) + +(defn- mutations-by-member + [mutations] + (group-by :member mutations)) + +(defn- concurrent? + "A mutation m is concurrent with a read r iff m's invoke precedes r's + completion AND m's completion (or end-of-history) follows r's invoke." + [m read-inv-idx read-cmp-idx] + (and (<= (:invoke-idx m) read-cmp-idx) + (or (nil? (:complete-idx m)) + (>= (:complete-idx m) read-inv-idx)))) + +(def ^:private absent-state {:present? false :score nil}) +(def ^:private impossible-state {:present? false :score nil :impossible? true}) + +(defn- score-eq? + [a b] + (and (some? a) + (some? 
b) + (<= (Math/abs (- (double a) (double b))) 1.0E-9))) + +(defn- apply-mutation-possibilities + "Apply one mutation to a per-member state, returning every state still + compatible with the mutation's reply. Empty means the reply is impossible + from this input state; e.g. ZINCRBY returned a score other than + previous-score + delta, or ZREM reported a real deletion of an absent + member." + [st m] + (let [st (or st absent-state)] + (case (:f m) + :zadd + #{{:present? true :score (:score m)}} + + :zincrby + (let [previous-score (if (:present? st) (:score st) 0.0) + next-score (+ previous-score (:delta m))] + (if (and (= :ok (:type m)) (some? (:score m))) + (if (score-eq? next-score (:score m)) + #{{:present? true :score (:score m)}} + #{}) + #{{:present? true :score next-score}})) + + :zrem + (cond + ;; A confirmed no-op ZREM is only compatible with the member + ;; being absent at the ZREM linearization point. It is not + ;; deletion evidence, but it is still ordering evidence: if the + ;; prior state is definitely present, Redis could not have + ;; returned 0. + (and (= :ok (:type m)) (false? (:removed? m))) + (if (:present? st) #{} #{st}) + + ;; A confirmed deletion proves the member was present immediately + ;; before this ZREM in any admissible linearization. + (and (= :ok (:type m)) (true? (:removed? m))) + (if (:present? st) #{absent-state} #{}) + + ;; :info / pending ZREM may have been skipped, removed a present + ;; member, or observed the member absent. + :else + (if (:present? st) #{st absent-state} #{st}))))) + +(defn- advance-states + [states m] + (set (mapcat #(apply-mutation-possibilities % m) states))) + +(declare linearized-read-states) + +(defn- model-before + "Construct authoritative per-member state from mutations whose + completions strictly precede read-inv-idx. Returns + {member -> {:present? bool :score s}}. 
:ok mutations before the read + are required; pre-read :info mutations are optional because they may + have taken effect server-side and may be necessary to make a later :ok + relative mutation reply consistent. Pending operations without a + completion are deferred to the concurrent-window check." + [mutations-by-m read-inv-idx] + (reduce-kv + (fn [model member muts] + (let [before-read? #(and (some? (:complete-idx %)) + (< (:complete-idx %) read-inv-idx)) + required (->> muts + (filter #(and (= :ok (:type %)) + (before-read? %))) + vec) + optional (->> muts + (filter #(and (= :info (:type %)) + (before-read? %))) + vec) + states (linearized-read-states #{absent-state} required optional) + state (first states)] + (assoc model member (or state impossible-state)))) + {} + mutations-by-m)) + +(defn- concurrent-mutations-for-member + "All mutations concurrent with the read window that could have taken + effect. :fail completions are excluded: in Jepsen, :fail means the op + definitively did NOT execute, so it contributes neither an allowed + score nor uncertainty about presence. :ok and :info/:pending are + included (either may be visible to the read)." + [muts read-inv-idx read-cmp-idx] + (filter #(and (not= :fail (:type %)) + (concurrent? % read-inv-idx read-cmp-idx)) + muts)) + +(defn- effective-zrem? + "True iff this mutation can actually remove the member. + A completed ZREM with removed? false is a confirmed no-op and must not + relax required-presence or candidate-absence checks." + [m] + (and (= :zrem (:f m)) + (not= false (:removed? m)))) + +(defn- strictly-follows? + "True iff `later` is ordered after `earlier` by real time." + [later earlier] + (and (some? (:complete-idx earlier)) + (> (:invoke-idx later) (:complete-idx earlier)))) + +(defn- superseded-by-preceding-state-change? + "True iff a committed mutation before the read strictly follows m and + changes this member's state. 
Such a later committed write/remove + determines the read's base state and makes m's pre-read uncertainty + irrelevant for this read. + + Relative increments are not absolute overwrites. Earlier uncertainty + may still be required to make a later :ok ZINCRBY reply consistent, so + later ZINCRBYs do not supersede pre-read :info operations." + [preceding m] + (some #(and (or (= :zadd (:f %)) + (effective-zrem? %)) + (strictly-follows? % m)) + preceding)) + +(defn- real-time-before? + [a b] + (and (some? (:complete-idx a)) + (< (:complete-idx a) (:invoke-idx b)))) + +(defn- unique-mutations + [muts] + (->> muts + (reduce (fn [acc m] (assoc acc (:invoke-idx m) m)) + (sorted-map)) + vals + vec)) + +(defn- drop-index + [v idx] + (vec (concat (subvec v 0 idx) (subvec v (inc idx))))) + +(defn- enabled-indexed + [remaining] + (keep-indexed + (fn [idx m] + (when-not (some #(and (not (identical? % m)) + (real-time-before? % m)) + remaining) + [idx m])) + remaining)) + +(defn- linearized-read-states + "Enumerate possible member states at a read. Required mutations are + committed tail candidates and must appear before the read. Optional + mutations are :info/pending or concurrent ops and may either be absent + from the read's prefix or appear in any real-time-consistent order." + [initial-states required optional] + (let [items (vec (concat (map #(assoc % ::required? true) required) + (map #(assoc % ::required? false) optional))) + memo (atom {})] + (letfn [(step [states remaining] + (let [k [states remaining]] + (if (contains? @memo k) + (get @memo k) + (let [result (if (empty? remaining) + states + (reduce + (fn [acc [idx item]] + (let [remaining' (drop-index remaining idx) + applied (advance-states states item) + acc (if (::required? item) + acc + (into acc (step states remaining')))] + (if (seq applied) + (into acc (step applied remaining')) + acc))) + #{} + (enabled-indexed remaining)))] + (swap! 
memo assoc k result) + result))))] + (step initial-states items)))) + +(defn- possible-states-for-member + "Enumerate possible states for one member at a read. `force-required?` + can promote otherwise-concurrent successful mutations into the read prefix; + this is used by cross-member prefix checks when another visible mutation + forces all real-time predecessors into the same read prefix." + ([mutations-by-m member read-inv-idx read-cmp-idx] + (possible-states-for-member mutations-by-m + member + read-inv-idx + read-cmp-idx + (constantly false))) + ([mutations-by-m member read-inv-idx read-cmp-idx force-required?] + (let [muts (get mutations-by-m member []) + preceding-ok? #(and (= :ok (:type %)) + (some? (:complete-idx %)) + (< (:complete-idx %) read-inv-idx)) + required (->> muts + (filter #(or (preceding-ok? %) + (and (= :ok (:type %)) + (force-required? %)))) + unique-mutations) + required-ids (set (map :invoke-idx required)) + required? #(contains? required-ids (:invoke-idx %)) + pre-read-info (->> muts + (filter #(and (= :info (:type %)) + (some? (:complete-idx %)) + (< (:complete-idx %) read-inv-idx) + (not (required? %)) + (not (superseded-by-preceding-state-change? + required %))))) + concurrent (->> (concurrent-mutations-for-member muts + read-inv-idx + read-cmp-idx) + (remove required?))] + (linearized-read-states #{absent-state} + required + (concat pre-read-info concurrent))))) + +(defn- allowed-scores-for-member + "Compute the set of states considered valid for `member` by a read + whose window is [read-inv-idx, read-cmp-idx]. + + ZINCRBY replies are state constraints, not standalone allowed scores: + an :ok reply S is valid only if the immediately preceding score plus + delta equals S. ZREM true is also ordering evidence: it can only occur + after some linearized write made the member present. 
The checker keeps + those constraints by enumerating the committed tail plus any optional + :info/pending/concurrent operations in real-time-consistent orders." + [mutations-by-m member read-inv-idx read-cmp-idx] + (let [possible-states (possible-states-for-member mutations-by-m + member + read-inv-idx + read-cmp-idx) + present-states (filter :present? possible-states) + scores (set (map :score present-states)) + can-be-present? (boolean (seq present-states)) + must-be-present? (boolean (and (seq possible-states) + (every? :present? possible-states)))] + {:scores scores + :can-be-present? (boolean can-be-present?) + :must-be-present? must-be-present?})) + +(defn- score-definitely-in-range? + "True iff the member's committed score is definitively in [lo, hi] + for the purposes of completeness: every candidate score is inside the + range. Used by ZRANGEBYSCORE completeness." + [scores lo hi] + (boolean (and (seq scores) + (every? #(<= lo % hi) scores)))) + +(defn- duplicate-members + "Return the set of members that appear more than once in entries." + [entries] + (->> entries + (map first) + frequencies + (keep (fn [[m n]] (when (> n 1) m))) + set)) + +(defn- without-mutation + [mutations-by-m target] + (update mutations-by-m + (:member target) + (fn [muts] + (vec (remove #(= (:invoke-idx %) (:invoke-idx target)) + muts))))) + +(defn- mutation-can-produce-score? + [mutations-by-m m member score read-inv-idx read-cmp-idx] + (case (:f m) + :zadd + (score-eq? (:score m) score) + + :zincrby + (if (and (= :ok (:type m)) + (some? (:score m))) + (score-eq? (:score m) score) + (some (fn [st] + (some #(and (:present? %) + (score-eq? (:score %) score)) + (apply-mutation-possibilities st m))) + (possible-states-for-member (without-mutation mutations-by-m m) + member + read-inv-idx + read-cmp-idx))) + + false)) + +(defn- observed-score-requires-mutation? 
+ [mutations-by-m m member score read-inv-idx read-cmp-idx] + (and (not= :fail (:type m)) + (= member (:member m)) + (concurrent? m read-inv-idx read-cmp-idx) + (mutation-can-produce-score? mutations-by-m + m + member + score + read-inv-idx + read-cmp-idx) + (let [{:keys [scores can-be-present?]} + (allowed-scores-for-member (without-mutation mutations-by-m m) + member + read-inv-idx + read-cmp-idx)] + (or (not can-be-present?) + (not (contains? scores score)))))) + +(defn- visible-state? + [st bounds] + (and (:present? st) + (if-let [[lo hi] bounds] + (<= lo (:score st) hi) + true))) + +(defn- forced-prefix-states + [mutations-by-m member read-inv-idx read-cmp-idx anchor bounds] + (let [states (possible-states-for-member + mutations-by-m + member + read-inv-idx + read-cmp-idx + #(real-time-before? % anchor))] + (when (and (seq states) + (every? #(visible-state? % bounds) states)) + states))) + +(defn- observed-absence-requires-zrem? + [mutations-by-m m member read-inv-idx read-cmp-idx bounds] + (and (effective-zrem? m) + (not= :fail (:type m)) + (= member (:member m)) + (concurrent? m read-inv-idx read-cmp-idx) + (let [states (possible-states-for-member (without-mutation mutations-by-m m) + member + read-inv-idx + read-cmp-idx)] + (and (seq states) + (every? #(visible-state? % bounds) states))))) + +(def ^:private missing-observed ::missing-observed) + +(defn- read-prefix-errors + [mutations-by-m entries read-inv-idx read-cmp-idx bounds] + (let [observed (into {} entries) + all-mutations (mapcat second mutations-by-m) + kind (if bounds :fractured-read-prefix-range :fractured-read-prefix) + score-anchors (for [[member score] entries + anchor (get mutations-by-m member) + :when (observed-score-requires-mutation? + mutations-by-m + anchor + member + score + read-inv-idx + read-cmp-idx)] + {:member member + :score score + :anchor anchor}) + zrem-anchors (for [[member muts] mutations-by-m + :when (not (contains? 
observed member)) + anchor muts + :when (observed-absence-requires-zrem? + mutations-by-m + anchor + member + read-inv-idx + read-cmp-idx + bounds)] + {:member member + :score nil + :anchor anchor}) + anchors (concat score-anchors zrem-anchors)] + (->> (for [{:keys [member score anchor]} anchors + predecessor all-mutations + :let [forced-states (forced-prefix-states mutations-by-m + (:member predecessor) + read-inv-idx + read-cmp-idx + anchor + bounds) + observed-score (get observed + (:member predecessor) + missing-observed)] + :when (and (not= member (:member predecessor)) + (= :ok (:type predecessor)) + (real-time-before? predecessor anchor) + (seq forced-states) + (or (= missing-observed observed-score) + (not (some #(score-eq? (:score %) observed-score) + forced-states))))] + (cond-> {:kind kind + :index read-cmp-idx + :visible-member member + :visible-score score + :visible-op (:f anchor) + :visible-invoke-idx (:invoke-idx anchor) + :predecessor-op (:f predecessor) + :predecessor-complete-idx (:complete-idx predecessor)} + (= missing-observed observed-score) + (assoc :omitted-member (:member predecessor)) + + (not= missing-observed observed-score) + (assoc :stale-member (:member predecessor) + :observed-predecessor-score observed-score + :forced-predecessor-scores (set (map :score forced-states))) + + bounds (assoc :bounds bounds))) + distinct + vec))) + +(defn- entry-map + [entries] + (into {} entries)) + +(defn- member-state-in-read + [entries-by-member member] + (if (contains? entries-by-member member) + {:present? true :score (get entries-by-member member)} + absent-state)) + +(defn- mutation-could-affect-read-gap? + [m first-read-inv-idx second-read-cmp-idx] + (and (not= :fail (:type m)) + (<= (:invoke-idx m) second-read-cmp-idx) + (or (nil? 
(:complete-idx m)) + (>= (:complete-idx m) first-read-inv-idx)))) + +(defn- check-zrange-all-read-stability + [mutations-by-m read-pairs] + (let [full-reads (->> read-pairs + (filter (fn [{:keys [invoke complete]}] + (and (= :zrange-all (:f invoke)) + (not (:malformed? (:value complete)))))) + (sort-by #(-> % :invoke :index)) + vec) + members (set (concat (keys mutations-by-m) + (mapcat (comp keys entry-map :value :complete) + full-reads)))] + (->> (for [[idx later] (map-indexed vector full-reads) + earlier (subvec full-reads 0 idx) + :let [first-cmp-idx (-> earlier :complete :index) + second-inv-idx (-> later :invoke :index)] + :when (< first-cmp-idx second-inv-idx) + :let [first-inv-idx (-> earlier :invoke :index) + second-cmp-idx (-> later :complete :index) + first-entries (entry-map (-> earlier :complete :value)) + second-entries (entry-map (-> later :complete :value))] + member members + :let [first-state (member-state-in-read first-entries member) + second-state (member-state-in-read second-entries member)] + :when (and (not= first-state second-state) + (not-any? #(mutation-could-affect-read-gap? + % + first-inv-idx + second-cmp-idx) + (get mutations-by-m member [])))] + {:kind :unstable-read-without-mutation + :first-index first-cmp-idx + :second-index second-cmp-idx + :member member + :first-state first-state + :second-state second-state}) + distinct + vec))) + +(defn- check-zrange-all + [mutations-by-m {:keys [invoke complete] :as _pair}] + (let [entries (:value complete) + inv-idx (:index invoke) + cmp-idx (:index complete) + errors (atom [])] + (if (:malformed? entries) + [{:kind :malformed-read + :index cmp-idx + :error (:error entries) + :payload (:payload entries)}] + (do + ;; 1. Ordering + (when-not (sorted-by-score-then-member? entries) + (swap! errors conj {:kind :unsorted + :index cmp-idx + :entries entries})) + ;; 1b. No duplicate members: a ZSet read must return each member at + ;; most once. 
A duplicate-member result could otherwise satisfy + ;; ordering and score-membership checks while hiding a real bug. + (let [dupes (duplicate-members entries)] + (when (seq dupes) + (swap! errors conj {:kind :duplicate-members + :index cmp-idx + :members dupes}))) + ;; 2. For each observed (member,score): validate presence + score. + ;; can-be-present? catches both phantoms (member never existed) + ;; and stale reads (member committed-removed before the read + ;; with no concurrent re-add). + (doseq [[member score] entries] + (let [{:keys [scores can-be-present?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (cond + (not can-be-present?) + (swap! errors conj {:kind :unexpected-presence + :index cmp-idx + :member member + :score score}) + (not (contains? scores score)) + (swap! errors conj {:kind :score-mismatch + :index cmp-idx + :member member + :observed score + :allowed scores})))) + ;; 3. Completeness: model-required members must appear. + ;; A member is required-present only if every admissible + ;; linearization leaves it present (must-be-present?). This + ;; correctly skips members that an :info or concurrent ZREM + ;; might have removed before the read. + (let [model (model-before mutations-by-m inv-idx) + observed-members (into #{} (map first) entries)] + (doseq [[member state] model] + (if (:impossible? state) + (swap! errors conj {:kind :impossible-mutation-chain + :index cmp-idx + :member member}) + (let [{:keys [must-be-present?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (when (and must-be-present? + (not (contains? observed-members member))) + (swap! errors conj {:kind :missing-member + :index cmp-idx + :member member})))))) + (swap! 
errors into (read-prefix-errors mutations-by-m + entries + inv-idx + cmp-idx + nil)) + @errors)))) + +(defn- check-zrangebyscore + [mutations-by-m {:keys [invoke complete] :as _pair}] + (let [{:keys [bounds members] :as value} (:value complete) + [lo hi] bounds + inv-idx (:index invoke) + cmp-idx (:index complete) + errors (atom [])] + (if (:malformed? value) + [{:kind :malformed-read-range + :index cmp-idx + :bounds bounds + :error (:error value) + :payload (:payload value)}] + (do + (when-not (sorted-by-score-then-member? members) + (swap! errors conj {:kind :unsorted-range + :index cmp-idx + :bounds bounds + :members members})) + (let [dupes (duplicate-members members)] + (when (seq dupes) + (swap! errors conj {:kind :duplicate-members-range + :index cmp-idx + :bounds bounds + :members dupes}))) + ;; Observed members must be within bounds AND have a known allowed score. + (doseq [[member score] members] + (when (or (< score lo) (> score hi)) + (swap! errors conj {:kind :out-of-range + :index cmp-idx + :bounds bounds + :member member + :score score})) + (let [{:keys [scores can-be-present?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (cond + (not can-be-present?) + (swap! errors conj {:kind :unexpected-presence-range + :index cmp-idx + :member member + :score score}) + (not (contains? scores score)) + (swap! errors conj {:kind :score-mismatch-range + :index cmp-idx + :member member + :observed score + :allowed scores})))) + ;; Completeness within bounds: a model member must appear only when + ;; (a) every admissible linearization leaves it present + ;; (must-be-present?), AND + ;; (b) its score is definitively within [lo, hi] across all + ;; admissible linearizations. + (let [model (model-before mutations-by-m inv-idx) + observed-members (into #{} (map first) members)] + (doseq [[member state] model] + (if (:impossible? state) + (swap! 
errors conj {:kind :impossible-mutation-chain + :index cmp-idx + :bounds bounds + :member member}) + (let [{:keys [must-be-present? scores]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (when (and must-be-present? + (score-definitely-in-range? scores lo hi) + (not (contains? observed-members member))) + ;; Report the full set of admissible scores (:allowed), not + ;; just an arbitrary first element -- picking `(first + ;; scores)` on a multi-element set is misleading when + ;; concurrent writers leave several linearizations valid. + ;; :allowed matches the convention used by the sibling + ;; :score-mismatch-range error above. :expected-score is + ;; retained (as `(first scores)` for a single-element set, + ;; nil otherwise) for backward compatibility with any + ;; out-of-tree consumers. + (swap! errors conj {:kind :missing-member-range + :index cmp-idx + :bounds bounds + :member member + :allowed scores + :expected-score (when (= 1 (count scores)) + (first scores))})))))) + (swap! errors into (read-prefix-errors mutations-by-m + members + inv-idx + cmp-idx + bounds)) + @errors)))) + +(defn zset-safety-checker + "Custom Jepsen checker: validates ZSet safety properties using a + last-writer model combined with a concurrent-write relaxation." 
+ [] + (reify checker/Checker + (check [_ _test history _opts] + (let [pairs (pair-invokes-with-completions history) + mutations (completed-mutation-window pairs) + mutations-by-m (mutations-by-member mutations) + read-pairs (filter (fn [{:keys [invoke complete]}] + (and invoke complete + (= :ok (:type complete)) + (#{:zrange-all :zrangebyscore} + (:f invoke)))) + pairs) + all-errors (reduce + (fn [acc {:keys [invoke] :as pair}] + (into acc + (case (:f invoke) + :zrange-all (check-zrange-all mutations-by-m pair) + :zrangebyscore (check-zrangebyscore mutations-by-m pair)))) + [] + read-pairs) + all-errors (into all-errors + (check-zrange-all-read-stability mutations-by-m + read-pairs)) + by-kind (group-by :kind all-errors) + ;; Vacuous-pass guard: if the run produced zero + ;; successful reads, we have no evidence that the system + ;; under test actually satisfies ZSet safety -- every op + ;; may have been downgraded to :info because Redis was + ;; unreachable or every read timed out. Returning + ;; `:valid? true` in that case would be a false-green. + ;; Emit `:valid? :unknown` with a diagnostic reason; the + ;; cli's `fail-on-invalid!` treats anything other than + ;; `true` as a failure (see elastickv.cli/fail-on-invalid!). + no-successful-reads? (zero? (count read-pairs)) + valid? (cond + (seq all-errors) false + no-successful-reads? :unknown + :else true)] + (cond-> {:valid? valid? + :reads (count read-pairs) + :mutations (count mutations) + :error-count (count all-errors) + :errors-by-kind (into {} (map (fn [[k v]] [k (count v)]) by-kind)) + :first-errors (take 20 all-errors)} + no-successful-reads? + (assoc :reason + (str "No successful :zrange-all / :zrangebyscore reads" + " completed -- cannot assert ZSet safety. 
Likely" + " Redis was unreachable or every read timed out;" + " re-run against a healthy cluster."))))))) + +;; --------------------------------------------------------------------------- +;; Workload +;; --------------------------------------------------------------------------- + +(defn elastickv-zset-safety-workload + [opts] + (let [node->port (or (:node->port opts) + (zipmap default-nodes (repeat 6379))) + client (->ElastickvRedisZSetSafetyClient node->port nil)] + {:client client + :checker (checker/compose + {:zset-safety (zset-safety-checker) + :timeline (timeline/html)}) + :generator (op-generator) + :final-generator (gen/once {:f :zrange-all})})) + +(defn elastickv-zset-safety-test + "Builds a Jepsen test map that drives elastickv's Redis ZSet safety + workload." + ([] (elastickv-zset-safety-test {})) + ([opts] + (let [nodes (or (:nodes opts) default-nodes) + redis-ports (or (:redis-ports opts) + (repeat (count nodes) (or (:redis-port opts) 6379))) + node->port (or (:node->port opts) + (cli/ports->node-map redis-ports nodes)) + local? (:local opts) + db (if local? + jdb/noop + (ekdb/db {:grpc-port (or (:grpc-port opts) 50051) + :redis-port node->port + :raft-groups (:raft-groups opts) + :shard-ranges (:shard-ranges opts)})) + rate (double (or (:rate opts) 10)) + time-limit (or (:time-limit opts) 60) + faults (if local? + [] + (cli/normalize-faults (or (:faults opts) [:partition :kill]))) + nemesis-p (when-not local? + (combined/nemesis-package {:db db + :faults faults + :interval (or (:fault-interval opts) 40)})) + nemesis-gen (if nemesis-p + (:generator nemesis-p) + (gen/once {:type :info :f :noop})) + workload (elastickv-zset-safety-workload + (assoc opts :node->port node->port))] + (merge workload + {:name (or (:name opts) "elastickv-redis-zset-safety") + :nodes nodes + :db db + :redis-host (:redis-host opts) + :os (if local? os/noop debian/os) + :net (if local? 
net/noop net/iptables) + :ssh (merge {:username "vagrant" + :private-key-path "/home/vagrant/.ssh/id_rsa" + :strict-host-key-checking false} + (when local? {:dummy true}) + (:ssh opts)) + :remote control/ssh + :nemesis (if nemesis-p (:nemesis nemesis-p) nemesis/noop) + ;; The inner workload's :final-generator is the trivially- + ;; serializable (gen/once {:f :zrange-all}) -- a single + ;; Limit defrecord wrapping a plain map. It round-trips + ;; through Jepsen 0.3.x's Fressian store cleanly + ;; (verified at 86 bytes), so we don't override it here. + :concurrency (or (:concurrency opts) 5) + :generator (->> (:generator workload) + (gen/nemesis nemesis-gen) + (gen/stagger (/ rate)) + (gen/time-limit time-limit))})))) + +;; --------------------------------------------------------------------------- +;; CLI +;; --------------------------------------------------------------------------- + +(def zset-safety-cli-opts + [[nil "--ports PORTS" "Comma-separated Redis ports (per node)." + :default nil + :parse-fn (fn [s] + (->> (str/split s #",") + (remove str/blank?) + (mapv #(Integer/parseInt %))))] + [nil "--redis-port PORT" "Redis port applied to all nodes." + :default 6379 + :parse-fn #(Integer/parseInt %)]]) + +(defn- prepare-zset-safety-opts [options] + (let [ports (or (:ports options) nil) + options (cli/parse-common-opts options ports)] + (assoc options + :redis-host (:host options) + :redis-ports ports + :redis-port (:redis-port options)))) + +(defn -main [& args] + (cli/run-workload! args + (into cli/common-cli-opts zset-safety-cli-opts) + prepare-zset-safety-opts + elastickv-zset-safety-test)) diff --git a/jepsen/test/elastickv/jepsen_test_test.clj b/jepsen/test/elastickv/jepsen_test_test.clj index a2e6b6c4..463c4a11 100644 --- a/jepsen/test/elastickv/jepsen_test_test.clj +++ b/jepsen/test/elastickv/jepsen_test_test.clj @@ -4,3 +4,10 @@ (deftest builds-test-spec (is (map? 
(jt/elastickv-test)))) + +(deftest selected-workloads-accept-option-map + (doseq [test-fn [jt/elastickv-test + jt/elastickv-dynamodb-test + jt/elastickv-s3-test + jt/elastickv-zset-safety-test]] + (is (map? (test-fn {}))))) diff --git a/jepsen/test/elastickv/redis_zset_safety_workload_test.clj b/jepsen/test/elastickv/redis_zset_safety_workload_test.clj new file mode 100644 index 00000000..1f0087be --- /dev/null +++ b/jepsen/test/elastickv/redis_zset_safety_workload_test.clj @@ -0,0 +1,1265 @@ +(ns elastickv.redis-zset-safety-workload-test + "Unit tests for the ZSet safety workload's test-spec construction and + the model-based checker's edge cases (no-op ZREM, :info ZINCRBY)." + (:require [clojure.test :refer :all] + [jepsen.checker :as checker] + [jepsen.client :as client] + [elastickv.redis-zset-safety-workload :as workload])) + +;; --------------------------------------------------------------------------- +;; Test-spec construction +;; --------------------------------------------------------------------------- + +(deftest builds-test-spec + (let [t (workload/elastickv-zset-safety-test {})] + (is (map? t)) + (is (= "elastickv-redis-zset-safety" (:name t))) + (is (= ["n1" "n2" "n3" "n4" "n5"] (:nodes t))) + (is (some? (:client t))) + (is (some? (:checker t))) + (is (some? (:generator t))))) + +(deftest custom-options-override-defaults + (let [t (workload/elastickv-zset-safety-test + {:time-limit 30 + :concurrency 8 + :rate 4})] + (is (= 8 (:concurrency t))))) + +;; --------------------------------------------------------------------------- +;; Checker edge cases +;; --------------------------------------------------------------------------- + +(defn- run-checker + "Run the workload's safety checker against an in-memory history. + Bypasses the composed timeline.html checker (which writes files to + the test store) so tests stay hermetic." 
+ [history] + (checker/check (workload/zset-safety-checker) + (workload/elastickv-zset-safety-test {}) + history + nil)) + +(deftest noop-zrem-does-not-flag-correct-read + ;; ZREM of a member that was never added returns 0 (no-op). The model + ;; must not treat it as a deletion. A subsequent read showing the + ;; absence of that member is correct. + (let [history [{:type :invoke :process 0 :f :zrem :value "ghost" :index 0} + {:type :ok :process 0 :f :zrem :value ["ghost" false] :index 1} + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 3} + {:type :invoke :process 0 :f :zrange-all :index 4} + {:type :ok :process 0 :f :zrange-all :value [["m1" 1.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) (str "expected valid, got: " result)))) + +(deftest info-zincrby-allows-derived-score + ;; ZINCRBY whose response was lost (:info) still has a known delta. A + ;; read concurrent with such an op may observe the derived score. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + {:type :ok :process 0 :f :zrange-all :value [["m1" 6.0]] :index 4} + {:type :info :process 1 :f :zincrby :value ["m1" 5] :index 5}] + result (run-checker history)] + (is (:valid? result) (str "expected valid, got: " result)))) + +(deftest score-mismatch-is-detected-when-no-uncertainty + ;; Sanity check: with all ops :ok and no concurrency, an obviously + ;; wrong observed score IS flagged. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :ok :process 0 :f :zrange-all :value [["m1" 999.0]] :index 3}] + result (run-checker history)] + (is (not (:valid? 
result)) (str "expected mismatch, got: " result)))) + +(deftest single-ok-concurrent-zincrby-still-validates-scores + ;; A single concurrent :ok ZINCRBY has a known return value. The read + ;; may observe either the pre-op score or the post-op score, but an + ;; arbitrary impossible score must still be flagged. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + ;; Read observes 999.0 — not 1.0 (pre) or 6.0 (post). + {:type :ok :process 0 :f :zrange-all + :value [["m1" 999.0]] :index 4} + ;; ZINCRBY eventually completes :ok with known score 6. + {:type :ok :process 1 :f :zincrby :value ["m1" 6.0] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected score-mismatch to still fire, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest two-concurrent-zincrbys-accept-reachable-prefix + ;; Prefix-sum ordering matters: with two concurrent ZINCRBYs, the + ;; intermediate score (pre + one delta) is reachable and must be in + ;; the enumerated score set. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :invoke :process 0 :f :zrange-all :index 4} + ;; Intermediate 3.0 = 1 + 2 (before +3 applied). + {:type :ok :process 0 :f :zrange-all + :value [["m1" 3.0]] :index 5} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 6} + {:type :ok :process 2 :f :zincrby :value ["m1" 6.0] :index 7}] + result (run-checker history)] + (is (:valid? 
result) + (str "expected reachable prefix for >=2 concurrent ZINCRBYs, got: " result)))) + +(deftest no-op-zrem-alone-does-not-false-positive + ;; CI-observed false positive: a member whose only prior ops are no-op + ;; ZREMs was classified as :score-mismatch with :allowed #{} instead + ;; of treated as never-existed (:phantom candidate, empty read -> OK). + ;; A read that observes NO such member must be accepted as valid. + (let [history [{:type :invoke :process 0 :f :zrem :value "never-added" :index 0} + {:type :invoke :process 1 :f :zrange-all :index 1} + {:type :ok :process 1 :f :zrange-all :value [] :index 2} + {:type :ok :process 0 :f :zrem :value ["never-added" false] :index 3}] + result (run-checker history)] + (is (:valid? result) (str "expected valid, got: " result)))) + +(deftest no-op-zrem-does-not-admit-impossible-absence + ;; If ZADD and ZREM overlap, a ZREM returning 0 constrains the ZREM to + ;; have observed the member absent. With a later-overlapping ZADD that + ;; completes before the read, the final state must be present; the + ;; no-op ZREM must not count as a possible deletion. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zrem :value "m1" :index 1} + {:type :ok :process 1 :f :zrem :value ["m1" false] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 3} + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all :value [] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected no-op ZREM not to admit absence, got: " result)) + (is (contains? kinds :missing-member) + (str "expected :missing-member, got kinds=" kinds)))) + +(deftest no-op-zrem-after-present-member-is-impossible + ;; ZREM returning 0 is only compatible with the member being absent at + ;; the ZREM linearization point. 
If a prior ZADD definitely made the + ;; member present before ZREM was invoked, a false ZREM reply is an + ;; impossible successful mutation chain, not permission to keep the + ;; present state. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrem :value "m1" :index 2} + {:type :ok :process 0 :f :zrem :value ["m1" false] :index 3} + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0]] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected impossible false ZREM to fail, got: " result)) + (is (contains? kinds :impossible-mutation-chain) + (str "expected :impossible-mutation-chain, got kinds=" kinds)))) + +(deftest duplicate-members-are-flagged + ;; ZRANGE must not return the same member twice. + ;; With a hypothetical committed + concurrent score for the same + ;; member, a duplicate could sneak past sort + score-membership + ;; checks. Enforce distinctness explicitly. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :ok :process 0 :f :zrange-all + :value [["m1" 1.0] ["m1" 1.0]] :index 3}] + result (run-checker history)] + (is (not (:valid? result)) (str "expected duplicate-members error, got: " result)))) + +(deftest overlapping-committed-zadds-allow-either-score + ;; Two :ok ZADDs for the same member whose + ;; invoke/complete windows overlap have ambiguous serialization + ;; order. Either's resulting score is a valid post-state; the checker + ;; must not pin to the higher :complete-idx value only. 
+ ;; Timeline (overlap between A's [invoke=0, complete=3] and + ;; B's [invoke=1, complete=2]): + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 9] :index 1} + {:type :ok :process 1 :f :zadd :value ["m1" 9] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 3} + ;; Post-commit: either 5 or 9 is a valid final score. + ;; A read observing 5 must NOT be flagged as mismatch. + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 5.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected valid under overlapping-writes relaxation, got: " result)))) + +(deftest overlapping-base-zadds-are-enumerated-before-tail-zincrby + ;; Base mutations before a later tail candidate can still overlap each + ;; other. The checker must enumerate their real-time-consistent orders + ;; before applying the non-concurrent ZINCRBY tail; sorting the base by + ;; completion time alone would fix the base at score 1 and reject the + ;; valid final score 3. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 2] :index 1} + {:type :ok :process 1 :f :zadd :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 3} + {:type :invoke :process 2 :f :zincrby :value ["m1" 1] :index 4} + {:type :ok :process 2 :f :zincrby :value ["m1" 3.0] :index 5} + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 3.0]] :index 7}] + result (run-checker history)] + (is (:valid? result) + (str "expected base ZADD order score 2 -> ZINCRBY +1 to be valid, got: " + result)))) + +(deftest info-before-read-is-considered-uncertain + ;; An :info mutation that completed before a + ;; later read may have taken effect. 
It must be considered a possible + ;; source of state for that read, rather than being ignored by both + ;; model-before and the concurrent window. + (let [history [;; Add m1 with score 1. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY m1 by 5 -- response lost, recorded :info. + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :info :process 1 :f :zincrby :value ["m1" 5] :index 3} + ;; Later read observes m1 at score 6 (increment applied + ;; server-side before the response was lost). Valid. + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 6.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected :info-before-read derived score to be valid, got: " result)))) + +(deftest pre-read-info-zincrby-can-feed-later-ok-zincrby + ;; A pre-read :info ZINCRBY that completed before a later :ok ZINCRBY + ;; may be required to make the later ok reply consistent. Relative + ;; increments do not supersede prior uncertain increments. + (let [history [{:type :invoke :process 0 :f :zincrby :value ["m1" 5] :index 0} + {:type :info :process 0 :f :zincrby :value ["m1" 5] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 1] :index 2} + {:type :ok :process 1 :f :zincrby :value ["m1" 6.0] :index 3} + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 6.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected earlier :info increment to remain admissible, got: " + result)))) + +(deftest pre-read-info-zincrby-superseded-by-later-zadd + ;; A pre-read :info ZINCRBY is uncertainty only until a later committed + ;; state-changing op strictly follows it before the read. The later ZADD + ;; overwrites any possible increment outcome, so an arbitrary score must + ;; be rejected. 
+ (let [history [{:type :invoke :process 0 :f :zincrby :value ["m1" 5] :index 0} + {:type :info :process 0 :f :zincrby :value ["m1" 5] :index 1} + {:type :invoke :process 1 :f :zadd :value ["m1" 2] :index 2} + {:type :ok :process 1 :f :zadd :value ["m1" 2] :index 3} + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 42.0]] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected superseded :info ZINCRBY not to admit score, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest pre-read-info-zrem-superseded-by-later-zadd + ;; Same supersession rule for presence: an :info ZREM completed before a + ;; later committed ZADD cannot make a post-ZADD read's absence valid. + (let [history [{:type :invoke :process 0 :f :zrem :value "m1" :index 0} + {:type :info :process 0 :f :zrem :value "m1" :index 1} + {:type :invoke :process 1 :f :zadd :value ["m1" 1] :index 2} + {:type :ok :process 1 :f :zadd :value ["m1" 1] :index 3} + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all :value [] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected superseded :info ZREM not to admit absence, got: " result)) + (is (contains? kinds :missing-member) + (str "expected :missing-member, got kinds=" kinds)))) + +;; --------------------------------------------------------------------------- +;; Stale-read / phantom / superseded-committed checks +;; --------------------------------------------------------------------------- + +(deftest phantom-member-is-flagged + ;; A read that observes a member which was never added + ;; (no ZADD/ZINCRBY/true-ZREM anywhere) must be rejected. 
+ (let [history [{:type :invoke :process 0 :f :zrange-all :index 0} + {:type :ok :process 0 :f :zrange-all + :value [["never-added" 42.0]] :index 1}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected phantom error, got: " result)) + (is (contains? kinds :unexpected-presence) + (str "expected :unexpected-presence, got kinds=" kinds)))) + +(deftest phantom-from-info-zrem-still-flagged + ;; An :info ZREM is the ONLY history contact + ;; with a member (no ZADD/ZINCRBY ever). Because completed-mutation- + ;; window defaults :removed? to true on :info ZREMs (for uncertainty + ;; accounting), the checker must NOT treat ZREM as proof the member + ;; ever existed. A read observing the member present must be flagged + ;; as :unexpected-presence. Since setup! clears the key at test + ;; start, every observed member must trace back to a successful (or + ;; in-flight) ZADD/ZINCRBY -- never to a ZREM. + (let [history [;; ZREM of a member that was never added. Invoked + ;; concurrently with the read, response eventually + ;; lost (:info). No ZADD/ZINCRBY anywhere in history. + {:type :invoke :process 0 :f :zrem :value "phantom" :index 0} + {:type :invoke :process 1 :f :zrange-all :index 1} + ;; Read observes the phantom present at some score. + {:type :ok :process 1 :f :zrange-all + :value [["phantom" 7.0]] :index 2} + {:type :info :process 0 :f :zrem :value "phantom" :index 3}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected :unexpected-presence for phantom, got: " result)) + (is (contains? kinds :unexpected-presence) + (str "expected :unexpected-presence, got kinds=" kinds)))) + +(deftest stale-read-after-committed-zrem-is-flagged + ;; Once a ZADD and a subsequent real (:removed? true) ZREM + ;; have BOTH committed (with no concurrent re-add), a later read that + ;; still sees the member must be rejected as a stale read. 
+ (let [history [;; Add then remove m1 — both committed before any read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrem :value "m1" :index 2} + {:type :ok :process 0 :f :zrem :value ["m1" true] :index 3} + ;; Stale read: m1 somehow still appears. + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0]] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected stale-read error, got: " result)) + (is (contains? kinds :unexpected-presence) + (str "expected :unexpected-presence, got kinds=" kinds)))) + +(deftest superseded-committed-score-is-not-allowed + ;; A ZADD committed BEFORE another ZADD for the same + ;; member whose invoke strictly followed it should not be treated as + ;; a valid post-state score. Only the latest committed score (plus + ;; concurrent in-flight) may be observed. + (let [history [;; ZADD m1 1 commits first ... + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ... then ZADD m1 2 is invoked strictly after, and + ;; also commits before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 2] :index 3} + ;; Read observing the SUPERSEDED score 1.0 — invalid. + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0]] :index 5}] + result (run-checker history)] + (is (not (:valid? 
result)) + (str "expected superseded-score mismatch, got: " result)))) + +;; --------------------------------------------------------------------------- +;; Infinity score parsing +;; --------------------------------------------------------------------------- + +;; --------------------------------------------------------------------------- +;; Linearization of concurrent ops / uncertain mutations +;; --------------------------------------------------------------------------- + +(deftest true-zrem-constrains-overlapping-zadd-order + ;; ZADD and ZREM for the same member whose invoke/complete + ;; windows overlap (both commit before the read) are not enough to admit + ;; either final state. ZREM returning true proves it observed the member + ;; present, so with an initially absent member it must serialize after + ;; the ZADD and the final state must be absent. + ;; Windows: ZADD=[inv=0, cmp=3], ZREM=[inv=1, cmp=2] — overlap. + (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zrem :value "m1" :index 1} + {:type :ok :process 1 :f :zrem :value ["m1" true] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 3}] + hist-present (conj base + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 1.0]] :index 5}) + hist-absent (conj base + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [] :index 5})] + (is (not (:valid? (run-checker hist-present))) + "expected read observing ZADD's outcome to be rejected") + (is (:valid? (run-checker hist-absent)) + "expected read observing ZREM's outcome (absent) to be accepted"))) + +(deftest info-zrem-concurrent-with-read-allows-missing-member + ;; An :info ZREM that might have applied before a read + ;; leaves the member's presence uncertain. A ZRANGE that omits the + ;; member must NOT be flagged as a completeness failure. 
+ (let [history [;; ZADD m1 committed before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZREM m1 is invoked, then the read runs, then the + ;; ZREM response is lost (:info). The ZREM may or may + ;; not have applied server-side. + {:type :invoke :process 1 :f :zrem :value "m1" :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + {:type :ok :process 0 :f :zrange-all :value [] :index 4} + {:type :info :process 1 :f :zrem :value "m1" :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected :info ZREM to make absence acceptable, got: " result)))) + +(deftest info-zincrby-does-not-flag-zrangebyscore-completeness + ;; A pre-read :info / concurrent ZINCRBY leaves the + ;; resulting score unknown. ZRANGEBYSCORE filtering on a specific + ;; range must not flag the member as missing, because its score may + ;; have moved outside [lo, hi]. + (let [history [;; ZADD m1 at score 1 (committed well before read). + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY m1 +100 — response lost (:info) BEFORE read. + {:type :invoke :process 1 :f :zincrby :value ["m1" 100] :index 2} + {:type :info :process 1 :f :zincrby :value ["m1" 100] :index 3} + ;; ZRANGEBYSCORE [0, 10] — m1's score is uncertain; it + ;; may now be 101 (outside range) or still 1. The + ;; checker must not complain about m1's absence. + {:type :invoke :process 2 :f :zrangebyscore :value [0.0 10.0] :index 4} + {:type :ok :process 2 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 5}] + result (run-checker history)] + (is (:valid? 
result) + (str "expected :info ZINCRBY to skip completeness, got: " result)))) + +(deftest zrangebyscore-completeness-still-detects-truly-missing-member + ;; Sanity: when NO uncertainty exists and a model member's committed + ;; score is definitively inside [lo, hi], its absence IS flagged. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 0 :f :zrangebyscore :value [0.0 10.0] :index 2} + {:type :ok :process 0 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 3}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected missing-member-range, got: " result)) + (is (contains? kinds :missing-member-range) + (str "expected :missing-member-range, got kinds=" kinds)))) + +(deftest missing-member-range-error-reports-full-allowed-score-set + ;; When a member is missing from ZRANGEBYSCORE and multiple + ;; concurrent writers make several scores admissible, the error map + ;; must surface the FULL admissible set under :allowed (matching + ;; :score-mismatch-range convention) rather than pick an arbitrary + ;; single :expected-score. + (let [history [;; Two concurrent ZADDs for m1, both committed before + ;; the read. Either score (5 or 6) is admissible, both + ;; fall inside [0, 10]. + {:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 6] :index 1} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 2} + {:type :ok :process 1 :f :zadd :value ["m1" 6] :index 3} + ;; Read sees nothing -- m1 must appear under any + ;; admissible linearization, so :missing-member-range + ;; fires. 
+ {:type :invoke :process 2 :f :zrangebyscore :value [0.0 10.0] :index 4} + {:type :ok :process 2 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 5}] + result (run-checker history) + miss (first (filter #(= :missing-member-range (:kind %)) + (:first-errors result)))] + (is (not (:valid? result))) + (is (some? miss) + (str "expected a :missing-member-range error, got: " (:first-errors result))) + (is (contains? miss :allowed) + (str "error map must include :allowed, got: " miss)) + (is (= #{5.0 6.0} (set (:allowed miss))) + (str "expected :allowed to contain both admissible scores, got: " miss)) + ;; :expected-score is retained for backcompat but MUST be nil when + ;; there is more than one admissible score, to avoid misleading + ;; consumers that read it. + (is (nil? (:expected-score miss)) + (str "expected :expected-score nil for multi-score set, got: " miss)))) + +(deftest missing-member-range-error-keeps-expected-score-when-single + ;; Backcompat: when the admissible set has exactly one score, + ;; :expected-score matches it. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 0 :f :zrangebyscore :value [0.0 10.0] :index 2} + {:type :ok :process 0 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 3}] + result (run-checker history) + miss (first (filter #(= :missing-member-range (:kind %)) + (:first-errors result)))] + (is (some? miss)) + (is (= #{5.0} (set (:allowed miss)))) + (is (= 5.0 (:expected-score miss))))) + +(deftest zrange-completeness-still-detects-truly-missing-member + ;; Sanity: no uncertainty, member committed-present. Absence flagged. 
+ (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :ok :process 0 :f :zrange-all :value [] :index 3}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected missing-member, got: " result)) + (is (contains? kinds :missing-member) + (str "expected :missing-member, got kinds=" kinds)))) + +;; --------------------------------------------------------------------------- +;; Failed-concurrent mutations must not contribute to uncertainty +;; --------------------------------------------------------------------------- + +(deftest failed-concurrent-zrem-does-not-relax-must-be-present + ;; A concurrent ZREM that completes with :fail did NOT take + ;; effect. Its window must NOT make the member's presence uncertain, + ;; so a read that omits the member (which was ZADDed and committed + ;; beforehand) must be flagged as :missing-member. + (let [history [;; ZADD m1 committed before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZREM m1 is invoked concurrently with the read but + ;; ultimately :fails -- the op definitively did NOT run. + {:type :invoke :process 1 :f :zrem :value "m1" :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + ;; Read observes m1 ABSENT -- without the fix, the + ;; failed ZREM would admit this as "possibly removed". + {:type :ok :process 0 :f :zrange-all :value [] :index 4} + {:type :fail :process 1 :f :zrem :value "m1" :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected :missing-member despite failed ZREM, got: " result)) + (is (contains? 
kinds :missing-member) + (str "expected :missing-member, got kinds=" kinds)))) + +(deftest failed-concurrent-zadd-does-not-contribute-allowed-score + ;; A concurrent ZADD that completes with :fail did NOT take + ;; effect. Its score must NOT be added to the allowed set. A read + ;; observing that score must be flagged as :score-mismatch rather than + ;; being waved through by the failed ZADD's ghost contribution. + (let [history [;; ZADD m1 at score 1 committed before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; Concurrent ZADD m1 at score 42 ultimately :fails. + {:type :invoke :process 1 :f :zadd :value ["m1" 42] :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + ;; Read observes score 42 -- only valid if the failed + ;; ZADD is (incorrectly) admitted as a possible write. + {:type :ok :process 0 :f :zrange-all + :value [["m1" 42.0]] :index 4} + {:type :fail :process 1 :f :zadd :value ["m1" 42] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected :score-mismatch ignoring failed ZADD, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +;; --------------------------------------------------------------------------- +;; Chained committed ZINCRBYs: only the linearization-chain tail is a +;; valid final score. Earlier intermediate return values are stale. +;; --------------------------------------------------------------------------- + +(deftest chained-committed-zincrby-rejects-stale-intermediate + ;; Sequential committed ZINCRBYs form a forced linearization + ;; chain. The first ZINCRBY's return value is an intermediate that no + ;; post-chain read may observe. Expect :score-mismatch on the stale + ;; intermediate. + (let [history [;; Start with score 1. 
+ {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY +2 -> ok=3 (committed). + {:type :invoke :process 0 :f :zincrby :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 3.0] :index 3} + ;; ZINCRBY +3 -> ok=6 (committed). Strictly follows the + ;; previous ZINCRBY in real time (invoke 4 > complete 3). + {:type :invoke :process 0 :f :zincrby :value ["m1" 3] :index 4} + {:type :ok :process 0 :f :zincrby :value ["m1" 6.0] :index 5} + ;; Read AFTER the whole chain observes the stale + ;; intermediate 3.0 -- not admissible under any + ;; linearization. + {:type :invoke :process 1 :f :zrange-all :index 6} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 3.0]] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected stale-intermediate to be flagged, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest chained-committed-zincrby-accepts-latest + ;; Same history but the read observes the LATEST chain tail + ;; (6.0) -- accept as valid. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zincrby :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 3.0] :index 3} + {:type :invoke :process 0 :f :zincrby :value ["m1" 3] :index 4} + {:type :ok :process 0 :f :zincrby :value ["m1" 6.0] :index 5} + {:type :invoke :process 1 :f :zrange-all :index 6} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 6.0]] :index 7}] + result (run-checker history)] + (is (:valid? 
result) + (str "expected chain-tail score to be accepted, got: " result)))) + +(deftest committed-zincrby-return-values-constrain-final-score + ;; Two overlapping-in-real-time ZINCRBYs can both be candidates, but + ;; their returned scores still constrain the serialization. Here B + ;; returns 4.0 and A returns 6.0; A's result proves B was an + ;; intermediate prefix, so a post-completion read of 4.0 is stale while + ;; 6.0 is valid. + ;; Overlap: A=[inv=2, cmp=5], B=[inv=3, cmp=4]. + (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + ;; B completes first with ok=4 (delta applied to score 1). + {:type :ok :process 2 :f :zincrby :value ["m1" 4.0] :index 4} + ;; A completes with ok=6 (delta applied after B). + {:type :ok :process 1 :f :zincrby :value ["m1" 6.0] :index 5}] + read-a (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 4.0]] :index 7}) + read-b (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 6.0]] :index 7})] + (is (not (:valid? (run-checker read-a))) + "expected B's intermediate return value (4.0) to be rejected") + (is (:valid? (run-checker read-b)) + "expected A's final return value (6.0) to be accepted"))) + +(deftest zadd-resets-zincrby-chain + ;; A committed ZADD between ZINCRBYs resets the chain -- + ;; subsequent ZINCRBYs operate on the new ZADD'd value. The pre-reset + ;; ZINCRBY score is NOT a valid read after the chain completes. 
+ (let [base [;; ZADD m1 1 + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY +2 -> 3 + {:type :invoke :process 0 :f :zincrby :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 3.0] :index 3} + ;; ZADD m1 10 -- chain reset to absolute value. + {:type :invoke :process 0 :f :zadd :value ["m1" 10] :index 4} + {:type :ok :process 0 :f :zadd :value ["m1" 10] :index 5} + ;; ZINCRBY +1 -> 11 + {:type :invoke :process 0 :f :zincrby :value ["m1" 1] :index 6} + {:type :ok :process 0 :f :zincrby :value ["m1" 11.0] :index 7}] + read-ok (conj base + {:type :invoke :process 1 :f :zrange-all :index 8} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 11.0]] :index 9}) + read-bad (conj base + {:type :invoke :process 1 :f :zrange-all :index 8} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 3.0]] :index 9})] + (is (:valid? (run-checker read-ok)) + "expected post-reset chain tail (11.0) to be accepted") + (is (not (:valid? (run-checker read-bad))) + "expected pre-reset intermediate (3.0) to be flagged"))) + +(deftest overlapping-zadd-and-zincrby-respect-return-value-order + ;; ZINCRBY +1 returning 1.0 proves it ran from an absent/zero score. If + ;; an overlapping ZADD 10.0 also commits before the read, the only valid + ;; order is ZINCRBY then ZADD, so the final readable score is 10.0. + (let [base [{:type :invoke :process 0 :f :zincrby :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 10] :index 1} + {:type :ok :process 0 :f :zincrby :value ["m1" 1.0] :index 2} + {:type :ok :process 1 :f :zadd :value ["m1" 10] :index 3}] + read-zadd (conj base + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 10.0]] :index 5}) + read-incr (conj base + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 1.0]] :index 5})] + (is (:valid? 
(run-checker read-zadd)) + "expected ZADD's final score to be accepted") + (is (not (:valid? (run-checker read-incr))) + "expected ZINCRBY's pre-ZADD return to be rejected"))) + +(deftest zincrby-return-value-must-match-prior-state + ;; A non-concurrent ZINCRBY's ok reply must equal prior-score + delta. + ;; The checker must not trust an impossible reply as the member's state. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zincrby :value ["m1" 5] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 999.0] :index 3} + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 999.0]] :index 5}] + result (run-checker history)] + (is (not (:valid? result)) + (str "expected impossible ZINCRBY reply to be rejected, got: " result)))) + +(deftest impossible-mutation-chain-fails-even-when-read-is-empty + ;; Empty state sets from successful mutation replies are checker + ;; failures, not absent members. Otherwise a bad ZINCRBY reply can be + ;; dropped from the model and an empty read can falsely pass. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zincrby :value ["m1" 5] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 999.0] :index 3} + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all :value [] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected impossible chain to fail even on empty read, got: " + result)) + (is (contains? 
kinds :impossible-mutation-chain) + (str "expected :impossible-mutation-chain, got kinds=" kinds)))) + +(deftest negative-zincrby-tail-remains-admissible + ;; Negative deltas can make a later valid tail numerically lower than an + ;; earlier return. Pairwise score pruning must not discard that final tail. + (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" -2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" -3] :index 3} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 4} + {:type :ok :process 2 :f :zincrby :value ["m1" 0.0] :index 5}] + read-tail (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 0.0]] :index 7}) + read-prefix (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 3.0]] :index 7})] + (is (:valid? (run-checker read-tail)) + "expected negative-delta final tail to be accepted") + (is (not (:valid? (run-checker read-prefix))) + "expected intermediate negative-delta return to be rejected"))) + +(deftest pre-read-info-zincrby-does-not-admit-arbitrary-score + ;; A completed-before-read :info ZINCRBY may have happened or not, but + ;; its delta still bounds the possible scores when no later uncertainty + ;; exists. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :info :process 1 :f :zincrby :value ["m1" 5] :index 3} + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 42.0]] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? 
result)) + (str "expected arbitrary pre-read :info ZINCRBY score rejected, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +;; --------------------------------------------------------------------------- +;; ZINCRBY return values pin the linearization. Concurrent :ok ZINCRBYs +;; add only the scores reachable under a real-time-consistent order. +;; --------------------------------------------------------------------------- + +(deftest two-ok-concurrent-zincrbys-reject-impossible-score + ;; Two overlapping :ok ZINCRBYs with known return values + ;; (3 and 6) constrain the admissible post-chain read set to {1,3,6}. + ;; A read of 999 is impossible under any linearization; the checker + ;; must flag it as :score-mismatch. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; Two concurrent ZINCRBYs. Windows overlap the read. + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 4} + {:type :ok :process 2 :f :zincrby :value ["m1" 6.0] :index 5} + ;; Read observes an impossible score. + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 999.0]] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected impossible score to be flagged, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest two-ok-concurrent-zincrbys-reject-superseded-return + ;; Same concurrent :ok ZINCRBY history, but both ops complete before the + ;; read. The returned scores identify 3.0 as an intermediate prefix and + ;; 6.0 as the chain tail. 
+ (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 4} + {:type :ok :process 2 :f :zincrby :value ["m1" 6.0] :index 5}] + read-6 (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 6.0]] :index 7}) + read-3 (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 3.0]] :index 7})] + (is (:valid? (run-checker read-6)) + "expected 6.0 (one linearization) to be accepted") + (is (not (:valid? (run-checker read-3))) + "expected 3.0 intermediate return value to be rejected"))) + +(deftest overlapping-base-mutation-is-not-forced-before-tail-candidates + ;; The first ZINCRBY completes before the read and before the second + ;; increment starts, but it still overlaps the earlier ZADD. It must stay in + ;; the same real-time enumeration as that ZADD; forcing it into a fixed base + ;; prefix would apply it to the absent state and reject this valid history. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zincrby :value ["m1" 1] :index 1} + {:type :ok :process 1 :f :zincrby :value ["m1" 2.0] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 1] :index 3} + {:type :ok :process 2 :f :zincrby :value ["m1" 3.0] :index 4} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 5} + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 3.0]] :index 7}]] + (is (:valid? 
(run-checker history)) + "expected overlapping base mutation to remain reorderable with the tail"))) + +(deftest later-read-cannot-switch-overlapping-write-order + ;; Once all ambiguous writes completed before the first read, later reads + ;; cannot independently choose a different serialization unless another + ;; mutation could have taken effect between the reads. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 2] :index 1} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 2} + {:type :ok :process 1 :f :zadd :value ["m1" 2] :index 3} + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 2.0]] :index 5} + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 1.0]] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected read order switch to be rejected, got: " result)) + (is (contains? kinds :unstable-read-without-mutation) + (str "expected :unstable-read-without-mutation, got kinds=" kinds)))) + +(deftest zrange-all-uses-one-prefix-across-members + ;; Seeing m1's concurrent ZADD forces the read prefix past any successful + ;; mutation that completed before that ZADD was invoked. Omitting m2 would + ;; combine two different prefixes in one full snapshot. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m2" 2] :index 0} + {:type :invoke :process 1 :f :zrange-all :index 1} + {:type :ok :process 0 :f :zadd :value ["m2" 2] :index 2} + {:type :invoke :process 2 :f :zadd :value ["m1" 1] :index 3} + {:type :ok :process 2 :f :zadd :value ["m1" 1] :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0]] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? 
result)) + (str "expected fractured full read to be rejected, got: " result)) + (is (contains? kinds :fractured-read-prefix) + (str "expected :fractured-read-prefix, got kinds=" kinds)))) + +(deftest zrangebyscore-uses-one-prefix-across-members + ;; The same prefix rule applies to range reads when the omitted predecessor + ;; is forced to be present inside the requested score bounds. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m2" 2] :index 0} + {:type :invoke :process 1 :f :zrangebyscore + :value [0.0 10.0] :index 1} + {:type :ok :process 0 :f :zadd :value ["m2" 2] :index 2} + {:type :invoke :process 2 :f :zadd :value ["m1" 1] :index 3} + {:type :ok :process 2 :f :zadd :value ["m1" 1] :index 4} + {:type :ok :process 1 :f :zrangebyscore + :value {:bounds [0.0 10.0] + :members [["m1" 1.0]]} :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected fractured range read to be rejected, got: " result)) + (is (contains? kinds :fractured-read-prefix-range) + (str "expected :fractured-read-prefix-range, got kinds=" kinds)))) + +(deftest zrangebyscore-prefix-check-ignores-predecessor-outside-bounds + ;; A visible concurrent write still anchors the read prefix, but a + ;; predecessor whose forced state is outside the requested score bounds + ;; does not have to appear in the range result. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m2" -100] :index 0} + {:type :invoke :process 1 :f :zrangebyscore + :value [0.0 10.0] :index 1} + {:type :ok :process 0 :f :zadd :value ["m2" -100] :index 2} + {:type :invoke :process 2 :f :zadd :value ["m1" 1] :index 3} + {:type :ok :process 2 :f :zadd :value ["m1" 1] :index 4} + {:type :ok :process 1 :f :zrangebyscore + :value {:bounds [0.0 10.0] + :members [["m1" 1.0]]} :index 5}]] + (is (:valid? 
(run-checker history))))) + +(deftest zrem-omission-anchors-prefix-check + ;; If m1's absence is only explainable by a concurrent ZREM, that visible + ;; deletion still anchors the read prefix. A predecessor completed before + ;; the ZREM was invoked must be visible in the same full read. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zrange-all :index 2} + {:type :invoke :process 2 :f :zadd :value ["m2" 2] :index 3} + {:type :ok :process 2 :f :zadd :value ["m2" 2] :index 4} + {:type :invoke :process 3 :f :zrem :value "m1" :index 5} + {:type :ok :process 1 :f :zrange-all + :value [] :index 6} + {:type :ok :process 3 :f :zrem + :value ["m1" true] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected ZREM-anchored fractured read to be rejected, got: " result)) + (is (contains? kinds :fractured-read-prefix) + (str "expected :fractured-read-prefix, got kinds=" kinds)))) + +(deftest info-zincrby-visible-score-anchors-prefix-check + ;; A response-lost ZINCRBY has a known delta. If the observed score is + ;; reachable only by applying that :info op, it anchors the prefix just like + ;; a visible concurrent ZADD. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zrange-all :index 2} + {:type :invoke :process 2 :f :zadd :value ["m2" 2] :index 3} + {:type :ok :process 2 :f :zadd :value ["m2" 2] :index 4} + {:type :invoke :process 3 :f :zincrby :value ["m1" 5] :index 5} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 6.0]] :index 6} + {:type :info :process 3 :f :zincrby + :value ["m1" 5] :error "conn reset" :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? 
result)) + (str "expected :info ZINCRBY-anchored fractured read, got: " result)) + (is (contains? kinds :fractured-read-prefix) + (str "expected :fractured-read-prefix, got kinds=" kinds)))) + +(deftest forced-predecessor-state-is-validated-when-present + ;; Seeing m1's concurrent ZADD forces the read prefix past m2's later ZADD. + ;; Including m2 at its older score is still fractured; presence alone is not + ;; sufficient. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m2" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m2" 1] :index 1} + {:type :invoke :process 1 :f :zrange-all :index 2} + {:type :invoke :process 2 :f :zadd :value ["m2" 2] :index 3} + {:type :ok :process 2 :f :zadd :value ["m2" 2] :index 4} + {:type :invoke :process 3 :f :zadd :value ["m1" 1] :index 5} + {:type :ok :process 3 :f :zadd :value ["m1" 1] :index 6} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0] ["m2" 1.0]] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected stale forced predecessor to be rejected, got: " result)) + (is (contains? kinds :fractured-read-prefix) + (str "expected :fractured-read-prefix, got kinds=" kinds)))) + +(deftest info-plus-ok-zincrby-stays-bounded + ;; A :info ZINCRBY still has a known delta. Once the surrounding state is + ;; known, it admits the pre-info state or the delta-applied state, not an + ;; arbitrary numeric score. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; One :info ZINCRBY (unknown outcome). + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + ;; One :ok ZINCRBY with known return value. 
+ {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :ok :process 2 :f :zincrby :value ["m1" 4.0] :index 4} + {:type :info :process 1 :f :zincrby :value ["m1" 2] + :error "conn reset" :index 5} + ;; Read observes an arbitrary score: invalid. The possible + ;; scores are 4.0 (only the :ok ZINCRBY) or 6.0 (both). + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 42.0]] :index 7}]] + (is (not (:valid? (run-checker history))) + "expected arbitrary score rejected when :info ZINCRBY is bounded"))) + +;; --------------------------------------------------------------------------- +;; Infinity score parsing +;; --------------------------------------------------------------------------- + +;; --------------------------------------------------------------------------- +;; Client setup! / invoke! robustness +;; --------------------------------------------------------------------------- + +(deftest setup-bang-hard-fails-when-conn-spec-missing + ;; If open! failed to populate :conn-spec (unresolvable + ;; host, etc.), setup! MUST throw rather than silently proceed. + ;; Continuing with a no-op setup would leave stale data from a prior + ;; run under zset-key and risk false-positive checker verdicts from + ;; that dirty state. We want Jepsen to surface the failure. + (let [client (workload/->ElastickvRedisZSetSafetyClient {} nil)] + (is (thrown-with-msg? clojure.lang.ExceptionInfo + #":conn-spec is missing" + (client/setup! client {})) + "setup! must throw ex-info when :conn-spec is nil"))) + +(deftest setup-bang-hard-fails-when-cleanup-del-errors + ;; Even when :conn-spec is populated, if the actual + ;; cleanup (DEL zset-key) fails or errors, setup! must NOT silently + ;; proceed. Stale data surviving from a prior run under zset-key + ;; would cause false-positive safety verdicts. Propagate the + ;; exception so Jepsen aborts the run. 
+ (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "127.0.0.1" + :port 1 ; guaranteed unreachable + :timeout-ms 100}})] + (is (thrown? Throwable + (client/setup! client {})) + "setup! must propagate cleanup failures, not swallow them"))) + +(deftest zincrby-invoke-handles-nil-response + ;; If car/wcar for ZINCRBY returns nil (error reply + ;; coerced, unexpected protocol edge), the op must complete as :info + ;; with a structured :error, not throw NumberFormatException from + ;; parse-double-safe swallowing (str nil) -> "nil". + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] nil)] + (let [result (client/invoke! client {} op)] + (is (= :info (:type result)) + (str "expected :info on nil ZINCRBY reply, got: " result)) + (is (some? (:error result)) + (str "expected :error to be populated, got: " result)))))) + +(deftest zincrby-invoke-handles-unexpected-response + ;; Same guard, but for a non-string / non-number reply. + ;; Must complete :info rather than propagate a parse failure. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] {:unexpected :map})] + (let [result (client/invoke! client {} op)] + (is (= :info (:type result)) + (str "expected :info on unexpected ZINCRBY reply, got: " result)))))) + +(deftest zincrby-invoke-accepts-numeric-response + ;; Sanity: some Carmine versions coerce integer scores to longs. + ;; Must parse cleanly to a Double and complete :ok. 
+ (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] 7)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on numeric reply, got: " result)) + (is (= ["m1" 7.0] (:value result))))))) + +(deftest zincrby-invoke-accepts-byte-array-response + ;; Carmine may surface Redis bulk-string scores as raw bytes depending + ;; on protocol/config. The client must parse them as UTF-8, not via + ;; `(str bytes)`, which yields "[B@...". + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] (.getBytes "7.5" "UTF-8"))] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on byte-array reply, got: " result)) + (is (= ["m1" 7.5] (:value result))))))) + +;; --------------------------------------------------------------------------- +;; Vacuous-pass guard +;; --------------------------------------------------------------------------- + +(deftest empty-history-is-unknown-not-valid + ;; An empty history (e.g. Redis unreachable, all ops + ;; downgraded to :info) produces zero successful reads. The checker + ;; MUST NOT return :valid? true in that case -- that would be a + ;; false-green. Expect :valid? :unknown plus a diagnostic :reason. + (let [result (run-checker [])] + (is (= :unknown (:valid? result)) + (str "expected :unknown on empty history, got: " result)) + (is (string? (:reason result)) + (str "expected :reason to be populated, got: " result)) + (is (zero? 
(:reads result))))) + +(deftest all-info-history-is-unknown-not-valid + ;; A run where every operation was downgraded to :info + ;; (Redis unreachable / every read timed out) still has read-pairs + ;; filtered down to zero :ok reads. Must surface as :valid? :unknown. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :info :process 0 :f :zadd :value ["m1" 1] :index 1 + :error "conn refused"} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :info :process 0 :f :zrange-all :index 3 + :error "conn refused"}] + result (run-checker history)] + (is (= :unknown (:valid? result)) + (str "expected :unknown when all ops are :info, got: " result)) + (is (string? (:reason result))))) + +(deftest one-successful-read-is-enough-to-validate + ;; Sanity: the vacuous-pass guard must only kick in when there are + ;; ZERO successful reads. A single :ok read with no errors is a + ;; legitimate :valid? true. + (let [history [{:type :invoke :process 0 :f :zrange-all :index 0} + {:type :ok :process 0 :f :zrange-all :value [] :index 1}] + result (run-checker history)] + (is (true? (:valid? result)) + (str "expected :valid? true with one :ok read, got: " result)))) + +(deftest malformed-zrange-read-is-checker-failure + ;; A read whose Redis command returned successfully but produced a + ;; malformed WITHSCORES payload is safety evidence, not a timeout. The + ;; checker must fail it instead of filtering it out as non-:ok. + (let [history [{:type :invoke :process 0 :f :zrange-all :index 0} + {:type :ok :process 0 :f :zrange-all + :value {:malformed? true + :error "WITHSCORES reply has odd element count" + :payload ["m1" "1" "dangling"]} + :index 1}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected malformed ZRANGE read to fail, got: " result)) + (is (contains? 
kinds :malformed-read) + (str "expected :malformed-read, got kinds=" kinds)))) + +(deftest malformed-zrangebyscore-read-is-checker-failure + (let [history [{:type :invoke :process 0 :f :zrangebyscore + :value [0.0 10.0] :index 0} + {:type :ok :process 0 :f :zrangebyscore + :value {:bounds [0.0 10.0] + :malformed? true + :error "WITHSCORES reply has odd element count" + :payload ["m1" "1" "dangling"]} + :index 1}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected malformed ZRANGEBYSCORE read to fail, got: " result)) + (is (contains? kinds :malformed-read-range) + (str "expected :malformed-read-range, got kinds=" kinds)))) + +(deftest zrem-invoke-handles-nil-response + ;; If car/wcar for ZREM returns nil (protocol edge, + ;; closed connection, etc.), `(long nil)` would throw NPE and the + ;; op would be logged as a generic failure via the general Exception + ;; handler. Guard with `(or removed 0)` so the op resolves cleanly + ;; as :ok [member false]. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "ghost" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] nil)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on nil ZREM reply, got: " result)) + (is (= ["ghost" false] (:value result)) + (str "expected removed? false on nil reply, got: " result)))))) + +(deftest zrem-invoke-handles-numeric-response + ;; Sanity: ZREM's normal reply is an integer count. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "m1" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] 1)] + (let [result (client/invoke! 
client {} op)] + (is (= :ok (:type result))) + (is (= ["m1" true] (:value result))))))) + +(deftest zrem-invoke-handles-string-response + ;; Some Carmine versions / RESP3 codepaths surface ZREM's count as a + ;; numeric string rather than a Long. `(long \"1\")` would throw + ;; ClassCastException; the coerce-zrem-count helper must parse the + ;; string and the op must still resolve as :ok with removed? true. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "m1" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] "1")] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on string ZREM reply, got: " result)) + (is (= ["m1" true] (:value result)) + (str "expected removed? true on string \"1\", got: " result)))))) + +(deftest zrem-invoke-handles-string-zero-response + ;; String "0" must be parsed as removed? false (not truthy because it + ;; is a non-empty string). + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "ghost" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] "0")] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result))) + (is (= ["ghost" false] (:value result))))))) + +(deftest zrem-invoke-handles-bytes-response + ;; Raw-bytes numeric reply (RESP binary-safe path) must be decoded as + ;; UTF-8 and parsed. "1" => removed? true. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "m1" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] (.getBytes "1" "UTF-8"))] + (let [result (client/invoke! 
client {} op)] + (is (= :ok (:type result))) + (is (= ["m1" true] (:value result))))))) + +(deftest zrem-invoke-handles-unparseable-response + ;; Totally unexpected reply shape: treat as 0 (nothing removed) rather + ;; than throw. Keeps the op :ok and records removed? false. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "ghost" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] :weird)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result))) + (is (= ["ghost" false] (:value result))))))) + +(deftest parse-withscores-handles-inf-strings + ;; Redis returns "inf" / "+inf" / "-inf" for infinite + ;; ZSET scores. Double/parseDouble expects "Infinity"; the workload's + ;; parser must normalize both encodings instead of throwing. + (let [flat ["m-pos" "inf" + "m-pos2" "+inf" + "m-neg" "-inf" + "m-jvm" "Infinity" + "m-num" "3.5"] + parsed (#'workload/parse-withscores flat)] + (is (= [["m-pos" Double/POSITIVE_INFINITY] + ["m-pos2" Double/POSITIVE_INFINITY] + ["m-neg" Double/NEGATIVE_INFINITY] + ["m-jvm" Double/POSITIVE_INFINITY] + ["m-num" 3.5]] + parsed)))) + +(deftest parse-withscores-rejects-nil-payload + ;; `count` on nil is 0 in Clojure; nil must still be treated as a + ;; malformed successful Redis reply, not as an empty ZSET result. + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"WITHSCORES reply is nil" + (#'workload/parse-withscores nil)))) + +(deftest parse-withscores-rejects-odd-length-payload + ;; A WITHSCORES reply with a dangling member (odd element count) is a + ;; protocol violation. The checker must surface it rather than let + ;; `(partition 2)` silently drop evidence of the anomaly. + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"odd element count" + (#'workload/parse-withscores ["m1" "1.0" "m2-dangling"]))))