Skip to content

Commit e0838dc

Browse files
authored
feat(jobsdb): non-blocking dataset compaction (#6967)
🔒 Scanned for secrets using gitleaks 8.30.1 # Description Introduces a non-blocking compaction flow for the jobsdb migration loop, gated behind the `nonBlockingCompaction` flag. When enabled it dramatically shrinks the lock window taken during dataset migration: - **`dsMigrationLock` is no longer taken.** Concurrent readers (`getJobs`, `GetPileUpCounts`, `GetDistinctParameterValues`) are not blocked by an in-flight compaction. - **Caveat:** an `UpdateJobStatus` call that targets a source dataset *while it is being compacted* will block at the PG level on the `EXCLUSIVE` lock held against that source's status table, until the compaction TX commits — bounded by the per-source `COPY` duration. After commit, late writers landing on the old status table are fenced by the readonly trigger and routed to the new destination via the existing `ErrStaleDsList` retry path. Compared to the legacy path, the writer side of `dsListLock` is reduced from "entire migration TX, including bulk `COPY` of all non-terminal jobs and `DROP TABLE` of every source" down to "`COMMIT` + a single `getDSList` + a `MIN/MAX` scan of the new destination". ## New maintenance pool Introducing a new maintenance pool which can be used by maintenance operations, such as: - adding new dataset - refreshing dataset list - compacting datasets - compacting job status tables This pool helps closing a connection-pool deadlock vector where jobsdb readers and writers could fully exhaust the pool, blocking an active maintenance goroutine while it still held a mutex (e.g. post-commit compaction which requires acquiring a new connection to refresh the dataset list). In addition, all maintenance goroutines were updated to ensure they require no more than a single connection at any time, eliminating pool-related deadlock risks caused by previously nested connection usage. If no maintenance pool is injected, the calls fall back to `dbHandle` (backwards compatibility). ## Lock stats `dsListLock` and `dsMigrationLock` now emit timing metrics, tagged by lock type (`read`/`write`) and whether the acquisition was async: | Metric | Description | |---|---| | `jobsdb_lock_wait_time` | Time spent waiting to acquire the lock | | `jobsdb_lock_time` | Time the lock was held (excluding wait) | | `jobsdb_lock_total_time` | End-to-end time (wait + hold) | All metrics carry a `name` tag so lock contention can be tracked per jobsdb instance. ## Flags - `JobsDB.<prefix>.nonBlockingCompaction` (default `false`) — gates the new flow. When off, `doMigrateDS` falls back to the legacy in-TX migrate+drop path. - `JobsDB.<prefix>.getJobsRetryOnCompaction` (default `true`) — gates the `getJobs` snapshot revalidation. When on, if a dataset that `getJobs` queried was compacted mid-read, the call returns `ErrStaleDsList` and is retried against the freshly published list. **Why:** while `getJobs` is reading the source's status table, an `UpdateJobStatus` for one of the same jobs could commit against the new dataset. Without the retry, `getJobs` would miss that status update, risking out-of-order processing downstream. This option has no effect unless `nonBlockingCompaction` is also on. ## Testing #6979 runs all tests with non-blocking partition migration enabled ## Linear Ticket resolves PIPE-2997 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used. <!-- GitButler Footer Boundary Top --> --- This is **part 3 of 4 in a stack** made with GitButler: - <kbd>&nbsp;4&nbsp;</kbd> #6979 - <kbd>&nbsp;3&nbsp;</kbd> #6967 👈 - <kbd>&nbsp;2&nbsp;</kbd> #6962 - <kbd>&nbsp;1&nbsp;</kbd> #6963 <!-- GitButler Footer Boundary Bottom -->
1 parent d7fe1ae commit e0838dc

10 files changed

Lines changed: 1009 additions & 90 deletions

File tree

app/apphandlers/embeddedAppHandler.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
143143
})
144144

