Skip to content

Commit 398f902

Browse files
fix: continuous backend config cache cause postgres disk bloat (#6983)
# Description The backend config cache persists the workspace config to postgres on every config update. On every update the row was rewritten regardless of whether the config value had actually changed or not. In postgres an `UPDATE` always writes a new tuple version — even when the new values equal the old ones — leaving the previous one dead; and since the large `config` value lives in TOAST, each rewrite churns the TOAST table too. This becomes pathological when: - **There are a lot of continuous configuration changes**: every change is another full-row rewrite. - **Multiple pods try to cache the same config**: each pod independently writes the same config to the same row, so a single logical change becomes one write per pod, most of them re-writing a value identical to what another pod just stored. Dead tuples accumulated faster than autovacuum reclaimed them, steadily bloating postgres disk usage. ### Changes - **Conditional upsert via content hash** — a new `config_hash` column stores a SHA-256 hex digest of the marshalled config. The upsert now carries `WHERE config_cache.config_hash IS DISTINCT FROM excluded.config_hash`, so when the stored config is already identical the row is not rewritten and `updated_at` is not bumped. This de-duplicates both no-op updates and redundant cross-pod writes. - **Skip redundant work** — `set` does a cheap `SELECT config_hash` first and returns early on a match, so a pod that finds the config already cached skips encrypting and transferring the large config blob entirely. - **Debounce writes** — a burst of config updates is coalesced into a single write per pod, controlled by `BackendConfig.dbCacheWriteDebounce` (default `10s`). - **Aggressive TOAST autovacuum** — the migration tunes `config_cache` so dead tuples from any remaining rewrites are reclaimed promptly (`toast.autovacuum_vacuum_scale_factor = 0`, `toast.autovacuum_vacuum_threshold = 1`). ## Linear Ticket resolves PIPE-3018 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used. Co-authored-by: Aris Tzoumas <atzoumas@rudderstack.com>
1 parent b211809 commit 398f902

4 files changed

Lines changed: 144 additions & 7 deletions

File tree

backend-config/backend_config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ func TestWaitForConfig(t *testing.T) {
454454
func TestCache(t *testing.T) {
455455
initBackendConfig()
456456
t.Setenv("RSERVER_BACKEND_CONFIG_POLL_INTERVAL", "10ms")
457+
t.Setenv("RSERVER_BACKEND_CONFIG_DB_CACHE_WRITE_DEBOUNCE", "10ms")
457458
var calls int32
458459
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
459460
defer atomic.AddInt32(&calls, 1)

backend-config/internal/cache/cache.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@ import (
66
"crypto/aes"
77
"crypto/cipher"
88
"crypto/rand"
9+
"crypto/sha256"
910
"database/sql"
11+
"encoding/hex"
1012
"fmt"
13+
"sync"
14+
"time"
15+
16+
"github.com/samber/lo"
1117

1218
"github.com/rudderlabs/rudder-go-kit/config"
1319
"github.com/rudderlabs/rudder-go-kit/jsonrs"
@@ -69,15 +75,37 @@ func Start(ctx context.Context, secret [32]byte, key string, channelProvider fun
6975
return nil, err
7076
}
7177

78+
// writeDebounce coalesces a burst of config updates into a single write
79+
writeDebounce := config.GetDurationVar(10, time.Second, "BackendConfig.dbCacheWriteDebounce")
7280
go func() {
73-
// subscribe to config and write to db
74-
for config := range channelProvider() {
75-
// persist to database
76-
err = dbStore.set(ctx, config.Data)
77-
if err != nil {
81+
var (
82+
latestConfig any
83+
gotConfig bool
84+
configMu sync.Mutex
85+
)
86+
// persist writes the most recently received config to the database
87+
persist := func() {
88+
configMu.Lock()
89+
data, ok := latestConfig, gotConfig
90+
configMu.Unlock()
91+
if !ok {
92+
return
93+
}
94+
if err := dbStore.set(ctx, data); err != nil {
7895
pkgLogger.Errorn("failed writing config to database", obskit.Error(err))
7996
}
8097
}
98+
debouncedPersist, cancelDebounce := lo.NewDebounce(writeDebounce, persist)
99+
// subscribe to config and debounce writes to db
100+
for update := range channelProvider() {
101+
configMu.Lock()
102+
latestConfig, gotConfig = update.Data, true
103+
configMu.Unlock()
104+
debouncedPersist()
105+
}
106+
// channel closed: flush the latest config before shutting down
107+
cancelDebounce()
108+
persist()
81109
dbStore.Close()
82110
}()
83111
return &dbStore, nil
@@ -89,6 +117,28 @@ func (db *cacheStore) set(ctx context.Context, config any) error {
89117
if err != nil {
90118
return fmt.Errorf("failed to marshal config: %w", err)
91119
}
120+
configSum := sha256.Sum256(configBytes)
121+
configHash := hex.EncodeToString(configSum[:])
122+
123+
// Skip the write if the stored config is already identical. This avoids
124+
// encrypting and transferring the (potentially large) config blob to the
125+
// database when nothing has changed.
126+
var storedHash sql.NullString
127+
err = db.QueryRowContext(
128+
ctx,
129+
`SELECT config_hash FROM config_cache WHERE key = $1`,
130+
db.key,
131+
).Scan(&storedHash)
132+
switch err {
133+
case nil:
134+
if storedHash.Valid && storedHash.String == configHash {
135+
return nil
136+
}
137+
case sql.ErrNoRows:
138+
default:
139+
return err
140+
}
141+
92142
// encrypt
93143
encrypted, err := db.encryptAES(configBytes)
94144
if err != nil {
@@ -97,13 +147,16 @@ func (db *cacheStore) set(ctx context.Context, config any) error {
97147
// write to config table
98148
_, err = db.ExecContext(
99149
ctx,
100-
`INSERT INTO config_cache (key, config) VALUES ($1, $2)
150+
`INSERT INTO config_cache (key, config, config_hash) VALUES ($1, $2, $3)
101151
on conflict (key)
102152
do update set
103153
config = $2,
104-
updated_at = NOW()`,
154+
config_hash = $3,
155+
updated_at = NOW()
156+
where config_cache.config_hash is distinct from excluded.config_hash`,
105157
db.key,
106158
encrypted,
159+
configHash,
107160
)
108161
return err
109162
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"testing"
8+
"time"
9+
10+
"github.com/ory/dockertest/v3"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/rudderlabs/rudder-go-kit/jsonrs"
14+
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
15+
)
16+
17+
// TestCacheStoreSet exercises the upsert: it asserts that writing the
18+
// same config twice does not rewrite the row (updated_at stays put), while
19+
// writing a different config does.
20+
func TestCacheStoreSet(t *testing.T) {
21+
pool, err := dockertest.NewPool("")
22+
require.NoError(t, err)
23+
resourcePostgres, err := postgres.Setup(pool, t)
24+
require.NoError(t, err)
25+
26+
db := resourcePostgres.DB
27+
require.NoError(t, migrate(db))
28+
29+
store := &cacheStore{
30+
DB: db,
31+
secret: sha256.Sum256([]byte("secret")),
32+
key: "test-workspace",
33+
}
34+
ctx := context.Background()
35+
36+
// expectedHash mirrors the production hashing so the test verifies the
37+
// stored value independently.
38+
expectedHash := func(config any) string {
39+
b, err := jsonrs.Marshal(config)
40+
require.NoError(t, err)
41+
sum := sha256.Sum256(b)
42+
return hex.EncodeToString(sum[:])
43+
}
44+
45+
readRow := func() (updatedAt time.Time, hash string, config []byte) {
46+
t.Helper()
47+
require.NoError(t, db.QueryRowContext(ctx,
48+
`SELECT updated_at, config_hash, config FROM config_cache WHERE key = $1`,
49+
store.key,
50+
).Scan(&updatedAt, &hash, &config))
51+
return updatedAt, hash, config
52+
}
53+
54+
configA := map[string]any{"workspace": "A", "sources": []string{"s1", "s2"}}
55+
configB := map[string]any{"workspace": "B", "sources": []string{"s3"}}
56+
57+
// first write inserts the row
58+
require.NoError(t, store.set(ctx, configA))
59+
firstUpdatedAt, firstHash, firstConfig := readRow()
60+
require.Equal(t, expectedHash(configA), firstHash)
61+
62+
// writing an identical config must not rewrite the row
63+
require.NoError(t, store.set(ctx, configA))
64+
sameUpdatedAt, sameHash, sameConfig := readRow()
65+
require.Equal(t, firstUpdatedAt, sameUpdatedAt, "updated_at must not change for identical config")
66+
require.Equal(t, firstHash, sameHash)
67+
require.Equal(t, firstConfig, sameConfig, "config blob must not be rewritten for identical config")
68+
69+
// writing a different config must rewrite the row and bump updated_at
70+
time.Sleep(time.Millisecond) // ensure NOW() is observably later
71+
require.NoError(t, store.set(ctx, configB))
72+
changedUpdatedAt, changedHash, changedConfig := readRow()
73+
require.True(t, changedUpdatedAt.After(firstUpdatedAt), "updated_at must advance when config changes")
74+
require.Equal(t, expectedHash(configB), changedHash)
75+
require.NotEqual(t, firstConfig, changedConfig, "config blob must be rewritten when config changes")
76+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
ALTER TABLE config_cache ADD COLUMN IF NOT EXISTS config_hash text;
2+
3+
ALTER TABLE config_cache
4+
SET (
5+
toast.autovacuum_vacuum_scale_factor = 0.0,
6+
toast.autovacuum_vacuum_threshold = 1
7+
);

0 commit comments

Comments
 (0)