Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,21 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
stagingErr := c.execStagingOperation(exStmtResp, ctx)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
defer func() {
finalErr := err
if stagingErr != nil {
finalErr = stagingErr
}
c.telemetry.AfterExecute(ctx, finalErr)
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
}()
}

if exStmtResp != nil && exStmtResp.OperationHandle != nil {
// since we have an operation handle we can close the operation if necessary
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
Expand Down Expand Up @@ -171,6 +186,17 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
defer log.Duration(msg, start)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
defer func() {
c.telemetry.AfterExecute(ctx, err)
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
}()
}

if err != nil {
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
Expand Down
65 changes: 34 additions & 31 deletions telemetry/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -2069,34 +2069,34 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
- [x] Test server error handling
- [x] Test unreachable server scenarios

### Phase 6: Collection & Aggregation (PECOBLR-1381)
- [ ] Implement `interceptor.go` for metric collection
- [ ] Implement beforeExecute() and afterExecute() hooks
- [ ] Implement context-based metric tracking with metricContext
- [ ] Implement latency measurement (startTime, latencyMs calculation)
- [ ] Add tag collection methods (addTag)
- [ ] Implement error swallowing with panic recovery
- [ ] Implement `aggregator.go` for batching
- [ ] Implement statement-level aggregation (statementMetrics)
- [ ] Implement batch size and flush interval logic
- [ ] Implement background flush goroutine (flushLoop)
- [ ] Add thread-safe metric recording
- [ ] Implement completeStatement() for final aggregation
- [ ] Implement error classification in `errors.go`
- [ ] Implement error type classification (terminal vs retryable)
- [ ] Implement HTTP status code classification
- [ ] Add error pattern matching
- [ ] Implement isTerminalError() function
- [ ] Update `client.go` to integrate aggregator
- [ ] Wire up aggregator with exporter
- [ ] Implement background flush timer
- [ ] Update start() and close() methods
- [ ] Add unit tests for collection and aggregation
- [ ] Test interceptor metric collection and latency tracking
- [ ] Test aggregation logic
- [ ] Test batch flushing (size-based and time-based)
- [ ] Test error classification
- [ ] Test client with aggregator integration
### Phase 6: Collection & Aggregation (PECOBLR-1381) ✅ COMPLETED
- [x] Implement `interceptor.go` for metric collection
- [x] Implement beforeExecute() and afterExecute() hooks
- [x] Implement context-based metric tracking with metricContext
- [x] Implement latency measurement (startTime, latencyMs calculation)
- [x] Add tag collection methods (addTag)
- [x] Implement error swallowing with panic recovery
- [x] Implement `aggregator.go` for batching
- [x] Implement statement-level aggregation (statementMetrics)
- [x] Implement batch size and flush interval logic
- [x] Implement background flush goroutine (flushLoop)
- [x] Add thread-safe metric recording
- [x] Implement completeStatement() for final aggregation
- [x] Implement error classification in `errors.go`
- [x] Implement error type classification (terminal vs retryable)
- [x] Implement HTTP status code classification
- [x] Add error pattern matching
- [x] Implement isTerminalError() function
- [x] Update `client.go` to integrate aggregator
- [x] Wire up aggregator with exporter
- [x] Implement background flush timer
- [x] Update start() and close() methods
- [x] Add unit tests for collection and aggregation
- [x] Test interceptor metric collection and latency tracking
- [x] Test aggregation logic
- [x] Test batch flushing (size-based and time-based)
- [x] Test error classification
- [x] Test client with aggregator integration

### Phase 7: Driver Integration ✅ COMPLETED
- [x] Add telemetry initialization to `connection.go`
Expand All @@ -2120,9 +2120,12 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
- [x] Test compilation with telemetry
- [x] Test no breaking changes to existing tests
- [x] Test graceful handling when disabled

Note: Statement execution hooks (beforeExecute/afterExecute in statement.go) for
actual metric collection can be added as follow-up enhancement.
- [x] Statement execution hooks
- [x] Add beforeExecute() hook to QueryContext
- [x] Add afterExecute() and completeStatement() hooks to QueryContext
- [x] Add beforeExecute() hook to ExecContext
- [x] Add afterExecute() and completeStatement() hooks to ExecContext
- [x] Use operation handle GUID as statement ID

### Phase 8: Testing & Validation
- [ ] Run benchmark tests
Expand Down
20 changes: 12 additions & 8 deletions telemetry/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ func getMetricContext(ctx context.Context) *metricContext {
return nil
}

// beforeExecute is called before statement execution.
// BeforeExecute is called before statement execution.
// Returns a new context with metric tracking attached.
func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) context.Context {
// Exported for use by the driver package.
func (i *Interceptor) BeforeExecute(ctx context.Context, statementID string) context.Context {
if !i.enabled {
return ctx
}
Expand All @@ -60,9 +61,10 @@ func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) con
return withMetricContext(ctx, mc)
}

// afterExecute is called after statement execution.
// AfterExecute is called after statement execution.
// Records the metric with timing and error information.
func (i *Interceptor) afterExecute(ctx context.Context, err error) {
// Exported for use by the driver package.
func (i *Interceptor) AfterExecute(ctx context.Context, err error) {
if !i.enabled {
return
}
Expand Down Expand Up @@ -96,8 +98,9 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) {
i.aggregator.recordMetric(ctx, metric)
}

// addTag adds a tag to the current metric context.
func (i *Interceptor) addTag(ctx context.Context, key string, value interface{}) {
// AddTag adds a tag to the current metric context.
// Exported for use by the driver package.
func (i *Interceptor) AddTag(ctx context.Context, key string, value interface{}) {
if !i.enabled {
return
}
Expand Down Expand Up @@ -129,8 +132,9 @@ func (i *Interceptor) recordConnection(ctx context.Context, tags map[string]inte
i.aggregator.recordMetric(ctx, metric)
}

// completeStatement marks a statement as complete and flushes aggregated metrics.
func (i *Interceptor) completeStatement(ctx context.Context, statementID string, failed bool) {
// CompleteStatement marks a statement as complete and flushes aggregated metrics.
// Exported for use by the driver package.
func (i *Interceptor) CompleteStatement(ctx context.Context, statementID string, failed bool) {
if !i.enabled {
return
}
Expand Down
Loading