145145
var (
146-
jobsdbPool *sql.DB
147-
priorityPool *sql.DB
146+
jobsdbPool *sql.DB
147+
priorityPool *sql.DB
148+
maintenancePool *sql.DB
148149
)
149150
if config.GetBoolVar(true, "DB.embedded.Pool.enabled", "DB.Pool.enabled") {
150151
jobsdbPool, err = misc.NewDatabaseConnectionPool(ctx, "embedded", misc.DatabaseConnectionPoolConfig{
@@ -172,6 +173,19 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
172173
}
173174
defer priorityPool.Close()
174175
}
176+
if config.GetBoolVar(true, "DB.embedded.MaintenancePool.enabled", "DB.MaintenancePool.enabled") {
177+
maintenancePool, err = misc.NewDatabaseConnectionPool(ctx, "emp", misc.DatabaseConnectionPoolConfig{
178+
MaxOpenConns: config.GetReloadableIntVar(20, 1, "DB.embedded.MaintenancePool.maxOpenConnections", "DB.MaintenancePool.maxOpenConnections"),
179+
MaxIdleConns: config.GetReloadableIntVar(2, 1, "DB.embedded.MaintenancePool.maxIdleConnections", "DB.MaintenancePool.maxIdleConnections"),
180+
ConnMaxIdleTime: config.GetReloadableDurationVar(15, time.Minute, "DB.embedded.MaintenancePool.maxIdleTime", "DB.MaintenancePool.maxIdleTime"),
181+
ConnMaxLifetime: config.GetReloadableDurationVar(0, time.Second, "DB.embedded.MaintenancePool.maxConnLifetime", "DB.MaintenancePool.maxConnLifetime"),
182+
UpdateInterval: config.GetDurationVar(60, time.Second, "DB.embedded.MaintenancePool.updateInterval", "DB.MaintenancePool.updateInterval"),
183+
}, config, statsFactory)
184+
if err != nil {
185+
return err
186+
}
187+
defer maintenancePool.Close()
188+
}
175189
partitionCount := config.GetIntVar(0, 1, "JobsDB.partitionCount")
176190

177191
pendingEventsRegistry := rmetrics.NewPendingEventsRegistry()
@@ -186,6 +200,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
186200
jobsdb.WithDBHandle(jobsdbPool),
187201
jobsdb.WithNumPartitions(partitionCount),
188202
jobsdb.WithPriorityPoolDB(priorityPool),
203+
jobsdb.WithMaintenancePoolDB(maintenancePool),
189204
)
190205
defer gwWOHandle.Close()
191206
if err = gwWOHandle.Start(); err != nil {
@@ -201,6 +216,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
201216
jobsdb.WithStats(statsFactory),
202217
jobsdb.WithDBHandle(jobsdbPool),
203218
jobsdb.WithPriorityPoolDB(priorityPool),
219+
jobsdb.WithMaintenancePoolDB(maintenancePool),
204220
jobsdb.WithNumPartitions(partitionCount),
205221
)
206222
defer gwROHandle.Close()
@@ -214,6 +230,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
214230
jobsdb.WithStats(statsFactory),
215231
jobsdb.WithDBHandle(jobsdbPool),
216232
jobsdb.WithPriorityPoolDB(priorityPool),
233+
jobsdb.WithMaintenancePoolDB(maintenancePool),
217234
jobsdb.WithNumPartitions(partitionCount),
218235
)
219236
defer rtRWHandle.Close()
@@ -227,6 +244,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
227244
jobsdb.WithStats(statsFactory),
228245
jobsdb.WithDBHandle(jobsdbPool),
229246
jobsdb.WithPriorityPoolDB(priorityPool),
247+
jobsdb.WithMaintenancePoolDB(maintenancePool),
230248
jobsdb.WithNumPartitions(partitionCount),
231249
)
232250
defer brtRWHandle.Close()
@@ -239,6 +257,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
239257
jobsdb.WithSkipMaintenanceErr(config.GetBoolVar(false, "Processor.jobsDB.skipMaintenanceError")),
240258
jobsdb.WithStats(statsFactory),
241259
jobsdb.WithDBHandle(jobsdbPool),
260+
jobsdb.WithMaintenancePoolDB(maintenancePool),
242261
)
243262
defer eschRWDB.Close()
244263

@@ -250,6 +269,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
250269
jobsdb.WithStats(statsFactory),
251270
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "archival.jobRetention")),
252271
jobsdb.WithDBHandle(jobsdbPool),
272+
jobsdb.WithMaintenancePoolDB(maintenancePool),
253273
)
254274
defer arcRWDB.Close()
255275

@@ -271,7 +291,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
271291
}
272292

