diff --git a/connection.go b/connection.go index fb249a9..9cdd0c2 100644 --- a/connection.go +++ b/connection.go @@ -126,6 +126,21 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name log, ctx = client.LoggerAndContext(ctx, exStmtResp) stagingErr := c.execStagingOperation(exStmtResp, ctx) + // Telemetry: track statement execution + var statementID string + if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil { + statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID) + ctx = c.telemetry.BeforeExecute(ctx, statementID) + defer func() { + finalErr := err + if stagingErr != nil { + finalErr = stagingErr + } + c.telemetry.AfterExecute(ctx, finalErr) + c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil) + }() + } + if exStmtResp != nil && exStmtResp.OperationHandle != nil { // since we have an operation handle we can close the operation if necessary alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil @@ -171,6 +186,17 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam log, ctx = client.LoggerAndContext(ctx, exStmtResp) defer log.Duration(msg, start) + // Telemetry: track statement execution + var statementID string + if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil { + statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID) + ctx = c.telemetry.BeforeExecute(ctx, statementID) + defer func() { + c.telemetry.AfterExecute(ctx, err) + c.telemetry.CompleteStatement(ctx, statementID, err != nil) + }() + } + if err != nil { log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp) diff --git a/telemetry/DESIGN.md b/telemetry/DESIGN.md index 6408b36..121346c 100644 --- a/telemetry/DESIGN.md +++ b/telemetry/DESIGN.md @@ -2069,34 +2069,34 @@ func BenchmarkInterceptor_Disabled(b *testing.B) { - [x] Test server error handling - [x] Test unreachable server scenarios -### Phase 6: Collection & Aggregation (PECOBLR-1381) -- [ ] Implement `interceptor.go` for metric collection - - [ ] Implement beforeExecute() and afterExecute() hooks - - [ ] Implement context-based metric tracking with metricContext - - [ ] Implement latency measurement (startTime, latencyMs calculation) - - [ ] Add tag collection methods (addTag) - - [ ] Implement error swallowing with panic recovery -- [ ] Implement `aggregator.go` for batching - - [ ] Implement statement-level aggregation (statementMetrics) - - [ ] Implement batch size and flush interval logic - - [ ] Implement background flush goroutine (flushLoop) - - [ ] Add thread-safe metric recording - - [ ] Implement completeStatement() for final aggregation -- [ ] Implement error classification in `errors.go` - - [ ] Implement error type classification (terminal vs retryable) - - [ ] Implement HTTP status code classification - - [ ] Add error pattern matching - - [ ] Implement isTerminalError() function -- [ ] Update `client.go` to integrate aggregator - - [ ] Wire up aggregator with exporter - - [ ] Implement background flush timer - - [ ] Update start() and close() methods -- [ ] Add unit tests for collection and aggregation - - [ ] Test interceptor metric collection and latency tracking - - [ ] Test aggregation logic - - [ ] Test batch flushing (size-based and time-based) - - [ ] Test error classification - - [ ] Test client with aggregator integration +### Phase 6: Collection & Aggregation (PECOBLR-1381) ✅ COMPLETED +- [x] Implement `interceptor.go` for metric collection + - [x] Implement beforeExecute() and afterExecute() hooks + - [x] Implement context-based metric tracking with metricContext + - [x] Implement latency measurement (startTime, latencyMs calculation) + - [x] Add tag collection methods (addTag) + - [x] Implement error swallowing with panic recovery +- [x] Implement `aggregator.go` for batching + - [x] Implement statement-level aggregation (statementMetrics) + - [x] Implement batch size and flush interval logic + - [x] Implement background flush goroutine (flushLoop) + - [x] Add thread-safe metric recording + - [x] Implement completeStatement() for final aggregation +- [x] Implement error classification in `errors.go` + - [x] Implement error type classification (terminal vs retryable) + - [x] Implement HTTP status code classification + - [x] Add error pattern matching + - [x] Implement isTerminalError() function +- [x] Update `client.go` to integrate aggregator + - [x] Wire up aggregator with exporter + - [x] Implement background flush timer + - [x] Update start() and close() methods +- [x] Add unit tests for collection and aggregation + - [x] Test interceptor metric collection and latency tracking + - [x] Test aggregation logic + - [x] Test batch flushing (size-based and time-based) + - [x] Test error classification + - [x] Test client with aggregator integration ### Phase 7: Driver Integration ✅ COMPLETED - [x] Add telemetry initialization to `connection.go` @@ -2120,9 +2120,12 @@ func BenchmarkInterceptor_Disabled(b *testing.B) { - [x] Test compilation with telemetry - [x] Test no breaking changes to existing tests - [x] Test graceful handling when disabled - -Note: Statement execution hooks (beforeExecute/afterExecute in statement.go) for -actual metric collection can be added as follow-up enhancement. +- [x] Statement execution hooks + - [x] Add beforeExecute() hook to QueryContext + - [x] Add afterExecute() and completeStatement() hooks to QueryContext + - [x] Add beforeExecute() hook to ExecContext + - [x] Add afterExecute() and completeStatement() hooks to ExecContext + - [x] Use operation handle GUID as statement ID ### Phase 8: Testing & Validation - [ ] Run benchmark tests diff --git a/telemetry/interceptor.go b/telemetry/interceptor.go index 2af851d..4e38b4f 100644 --- a/telemetry/interceptor.go +++ b/telemetry/interceptor.go @@ -44,9 +44,10 @@ func getMetricContext(ctx context.Context) *metricContext { return nil } -// beforeExecute is called before statement execution. +// BeforeExecute is called before statement execution. // Returns a new context with metric tracking attached. -func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) context.Context { +// Exported for use by the driver package. +func (i *Interceptor) BeforeExecute(ctx context.Context, statementID string) context.Context { if !i.enabled { return ctx } @@ -60,9 +61,10 @@ func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) con return withMetricContext(ctx, mc) } -// afterExecute is called after statement execution. +// AfterExecute is called after statement execution. // Records the metric with timing and error information. -func (i *Interceptor) afterExecute(ctx context.Context, err error) { +// Exported for use by the driver package. +func (i *Interceptor) AfterExecute(ctx context.Context, err error) { if !i.enabled { return } @@ -96,8 +98,9 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) { i.aggregator.recordMetric(ctx, metric) } -// addTag adds a tag to the current metric context. -func (i *Interceptor) addTag(ctx context.Context, key string, value interface{}) { +// AddTag adds a tag to the current metric context. +// Exported for use by the driver package. +func (i *Interceptor) AddTag(ctx context.Context, key string, value interface{}) { if !i.enabled { return } @@ -129,8 +132,9 @@ func (i *Interceptor) recordConnection(ctx context.Context, tags map[string]inte i.aggregator.recordMetric(ctx, metric) } -// completeStatement marks a statement as complete and flushes aggregated metrics. -func (i *Interceptor) completeStatement(ctx context.Context, statementID string, failed bool) { +// CompleteStatement marks a statement as complete and flushes aggregated metrics. +// Exported for use by the driver package. +func (i *Interceptor) CompleteStatement(ctx context.Context, statementID string, failed bool) { if !i.enabled { return }