Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend-config/account_association.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *ConfigT) enrichDestinationWithAccounts(dest *DestinationT) {
obskit.DestinationType(dest.DestinationDefinition.Name),
)
if !dest.Enabled {
accountAssociationLogger.Infon("Skipping disabled destination from associating account")
accountAssociationLogger.Debugn("Skipping disabled destination from associating account")
return
}
// Check and set the delivery account if specified in the destination config
Expand Down
21 changes: 20 additions & 1 deletion gateway/webhook/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
)

func transformerModuleVersion(t *testing.T) string {
t.Helper()
goMod, err := os.ReadFile("../../go.mod")
require.NoError(t, err, "failed to read go.mod")
for line := range strings.SplitSeq(string(goMod), "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "github.com/rudderlabs/rudder-transformer/go ") {
// version is like "v1.126.2-beta", strip "v" prefix and any pre-release suffix
v := strings.TrimPrefix(strings.Fields(line)[1], "v")
if i := strings.Index(v, "-"); i != -1 {
v = v[:i]
}
return v
}
}
t.Fatal("github.com/rudderlabs/rudder-transformer/go not found in go.mod")
return ""
}

func TestIntegrationWebhook(t *testing.T) {
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
Expand All @@ -68,7 +87,7 @@ func TestIntegrationWebhook(t *testing.T) {
})