273293
// setup partition migrator
274-
ppmSetup, err := setupProcessorPartitionMigrator(ctx, shutdownFn, jobsdbPool, priorityPool,
294+
ppmSetup, err := setupProcessorPartitionMigrator(ctx, shutdownFn, jobsdbPool, priorityPool, maintenancePool,
275295
config, statsFactory,
276296
gwRODB, gwWODB,
277297
rtRWDB, brtRWDB,

app/apphandlers/gatewayAppHandler.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, _ func(), options *app
6969
}
7070
defer sourceHandle.Stop()
7171

72-
var jobsdbPool *sql.DB
72+
var (
73+
jobsdbPool *sql.DB
74+
maintenancePool *sql.DB
75+
)
7376
if config.GetBoolVar(true, "DB.gateway.Pool.enabled", "DB.Pool.enabled") {
7477
jobsdbPool, err = misc.NewDatabaseConnectionPool(ctx, "gateway", misc.DatabaseConnectionPoolConfig{
7578
MaxOpenConns: config.GetReloadableIntVar(20, 1, "DB.gateway.Pool.maxOpenConnections", "DB.Pool.maxOpenConnections"),
@@ -83,6 +86,19 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, _ func(), options *app
8386
}
8487
defer jobsdbPool.Close()
8588
}
89+
if config.GetBoolVar(true, "DB.gateway.MaintenancePool.enabled", "DB.MaintenancePool.enabled") {
90+
maintenancePool, err = misc.NewDatabaseConnectionPool(ctx, "gmp", misc.DatabaseConnectionPoolConfig{
91+
MaxOpenConns: config.GetReloadableIntVar(6, 1, "DB.gateway.MaintenancePool.maxOpenConnections", "DB.MaintenancePool.maxOpenConnections"),
92+
MaxIdleConns: config.GetReloadableIntVar(1, 1, "DB.gateway.MaintenancePool.maxIdleConnections", "DB.MaintenancePool.maxIdleConnections"),
93+
ConnMaxIdleTime: config.GetReloadableDurationVar(15, time.Minute, "DB.gateway.MaintenancePool.maxIdleTime", "DB.MaintenancePool.maxIdleTime"),
94+
ConnMaxLifetime: config.GetReloadableDurationVar(0, time.Second, "DB.gateway.MaintenancePool.maxConnLifetime", "DB.MaintenancePool.maxConnLifetime"),
95+
UpdateInterval: config.GetDurationVar(60, time.Second, "DB.gateway.MaintenancePool.updateInterval", "DB.MaintenancePool.updateInterval"),
96+
}, config, statsFactory)
97+
if err != nil {
98+
return err
99+
}
100+
defer maintenancePool.Close()
101+
}
86102
partitionCount := config.GetIntVar(0, 1, "JobsDB.partitionCount")
87103

88104
gwWOHandle := jobsdb.NewForWrite(
@@ -91,6 +107,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, _ func(), options *app
91107
jobsdb.WithSkipMaintenanceErr(config.GetBoolVar(true, "Gateway.jobsDB.skipMaintenanceError")),
92108
jobsdb.WithStats(statsFactory),
93109
jobsdb.WithDBHandle(jobsdbPool),
110+
jobsdb.WithMaintenancePoolDB(maintenancePool),
94111
jobsdb.WithNumPartitions(partitionCount),
95112
)
96113
defer gwWOHandle.Close()
@@ -112,7 +129,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, _ func(), options *app
112129
if err != nil {
113130
return fmt.Errorf("resolving mode provider: %w", err)
114131
}
115-
partitionMigrator, gwDB, err := setupGatewayPartitionMigrator(ctx, jobsdbPool, config, statsFactory, gwWODB, modeProvider.EtcdClient)
132+
partitionMigrator, gwDB, err := setupGatewayPartitionMigrator(ctx, jobsdbPool, maintenancePool, config, statsFactory, gwWODB, modeProvider.EtcdClient)
116133
if err != nil {
117134
return fmt.Errorf("setting up partition migrator: %w", err)
118135
}

app/apphandlers/processorAppHandler.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,9 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
150150
})
151151

