Skip to content

Commit f45aa06

Browse files
rudderstack-github-actions[bot]achettyiitrAkashrudder-devbox[bot]atzoum
authored
chore: sync release v1.75.3 to main branch (#6985)
# Description Syncing patch release v1.75.3 to main branch **↓↓ Please review and edit commit overrides before merging ↓↓** BEGIN_COMMIT_OVERRIDE fix: continuous backend config cache cause postgres disk bloat (#6983) END_COMMIT_OVERRIDE --------- Co-authored-by: Akash Chetty <achetty.iitr@gmail.com> Co-authored-by: Akash <achetty@rudderstack.com> Co-authored-by: rudderstack-github-actions[bot] <236995729+rudderstack-github-actions[bot]@users.noreply.github.com> Co-authored-by: rudder-harness[bot] <278693763+rudder-harness[bot]@users.noreply.github.com> Co-authored-by: Aris Tzoumas <atzoumas@rudderstack.com>
1 parent d788760 commit f45aa06

5 files changed

Lines changed: 151 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## [1.75.3](https://github.com/rudderlabs/rudder-server/compare/v1.75.2...v1.75.3) (2026-05-20)
4+
5+
6+
### Bug Fixes
7+
8+
* continuous backend config cache cause postgres disk bloat ([#6983](https://github.com/rudderlabs/rudder-server/issues/6983)) ([398f902](https://github.com/rudderlabs/rudder-server/commit/398f9028eb48c1a59fd515a686b38b573f1da6e7))
9+
310
## [1.75.2](https://github.com/rudderlabs/rudder-server/compare/v1.75.1...v1.75.2) (2026-05-19)
411

512

backend-config/backend_config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ func TestWaitForConfig(t *testing.T) {
454454
func TestCache(t *testing.T) {
455455
initBackendConfig()
456456
t.Setenv("RSERVER_BACKEND_CONFIG_POLL_INTERVAL", "10ms")
457+
t.Setenv("RSERVER_BACKEND_CONFIG_DB_CACHE_WRITE_DEBOUNCE", "10ms")
457458
var calls int32
458459
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
459460
defer atomic.AddInt32(&calls, 1)

backend-config/internal/cache/cache.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@ import (
66
"crypto/aes"
77
"crypto/cipher"
88
"crypto/rand"
9+
"crypto/sha256"
910
"database/sql"
11+
"encoding/hex"
1012
"fmt"
13+
"sync"
14+
"time"
15+
16+
"github.com/samber/lo"
1117

1218
"github.com/rudderlabs/rudder-go-kit/config"
1319
"github.com/rudderlabs/rudder-go-kit/jsonrs"
@@ -69,15 +75,37 @@ func Start(ctx context.Context, secret [32]byte, key string, channelProvider fun
6975
return nil, err
7076
}
7177

78+
// writeDebounce coalesces a burst of config updates into a single write
79+
writeDebounce := config.GetDurationVar(10, time.Second, "BackendConfig.dbCacheWriteDebounce")
7280
go func() {
73-
// subscribe to config and write to db
74-
for config := range channelProvider() {
75-
// persist to database
76-
err = dbStore.set(ctx, config.Data)
77-
if err != nil {
81+
var (
82+
latestConfig any
83+
gotConfig bool
84+
configMu sync.Mutex
85+
)
86+
// persist writes the most recently received config to the database
87+
persist := func() {
88+
configMu.Lock()
89+
data, ok := latestConfig, gotConfig
90+
configMu.Unlock()
91+
if !ok {
92+
return
93+
}
94+
if err := dbStore.set(ctx, data); err != nil {
7895
pkgLogger.Errorn("failed writing config to database", obskit.Error(err))
7996
}
8097
}
98+
debouncedPersist, cancelDebounce := lo.NewDebounce(writeDebounce, persist)
99+
// subscribe to config and debounce writes to db
100+
for update := range channelProvider() {
101+
configMu.Lock()
102+
latestConfig, gotConfig = update.Data, true
103+
configMu.Unlock()
104+
debouncedPersist()
105+
}
106+
// channel closed: flush the latest config before shutting down
107+
cancelDebounce()
108+
persist()
81109
dbStore.Close()
82110
}()
83111
return &dbStore, nil
@@ -89,6 +117,28 @@ func (db *cacheStore) set(ctx context.Context, config any) error {
89117
if err != nil {
90118
return fmt.Errorf("failed to marshal config: %w", err)
91119
}
120+
configSum := sha256.Sum256(configBytes)
121+
configHash := hex.EncodeToString(configSum[:])
122+
123+
// Skip the write if the stored config is already identical. This avoids
124+
// encrypting and transferring the (potentially large) config blob to the
125+
// database when nothing has changed.
126+
var storedHash sql.NullString
127+
err = db.QueryRowContext(
128+
ctx,
129+
`SELECT config_hash FROM config_cache WHERE key = $1`,
130+
db.key,
131+
).Scan(&storedHash)
132+
switch err {
133+
case nil:
134+
if storedHash.Valid && storedHash.String == configHash {
135+
return nil
136+
}
137+
case sql.ErrNoRows:
138+
default:
139+
return err
140+
}
141+
92142
// encrypt
93143
encrypted, err := db.encryptAES(configBytes)
94144
if err != nil {
@@ -97,13 +147,16 @@ func (db *cacheStore) set(ctx context.Context, config any) error {
97147
// write to config table
98148
_, err = db.ExecContext(
99149
ctx,
100-
`INSERT INTO config_cache (key, config) VALUES ($1, $2)
150+
`INSERT INTO config_cache (key, config, config_hash) VALUES ($1, $2, $3)
101151
on conflict (key)
102152
do update set
103153
config = $2,
104-
updated_at = NOW()`,
154+
config_hash = $3,
155+
updated_at = NOW()
156+
where config_cache.config_hash is distinct from excluded.config_hash`,
105157
db.key,
106158
encrypted,
159+
configHash,
107160
)
108161
return err
109162
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"testing"
8+
"time"
9+
10+
"github.com/ory/dockertest/v3"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/rudderlabs/rudder-go-kit/jsonrs"
14+
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
15+
)
16+
17+
// TestCacheStoreSet exercises the upsert: it asserts that writing the
18+
// same config twice does not rewrite the row (updated_at stays put), while
19+
// writing a different config does.
20+
func TestCacheStoreSet(t *testing.T) {
21+
pool, err := dockertest.NewPool("")
22+
require.NoError(t, err)
23+
resourcePostgres, err := postgres.Setup(pool, t)
24+
require.NoError(t, err)
25+
26+
db := resourcePostgres.DB
27+
require.NoError(t, migrate(db))
28+
29+
store := &cacheStore{
30+
DB: db,
31+
secret: sha256.Sum256([]byte("secret")),
32+
key: "test-workspace",
33+
}
34+
ctx := context.Background()
35+
36+
// expectedHash mirrors the production hashing so the test verifies the
37+
// stored value independently.
38+
expectedHash := func(config any) string {
39+
b, err := jsonrs.Marshal(config)
40+
require.NoError(t, err)
41+
sum := sha256.Sum256(b)
42+
return hex.EncodeToString(sum[:])
43+
}
44+
45+
readRow := func() (updatedAt time.Time, hash string, config []byte) {
46+
t.Helper()
47+
require.NoError(t, db.QueryRowContext(ctx,
48+
`SELECT updated_at, config_hash, config FROM config_cache WHERE key = $1`,
49+
store.key,
50+
).Scan(&updatedAt, &hash, &config))
51+
return updatedAt, hash, config
52+
}
53+
54+
configA := map[string]any{"workspace": "A", "sources": []string{"s1", "s2"}}
55+
configB := map[string]any{"workspace": "B", "sources": []string{"s3"}}
56+
57+
// first write inserts the row
58+
require.NoError(t, store.set(ctx, configA))
59+
firstUpdatedAt, firstHash, firstConfig := readRow()
60+
require.Equal(t, expectedHash(configA), firstHash)
61+
62+
// writing an identical config must not rewrite the row
63+
require.NoError(t, store.set(ctx, configA))
64+
sameUpdatedAt, sameHash, sameConfig := readRow()
65+
require.Equal(t, firstUpdatedAt, sameUpdatedAt, "updated_at must not change for identical config")
66+
require.Equal(t, firstHash, sameHash)
67+
require.Equal(t, firstConfig, sameConfig, "config blob must not be rewritten for identical config")
68+
69+
// writing a different config must rewrite the row and bump updated_at
70+
time.Sleep(time.Millisecond) // ensure NOW() is observably later
71+
require.NoError(t, store.set(ctx, configB))
72+
changedUpdatedAt, changedHash, changedConfig := readRow()
73+
require.True(t, changedUpdatedAt.After(firstUpdatedAt), "updated_at must advance when config changes")
74+
require.Equal(t, expectedHash(configB), changedHash)
75+
require.NotEqual(t, firstConfig, changedConfig, "config blob must be rewritten when config changes")
76+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
ALTER TABLE config_cache ADD COLUMN IF NOT EXISTS config_hash text;
2+
3+
ALTER TABLE config_cache
4+
SET (
5+
toast.autovacuum_vacuum_scale_factor = 0.0,
6+
toast.autovacuum_vacuum_threshold = 1
7+
);

0 commit comments

Comments
 (0)