g.Go(func() (err error) {
transformerContainer, err = transformertest.Setup(pool, t)
transformerContainer, err = transformertest.Setup(pool, t, transformertest.WithDockerImageTag(transformerModuleVersion(t)))
if err != nil {
return fmt.Errorf("starting transformer: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ require (
github.com/rudderlabs/rudder-go-kit v0.74.0
github.com/rudderlabs/rudder-observability-kit v0.0.6
github.com/rudderlabs/rudder-schemas v0.10.0
github.com/rudderlabs/rudder-transformer/go v1.122.0
github.com/rudderlabs/rudder-transformer/go v1.126.2-beta

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ItsSudip Is the github action not yet fixed ?

github.com/rudderlabs/sql-tunnels v0.1.7
github.com/rudderlabs/sqlconnect-go v1.20.3
github.com/samber/lo v1.52.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1211,8 +1211,8 @@ github.com/rudderlabs/rudder-observability-kit v0.0.6 h1:xIA/1Sp38B542EYzxR7qUfN
github.com/rudderlabs/rudder-observability-kit v0.0.6/go.mod h1:nR3GvY7HvuBaBqOKFfzLP9uYZu7OpzMqW2eeT2ikXtU=
github.com/rudderlabs/rudder-schemas v0.10.0 h1:OzhmoV3wIyZG4cH7RtixYEF23VwWaxyIGPKiVZ0XCT0=
github.com/rudderlabs/rudder-schemas v0.10.0/go.mod h1:mb7XxrdYj6iqz5GNbhv4cvmGjMGYgirpQnfm7WPXmyE=
github.com/rudderlabs/rudder-transformer/go v1.122.0 h1:XmCKiSbhj5SMRPPQzyXnx6RnP2FjPnZ4ARXGR4ByrPk=
github.com/rudderlabs/rudder-transformer/go v1.122.0/go.mod h1:3NGitPz4pYRRZ6Xt09S+8hb0tHK/9pZcKJ3OgOTaSmE=
github.com/rudderlabs/rudder-transformer/go v1.126.2-beta h1:HIu5s0RUFGNyQS5Z2tPIWyegkajFgPCTzOfK91gqt4Y=
github.com/rudderlabs/rudder-transformer/go v1.126.2-beta/go.mod h1:3NGitPz4pYRRZ6Xt09S+8hb0tHK/9pZcKJ3OgOTaSmE=
github.com/rudderlabs/sonnet v1.0.2 h1:nPfmDKD9gUwT571Dwtcsx0VIglSchvyNjuRLju4Xs3s=
github.com/rudderlabs/sonnet v1.0.2/go.mod h1:tjQmKEGAo/xwmhw9AwLkazP5b5m8VpUvWNzPSx4ve0g=
github.com/rudderlabs/sql-tunnels v0.1.7 h1:wDCRl6zY4M5gfWazf7XkSTGQS3yjBzUiUgEMBIfHNDA=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import (
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
errAuthz = errors.New("snowpipe authorization error")
errBackoff = errors.New("snowpipe backoff error")
errAbort = errors.New("abort error")
)
var errAbort = errors.New("abort error")

// initializeChannelWithSchema creates a new channel for the given table if it doesn't exist.
// If the channel already exists, it checks for new columns and adds them to the table.
Expand Down Expand Up @@ -73,7 +69,7 @@ func (m *Manager) addColumns(ctx context.Context, namespace, tableName string, c
snowflakeManager.Cleanup(ctx)
}()
if err = snowflakeManager.AddColumns(ctx, tableName, columns); err != nil {
return fmt.Errorf("adding column: %w, %w", errAuthz, err)
return fmt.Errorf("adding column: %w, %w", errAbort, err)
}
return nil
}
Expand Down Expand Up @@ -125,7 +121,7 @@ func (m *Manager) createChannel(
m.channelCache.Store(tableName, resp)
return resp, nil
case internalapi.ErrValidationError, internalapi.ErrAuthenticationFailed, internalapi.ErrRoleDoesNotExistOrNotAuthorized, internalapi.ErrDatabaseDoesNotExistOrNotAuthorized:
return nil, fmt.Errorf("%w, %w", errAuthz, err)
return nil, fmt.Errorf("%w: validation or authorization error", errAbort)
default:
if resp.SnowflakeAPIHttpCode == internalapi.ApiStatusUnsupportedColumn {
return nil, fmt.Errorf("%w: creating channel with code %s, message: %s and error: %s", errAbort, resp.Code, resp.SnowflakeAPIMessage, resp.Error)
Expand Down Expand Up @@ -154,10 +150,10 @@ func (m *Manager) handleSchemaError(
snowflakeManager.Cleanup(ctx)
}()
if err := snowflakeManager.CreateSchema(ctx); err != nil {
return nil, fmt.Errorf("creating schema: %w, %w", errAuthz, err)
return nil, fmt.Errorf("creating schema: %w, %w", errAbort, err)
}
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w, %w", errAuthz, err)
return nil, fmt.Errorf("creating table: %w, %w", errAbort, err)
}
return m.api.CreateChannel(ctx, channelReq)
}
Expand All @@ -182,7 +178,7 @@ func (m *Manager) handleTableError(
snowflakeManager.Cleanup(ctx)
}()
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w, %w", errAuthz, err)
return nil, fmt.Errorf("creating table: %w, %w", errAbort, err)
}
return m.api.CreateChannel(ctx, channelReq)
}
Expand Down Expand Up @@ -227,9 +223,6 @@ func (m *Manager) deleteChannelFromCache(tableName string) {
}

func (m *Manager) createSnowflakeManager(ctx context.Context, namespace string) (manager.Manager, error) {
if m.isInBackoff() {
return nil, fmt.Errorf("skipping snowflake manager creation due to backoff with error %s: %w", m.backoff.error, errBackoff)
}
modelWarehouse := whutils.ModelWarehouse{
WorkspaceID: m.destination.WorkspaceID,
Destination: *m.destination,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v5"
"github.com/google/uuid"
"github.com/hashicorp/go-retryablehttp"
"github.com/samber/lo"
Expand All @@ -37,7 +36,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

func New(
Expand All @@ -59,7 +57,6 @@ func New(
now: timeutil.Now,
channelCache: sync.Map{},
polledImportInfoMap: make(map[string]*importInfo),
validator: validations.NewDestinationValidator(),
}

m.config.client.url = conf.GetStringVar("http://localhost:9078", "SnowpipeStreaming.Client.URL")
Expand All @@ -73,9 +70,6 @@ func New(
m.config.client.retryMax = conf.GetIntVar(5, 1, "SnowpipeStreaming.Client.retryMax")
m.config.instanceID = conf.GetStringVar("1", "INSTANCE_ID")
m.config.maxBufferCapacity = conf.GetReloadableInt64Var(512*bytesize.KB, bytesize.B, "SnowpipeStreaming.maxBufferCapacity")
m.config.backoff.initialInterval = conf.GetReloadableDurationVar(1, time.Second, "SnowpipeStreaming.backoffInitialIntervalInSeconds")
m.config.backoff.multiplier = conf.GetReloadableFloat64Var(2.0, "SnowpipeStreaming.backoffMultiplier")
m.config.backoff.maxInterval = conf.GetReloadableDurationVar(1, time.Hour, "SnowpipeStreaming.backoffMaxIntervalInHours")
m.config.stuckPipelineThreshold = conf.GetReloadableDurationVar(15, time.Minute, "SnowpipeStreaming.stuckPipelineThresholdInMinutes")

tags := stats.Tags{
Expand Down Expand Up @@ -148,14 +142,6 @@ func (m *Manager) retryableClient() *retryablehttp.Client {
return client
}

func (m *Manager) validateConfig(ctx context.Context, dest *backendconfig.DestinationT) error {
response := m.validator.Validate(ctx, dest)
if response.Success {
return nil
}
return errors.New(response.Error)
}

func (m *Manager) Now() time.Time {
return m.now()
}
Expand Down Expand Up @@ -199,19 +185,10 @@ func (m *Manager) Upload(ctx context.Context, asyncDest *common.AsyncDestination
obskit.Error(err),
)

switch {
case errors.Is(err, errAuthz):
m.setBackOff(err)
validationError := m.validateConfig(ctx, asyncDest.Destination)
if validationError != nil {
err = fmt.Errorf("failed to validate snowpipe credentials: %s", validationError.Error())
}
return m.failedJobs(asyncDest, err.Error())
case errors.Is(err, errBackoff):
return m.failedJobs(asyncDest, err.Error())
default:
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
if errors.Is(err, errAbort) {
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", errAbort).Error())
}
return m.failedJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
}
m.logger.Infon("Prepared discards channel")

Expand Down Expand Up @@ -248,8 +225,6 @@ func (m *Manager) Upload(ctx context.Context, asyncDest *common.AsyncDestination
abortedJobIDs []int64
abortReason string
)
shouldResetBackoff := true // backoff should be reset if authz error is not encountered for any of the tables
isBackoffSet := false // should not be set again if already set
for _, info := range uploadInfos {
fields := []logger.Field{
logger.NewStringField("table", info.tableName),
Expand All @@ -263,18 +238,6 @@ func (m *Manager) Upload(ctx context.Context, asyncDest *common.AsyncDestination
log.Warnn("Failed to send events to Snowpipe", obskit.Error(err))

switch {
case errors.Is(err, errAuthz):
shouldResetBackoff = false
if !isBackoffSet {
isBackoffSet = true
m.setBackOff(err)
validationError := m.validateConfig(ctx, asyncDest.Destination)
if validationError != nil && failedReason == "" {
failedReason = fmt.Sprintf("failed to validate snowpipe credentials: %s", validationError.Error())
}
}
case errors.Is(err, errBackoff):
shouldResetBackoff = false
case errors.Is(err, errAbort):
abortedJobIDs = append(abortedJobIDs, info.jobIDs...)
if abortReason == "" {
Expand All @@ -299,9 +262,6 @@ func (m *Manager) Upload(ctx context.Context, asyncDest *common.AsyncDestination
discardImportInfo.Offset = discardImInfo.Offset
}
}
if shouldResetBackoff {
m.resetBackoff()
}
if discardImportInfo != nil {
importInfos = append(importInfos, discardImportInfo)
}
Expand Down Expand Up @@ -988,36 +948,6 @@ func (m *Manager) GetUploadStats(input common.GetUploadStatsInput) common.GetUpl
}
}

func (m *Manager) isInBackoff() bool {
if m.backoff.next.IsZero() {
return false
}
return m.Now().Before(m.backoff.next)
}

func (m *Manager) resetBackoff() {
m.backoff.next = time.Time{}
m.backoff.attempts = 0
m.backoff.error = ""
}

func (m *Manager) setBackOff(err error) {
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = m.config.backoff.initialInterval.Load()
bo.Multiplier = m.config.backoff.multiplier.Load()
bo.MaxInterval = m.config.backoff.maxInterval.Load()
bo.RandomizationFactor = 0
bo.Reset()
m.backoff.attempts++
m.backoff.error = err.Error()

var d time.Duration
for index := int64(0); index < int64(m.backoff.attempts); index++ {
d = bo.NextBackOff()
}
m.backoff.next = m.Now().Add(d)
}

func buildCreateChannelRequest(destinationID, partition string, destConf *destConfig, tableName string) *model.CreateChannelRequest {
return &model.CreateChannelRequest{
RudderIdentifier: destinationID,
Expand Down
Loading
Loading