152152
var (
153-
jobsdbPool *sql.DB
154-
priorityPool *sql.DB
153+
jobsdbPool *sql.DB
154+
priorityPool *sql.DB
155+
maintenancePool *sql.DB
155156
)
156157
if config.GetBoolVar(true, "DB.processor.Pool.enabled", "DB.Pool.enabled") {
157158
jobsdbPool, err = misc.NewDatabaseConnectionPool(ctx, "processor", misc.DatabaseConnectionPoolConfig{
@@ -179,6 +180,19 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
179180
}
180181
defer priorityPool.Close()
181182
}
183+
if config.GetBoolVar(true, "DB.processor.MaintenancePool.enabled", "DB.MaintenancePool.enabled") {
184+
maintenancePool, err = misc.NewDatabaseConnectionPool(ctx, "pmp", misc.DatabaseConnectionPoolConfig{
185+
MaxOpenConns: config.GetReloadableIntVar(20, 1, "DB.processor.MaintenancePool.maxOpenConnections", "DB.MaintenancePool.maxOpenConnections"),
186+
MaxIdleConns: config.GetReloadableIntVar(2, 1, "DB.processor.MaintenancePool.maxIdleConnections", "DB.MaintenancePool.maxIdleConnections"),
187+
ConnMaxIdleTime: config.GetReloadableDurationVar(15, time.Minute, "DB.processor.MaintenancePool.maxIdleTime", "DB.MaintenancePool.maxIdleTime"),
188+
ConnMaxLifetime: config.GetReloadableDurationVar(0, time.Second, "DB.processor.MaintenancePool.maxConnLifetime", "DB.MaintenancePool.maxConnLifetime"),
189+
UpdateInterval: config.GetDurationVar(60, time.Second, "DB.processor.MaintenancePool.updateInterval", "DB.MaintenancePool.updateInterval"),
190+
}, config, statsFactory)
191+
if err != nil {
192+
return err
193+
}
194+
defer maintenancePool.Close()
195+
}
182196
partitionCount := config.GetIntVar(0, 1, "JobsDB.partitionCount")
183197
pendingEventsRegistry := rmetrics.NewPendingEventsRegistry()
184198

@@ -189,6 +203,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
189203
jobsdb.WithStats(statsFactory),
190204
jobsdb.WithDBHandle(jobsdbPool),
191205
jobsdb.WithPriorityPoolDB(priorityPool),
206+
jobsdb.WithMaintenancePoolDB(maintenancePool),
192207
jobsdb.WithNumPartitions(partitionCount),
193208
)
194209
defer gwROHandle.Close()
@@ -201,6 +216,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
201216
jobsdb.WithStats(statsFactory),
202217
jobsdb.WithDBHandle(jobsdbPool),
203218
jobsdb.WithPriorityPoolDB(priorityPool),
219+
jobsdb.WithMaintenancePoolDB(maintenancePool),
204220
jobsdb.WithNumPartitions(partitionCount),
205221
)
206222
defer rtRWHandle.Close()
@@ -214,6 +230,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
214230
jobsdb.WithStats(statsFactory),
215231
jobsdb.WithDBHandle(jobsdbPool),
216232
jobsdb.WithPriorityPoolDB(priorityPool),
233+
jobsdb.WithMaintenancePoolDB(maintenancePool),
217234
jobsdb.WithNumPartitions(partitionCount),
218235
)
219236
defer brtRWHandle.Close()
@@ -224,6 +241,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
224241
jobsdb.WithDSLimit(a.config.eschDSLimit),
225242
jobsdb.WithStats(statsFactory),
226243
jobsdb.WithDBHandle(jobsdbPool),
244+
jobsdb.WithMaintenancePoolDB(maintenancePool),
227245
)
228246
defer eschRWDB.Close()
229247

@@ -235,6 +253,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
235253
jobsdb.WithStats(statsFactory),
236254
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "archival.jobRetention")),
237255
jobsdb.WithDBHandle(jobsdbPool),
256+
jobsdb.WithMaintenancePoolDB(maintenancePool),
238257
)
239258
defer arcRWDB.Close()
240259

@@ -256,7 +275,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
256275
}
257276

