Skip to content

Commit c20dad2

Browse files
authored
chore(jobsdb): disable no results cache state filter optimization (#6964)
🔒 Scanned for secrets using gitleaks 8.30.1

# Description

Makes the per-state `noResultsCache` optimization in `jobsdb` configurable, and **disables it by default**.

The optimization has two parts in `getJobsDS`:

1. **Read-side pruning** — `stateFilters` are narrowed against the cache before querying, skipping states the cache reported as empty for the current `(ds, partitions, workspace, customVals, params)` key.
2. **Write-side commit predicate** — when a queried state is absent from the result set and no limit was reached, a "no jobs of state X" entry is committed to the cache.

Both are now gated behind a single reloadable flag `noResultsCacheStateOptimization`:

- `JobsDB.<tablePrefix>.noResultsCacheStateOptimization`
- `JobsDB.noResultsCacheStateOptimization`

Default is `false` (optimization disabled).

Also fixes a variable-shadowing bug in `getJobsDS` where the outer `payloadSize` was never updated due to an inner shadow, making the post-loop `payloadSize >= PayloadSizeLimit` check dead code. As a result, `LimitsReached` was wrongly reported as `false` when the payload limit was reached at the boundary (exact total) or when a single oversize job exhausted the budget alone. This is one of the concrete mechanisms that can poison the `noResultsCache` — see the reasoning below.

## Reasoning

We suspect the per-state optimization is the root cause of `noResultsCache` being populated with stale entries, which in turn leads to **out-of-order processing**: a state that actually has jobs is wrongly cached as empty, so subsequent reads skip those jobs from one dataset and process later jobs from another dataset first.

The write-side commit predicate `(!ok && !limitsReached)` assumes that when a state is missing from the result set and limits were not reached, we have exhaustively scanned the dataset for that state. This invariant can be broken by silent row-dropping filters that are not part of the cache key, or incorrect calculation of when limits are reached, and the resulting cache entry then poisons later reads.

The `payloadSize` shadowing bug fixed in this PR is a concrete instance of the second class: at the payload-budget boundary `limitsReached` was wrongly returned as `false`, so a state missing from the result set (e.g. because the budget was exhausted before any state-X row appeared) would satisfy `(!ok && !limitsReached)` and get committed to the cache as "no jobs for this state" — a stale entry that subsequent reads would honour, skipping real jobs and triggering out-of-order processing.

Disabling the per-state optimization by default keeps only the coarser `len(jobList) == 0` commit path, which can only record "no rows at all" results and is therefore not vulnerable to per-state misattribution. The optimization can be re-enabled per-deployment via the reloadable flag while we investigate and harden the invariants.

## Linear Ticket

resolves PIPE-2994

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 8395174 commit c20dad2

3 files changed

Lines changed: 162 additions & 37 deletions

File tree

cluster/migrator/partitionmigration/server/grpc_server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package server
33

44
import (
5+
"errors"
56
"fmt"
67
"net"
78
"sync"
@@ -42,7 +43,7 @@ func (s *GRPCServer) Start() error {
4243
return fmt.Errorf("failed to listen: %w", err)
4344
}
4445
s.wg.Go(func() {
45-
if err := s.server.Serve(lis); err != nil {
46+
if err := s.server.Serve(lis); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
4647
// This shouldn't really happen, only in very exceptional cases.
4748
// No error is returned during GracefulStop or Stop.
4849
panic(fmt.Errorf("failed to serve grpc server: %w", err))

jobsdb/jobsdb.go

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -588,33 +588,34 @@ type Handle struct {
588588

589589
config *config.Config
590590
conf struct {
591-
payloadColumnType payloadColumnType
592-
maxTableSize config.ValueLoader[int64]
593-
cacheExpiration config.ValueLoader[time.Duration]
594-
addNewDSLoopSleepDuration config.ValueLoader[time.Duration]
595-
addNewDSTimeout config.ValueLoader[time.Duration]
596-
refreshDSListLoopSleepDuration config.ValueLoader[time.Duration]
597-
refreshDSTimeout config.ValueLoader[time.Duration]
598-
minDSRetentionPeriod config.ValueLoader[time.Duration]
599-
maxDSRetentionPeriod config.ValueLoader[time.Duration]
600-
jobMaxAge config.ValueLoader[time.Duration]
601-
writeCapacity chan struct{}
602-
readCapacity chan struct{}
603-
enableWriterQueue bool
604-
enableReaderQueue bool
605-
clearAll bool
606-
skipMaintenanceError bool
607-
dsLimit config.ValueLoader[int]
608-
maxReaders int
609-
maxWriters int
610-
maxOpenConnections int
611-
analyzeThreshold config.ValueLoader[int]
612-
MaxDSSize config.ValueLoader[int]
613-
numPartitions int // if zero or negative, no partitioning
614-
partitionFunction func(job *JobT) string
615-
warnOnStatusMissingPartitionID config.ValueLoader[bool]
616-
holdDSListLockDuringStore config.ValueLoader[bool] // escape hatch: hold the dsList read lock for the entire store callback
617-
dbTablesVersion int // version of the database tables schema (0 means latest)
591+
payloadColumnType payloadColumnType
592+
maxTableSize config.ValueLoader[int64]
593+
cacheExpiration config.ValueLoader[time.Duration]
594+
addNewDSLoopSleepDuration config.ValueLoader[time.Duration]
595+
addNewDSTimeout config.ValueLoader[time.Duration]
596+
refreshDSListLoopSleepDuration config.ValueLoader[time.Duration]
597+
refreshDSTimeout config.ValueLoader[time.Duration]
598+
minDSRetentionPeriod config.ValueLoader[time.Duration]
599+
maxDSRetentionPeriod config.ValueLoader[time.Duration]
600+
jobMaxAge config.ValueLoader[time.Duration]
601+
writeCapacity chan struct{}
602+
readCapacity chan struct{}
603+
enableWriterQueue bool
604+
enableReaderQueue bool
605+
clearAll bool
606+
skipMaintenanceError bool
607+
dsLimit config.ValueLoader[int]
608+
maxReaders int
609+
maxWriters int
610+
maxOpenConnections int
611+
analyzeThreshold config.ValueLoader[int]
612+
MaxDSSize config.ValueLoader[int]
613+
numPartitions int // if zero or negative, no partitioning
614+
partitionFunction func(job *JobT) string
615+
warnOnStatusMissingPartitionID config.ValueLoader[bool]
616+
holdDSListLockDuringStore config.ValueLoader[bool] // escape hatch: hold the dsList read lock for the entire store callback
617+
noResultsCacheStateOptimization config.ValueLoader[bool]
618+
dbTablesVersion int // version of the database tables schema (0 means latest)
618619

619620
migration struct {
620621
maxMigrateOnce, maxMigrateDSProbe config.ValueLoader[int]
@@ -1107,6 +1108,10 @@ func (jd *Handle) loadConfig() {
11071108
// so long-running stores don't block dsList writers. Flip to true to revert to holding the lock for the whole callback.
11081109
jd.conf.holdDSListLockDuringStore = jd.config.GetReloadableBoolVar(false, jd.configKeys("holdDSListLockDuringStore")...)
11091110

1111+
// when true, the per-state noResultsCache optimization is enabled: stateFilters are narrowed
1112+
// against the cache before querying, and (!ok && !limitsReached) is used as a commit predicate.
1113+
jd.conf.noResultsCacheStateOptimization = jd.config.GetReloadableBoolVar(false, jd.configKeys("noResultsCacheStateOptimization")...)
1114+
11101115
if jd.TriggerAddNewDS == nil {
11111116
jd.TriggerAddNewDS = func() <-chan time.Time {
11121117
return time.After(jd.conf.addNewDSLoopSleepDuration.Load())
@@ -2254,10 +2259,13 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
22542259
CustomValFilters: params.CustomValFilters,
22552260
WorkspaceID: workspaceID,
22562261
}
2262+
stateFilterOptimization := jd.conf.noResultsCacheStateOptimization.Load()
22572263

2258-
stateFilters = lo.Filter(stateFilters, func(state string, _ int) bool { // exclude states for which we already know that there are no jobs
2259-
return !jd.noResultsCache.Get(ds.Index, partitionFilters, workspaceID, customValFilters, []string{state}, parameterFilters)
2260-
})
2264+
if stateFilterOptimization {
2265+
stateFilters = lo.Filter(stateFilters, func(state string, _ int) bool { // exclude states for which we already know that there are no jobs
2266+
return !jd.noResultsCache.Get(ds.Index, partitionFilters, workspaceID, customValFilters, []string{state}, parameterFilters)
2267+
})
2268+
}
22612269

22622270
defer jd.getTimerStat("jobsdb_get_jobs_ds_time", &tags).RecordDuration()()
22632271

@@ -2390,17 +2398,22 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
23902398
}
23912399
defer func() { _ = rows.Close() }()
23922400

2393-
var runningEventCount int
2394-
var runningPayloadSize int64
2395-
23962401
var jobList []*JobT
23972402
var limitsReached bool
23982403
var eventCount int
23992404
var payloadSize int64
2405+
2406+
// we don't need the payload_size but still need to scan it because it is part of the resultset
2407+
// The query uses it for limits checking. The variable is declared before the for loop to avoid extra allocations,
2408+
// but if we were to actually use it in the future for returning in the result, we would need to move its declaration
2409+
// inside the loop
2410+
var discardRowPayloadSize int64
2411+
24002412
resultsetStates := map[string]struct{}{}
24012413
for rows.Next() {
2402-
var payloadSize int64
24032414
var job JobT
2415+
var runningEventCount int
2416+
var runningPayloadSize int64
24042417
var payload []byte
24052418
var jsState sql.NullString
24062419
var jsAttemptNum sql.NullInt64
@@ -2410,7 +2423,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
24102423
var jsErrorResponse []byte
24112424
var jsParameters []byte
24122425
err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
2413-
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PartitionID, &payloadSize, &runningEventCount, &runningPayloadSize,
2426+
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PartitionID, &discardRowPayloadSize, &runningEventCount, &runningPayloadSize,
24142427
&jsState, &jsAttemptNum,
24152428
&jsExecTime, &jsRetryTime,
24162429
&jsErrorCode, &jsErrorResponse, &jsParameters)
@@ -2466,7 +2479,9 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
24662479
// we are committing the cache Tx only if
24672480
// (a) no jobs are returned by the query or
24682481
// (b) the state is not present in the resultset and limits have not been reached
2469-
if _, ok := resultsetStates[state]; len(jobList) == 0 || (!ok && !limitsReached) {
2482+
// (skipped when the noResultsCache state-filter optimization is disabled)
2483+
_, ok := resultsetStates[state]
2484+
if len(jobList) == 0 || (stateFilterOptimization && !ok && !limitsReached) {
24702485
if allEntriesCommitted := cacheTx.Commit(); !allEntriesCommitted {
24712486
tags := &statTags{
24722487
StateFilters: []string{state},

jobsdb/jobsdb_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,6 +1639,115 @@ func TestPayloadSizeColumnQueries(t *testing.T) {
16391639
jobsDB.TearDown()
16401640
}
16411641

1642+
// TestGetJobsLimitsReached verifies the LimitsReached flag and the number of jobs
1643+
// returned by GetToProcess for every combination of JobsLimit / EventsLimit /
1644+
// PayloadSizeLimit, including exact-boundary cases and single-oversize cases.
1645+
// It runs against both ASCII and UTF-8 multibyte payloads to validate that
1646+
// PayloadSizeLimit is measured in bytes (octet_length) and not characters.
1647+
func TestGetJobsLimitsReached(t *testing.T) {
1648+
pgContainer := startPostgres(t)
1649+
1650+
const (
1651+
payloadBytes = 100
1652+
eventsPerJob = 2
1653+
numJobs = 10
1654+
totalEvents = numJobs * eventsPerJob
1655+
totalPayload = int64(numJobs * payloadBytes)
1656+
singlePayload = int64(payloadBytes)
1657+
)
1658+
1659+
// Each payload is exactly payloadBytes bytes long.
1660+
// "🙂" is U+1F642, 4 bytes in UTF-8 → 25 × 4 = 100 bytes.
1661+
payloadVariants := []struct {
1662+
name string
1663+
data []byte
1664+
}{
1665+
{"ascii", []byte(strings.Repeat("x", payloadBytes))},
1666+
{"utf8_multibyte", []byte(strings.Repeat("🙂", payloadBytes/4))},
1667+
}
1668+
1669+
for _, pv := range payloadVariants {
1670+
t.Run(pv.name, func(t *testing.T) {
1671+
require.Len(t, pv.data, payloadBytes, "payload variant must be exactly %d bytes", payloadBytes)
1672+
1673+
customVal := strings.ToUpper(rsRand.String(8))
1674+
workspaceID := "workspaceID"
1675+
tablePrefix := strings.ToLower(rsRand.String(5))
1676+
1677+
t.Setenv("RSERVER_JOBS_DB_PAYLOAD_COLUMN_TYPE", "text")
1678+
jobsDB := &Handle{dbHandle: pgContainer.DB}
1679+
require.NoError(t, jobsDB.Setup(ReadWrite, true, tablePrefix))
1680+
t.Cleanup(jobsDB.TearDown)
1681+
1682+
jobs := make([]*JobT, numJobs)
1683+
for i := range numJobs {
1684+
jobs[i] = &JobT{
1685+
Parameters: []byte(`{}`),
1686+
EventPayload: pv.data,
1687+
UserID: "u",
1688+
UUID: uuid.New(),
1689+
CustomVal: customVal,
1690+
EventCount: eventsPerJob,
1691+
WorkspaceId: workspaceID,
1692+
}
1693+
}
1694+
require.NoError(t, jobsDB.Store(context.Background(), jobs))
1695+
1696+
testCases := []struct {
1697+
name string
1698+
jobsLimit int
1699+
eventsLimit int
1700+
payloadSizeLimit int64
1701+
expectedJobs int
1702+
expectedLimits bool
1703+
}{
1704+
// --- JobsLimit only ---
1705+
{"JobsLimit_under", 5, 0, 0, 5, true},
1706+
{"JobsLimit_exact_boundary", numJobs, 0, 0, numJobs, true},
1707+
{"JobsLimit_over", numJobs * 2, 0, 0, numJobs, false},
1708+
1709+
// --- EventsLimit ---
1710+
// running_events overflows after the 2nd job (running=6 > 4)
1711+
{"EventsLimit_under", 100, 4, 0, 2, true},
1712+
// running_events overflows after the 1st job (running=4 > 2)
1713+
{"EventsLimit_exact_one_job", 100, eventsPerJob, 0, 1, true},
1714+
// boundary: total events == limit, no in-loop overflow,
1715+
// post-loop check sets limitsReached because eventCount >= limit
1716+
{"EventsLimit_exact_total_boundary", 100, totalEvents, 0, numJobs, true},
1717+
{"EventsLimit_over", 100, totalEvents * 10, 0, numJobs, false},
1718+
// limit < any single job's events: must still return one oversize job
1719+
{"EventsLimit_smaller_than_single_job", 100, 1, 0, 1, true},
1720+
1721+
// --- PayloadSizeLimit ---
1722+
{"PayloadSizeLimit_under", 100, 0, singlePayload * 5, 5, true},
1723+
{"PayloadSizeLimit_exact_one_job", 100, 0, singlePayload, 1, true},
1724+
{"PayloadSizeLimit_exact_total_boundary", 100, 0, totalPayload, numJobs, true},
1725+
{"PayloadSizeLimit_over", 100, 0, totalPayload * 2, numJobs, false},
1726+
// limit < any single job's payload: must still return one oversize job
1727+
{"PayloadSizeLimit_smaller_than_single_job", 100, 0, singlePayload / 2, 1, true},
1728+
1729+
// --- Combined ---
1730+
{"Combined_events_binds_first", 100, 4, totalPayload, 2, true},
1731+
{"Combined_payload_binds_first", 100, 100, singlePayload * 3, 3, true},
1732+
}
1733+
1734+
for _, tc := range testCases {
1735+
t.Run(tc.name, func(t *testing.T) {
1736+
res, err := jobsDB.GetToProcess(context.Background(), GetQueryParams{
1737+
CustomValFilters: []string{customVal},
1738+
JobsLimit: tc.jobsLimit,
1739+
EventsLimit: tc.eventsLimit,
1740+
PayloadSizeLimit: tc.payloadSizeLimit,
1741+
}, nil)
1742+
require.NoError(t, err)
1743+
require.Len(t, res.Jobs, tc.expectedJobs, "unexpected number of jobs")
1744+
require.Equal(t, tc.expectedLimits, res.LimitsReached, "unexpected LimitsReached")
1745+
})
1746+
}
1747+
})
1748+
}
1749+
}
1750+
16421751
func TestUpdateJobStatus(t *testing.T) {
16431752
_ = startPostgres(t)
16441753
c := config.New()

0 commit comments

Comments
 (0)