258277
// setup partition migrator
259-
ppmSetup, err := setupProcessorPartitionMigrator(ctx, shutdownFn, jobsdbPool, priorityPool,
278+
ppmSetup, err := setupProcessorPartitionMigrator(ctx, shutdownFn, jobsdbPool, priorityPool, maintenancePool,
260279
config, statsFactory,
261280
gwRODB, nil,
262281
rtRWDB, brtRWDB,

app/apphandlers/setup_partitionmigration.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
4141
shutdownFn func(), // called to initiate shutdown of the app
4242
dbPool *sql.DB, // database handle
4343
priorityPool *sql.DB, // priority database handle
44+
maintenancePool *sql.DB, // maintenance database handle (may be nil; jobsdb falls back to dbPool)
4445
config *config.Config, stats stats.Stats,
4546
gwRODB, gwWODB, // gateway reader and writer jobsDB handles. if gwWODB is nil, gwRODB is used for reading and a new writer gw DB is created internally
4647
rtRWDB, brtRWDB jobsdb.JobsDB,
@@ -95,6 +96,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
9596
jobsdb.WithStats(stats),
9697
jobsdb.WithDBHandle(dbPool),
9798
jobsdb.WithPriorityPoolDB(priorityPool),
99+
jobsdb.WithMaintenancePoolDB(maintenancePool),
98100
)
99101
if err := gwBuffRWHandle.Start(); err != nil {
100102
return ppmSetup, fmt.Errorf("starting gw buffer jobsdb handle: %w", err)
@@ -112,6 +114,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
112114
jobsdb.WithStats(stats),
113115
jobsdb.WithDBHandle(dbPool),
114116
jobsdb.WithPriorityPoolDB(priorityPool),
117+
jobsdb.WithMaintenancePoolDB(maintenancePool),
115118
jobsdb.WithNumPartitions(partitionCount),
116119
)
117120
gwBuffROHandle := jobsdb.NewForRead(
@@ -121,6 +124,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
121124
jobsdb.WithStats(stats),
122125
jobsdb.WithDBHandle(dbPool),
123126
jobsdb.WithPriorityPoolDB(priorityPool),
127+
jobsdb.WithMaintenancePoolDB(maintenancePool),
124128
)
125129
gwSetupOpt = partitionbuffer.WithReaderOnlyAndFlushJobsDBs(gwRODB, gwBuffROHandle, gwWODB)
126130
}
@@ -150,6 +154,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
150154
jobsdb.WithStats(stats),
151155
jobsdb.WithDBHandle(dbPool),
152156
jobsdb.WithPriorityPoolDB(priorityPool),
157+
jobsdb.WithMaintenancePoolDB(maintenancePool),
153158
)
154159
rtPartitionBuffer, err := partitionbuffer.NewJobsDBPartitionBuffer(ctx,
155160
partitionbuffer.WithReadWriteJobsDBs(rtRWDB, rtBuffRWHandle),
@@ -176,6 +181,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
176181
jobsdb.WithStats(stats),
177182
jobsdb.WithDBHandle(dbPool),
178183
jobsdb.WithPriorityPoolDB(priorityPool),
184+
jobsdb.WithMaintenancePoolDB(maintenancePool),
179185
)
180186
brtPartitionBuffer, err := partitionbuffer.NewJobsDBPartitionBuffer(ctx,
181187
partitionbuffer.WithReadWriteJobsDBs(brtRWDB, brtBuffRWHandle),
@@ -272,6 +278,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
272278

273279
func setupGatewayPartitionMigrator(ctx context.Context,
274280
dbPool *sql.DB, // database handle pool
281+
maintenancePool *sql.DB, // maintenance database handle (may be nil; jobsdb falls back to dbPool)
275282
config *config.Config, stats stats.Stats,
276283
gwWODB jobsdb.JobsDB,
277284
etcdClientProvider func() (etcdclient.Client, error),
@@ -291,6 +298,7 @@ func setupGatewayPartitionMigrator(ctx context.Context,
291298
jobsdb.WithSkipMaintenanceErr(config.GetBoolVar(true, "JobsDB.gw_buf.skipMaintenanceError", "JobsDB.buff.skipMaintenanceError", "JobsDB.skipMaintenanceError")),
292299
jobsdb.WithStats(stats),
293300
jobsdb.WithDBHandle(dbPool),
301+
jobsdb.WithMaintenancePoolDB(maintenancePool),
294302
)
295303
if err := gwBuffWOHandle.Start(); err != nil {
296304
return nil, nil, fmt.Errorf("starting gw buffer jobsdb handle: %w", err)

jobsdb/admin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (jd *Handle) doCleanup(ctx context.Context, l lock.LockToken) error {
152152
{
153153
deleteStmt := "DELETE FROM %s_journal WHERE start_time < NOW() - INTERVAL '%d DAY'"
154154
var journalEntryCount int64
155-
res, err := jd.getDB(ctx).ExecContext(
155+
res, err := jd.maintenanceDB().ExecContext(
156156
ctx,
157157
fmt.Sprintf(
158158
deleteStmt,
@@ -180,7 +180,7 @@ func (jd *Handle) abortOldJobs(ctx context.Context, dsList []dataSetT) error {
180180
maxAgeStatusResponse := `{"reason": "job max age exceeded"}`
181181
maxAge := jd.conf.jobMaxAge.Load()
182182
for _, ds := range dsList {
183-
res, err := jd.getDB(ctx).ExecContext(
183+
res, err := jd.maintenanceDB().ExecContext(
184184
ctx,
185185
fmt.Sprintf(
186186
`INSERT INTO %[1]q (job_id, job_state, error_response)

0 commit comments

Comments
 (0)