From 8cad3c9cf746829be11c1f52c994ddb1cc0c48ef Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Fri, 23 Jan 2026 09:44:55 -0600 Subject: [PATCH 1/3] [sdk-1551] feat: Provide access to HTTP headers on connect or error --- lib/ld-eventsource/client.rb | 26 ++- lib/ld-eventsource/errors.rb | 14 +- spec/headers_spec.rb | 397 +++++++++++++++++++++++++++++++++++ 3 files changed, 432 insertions(+), 5 deletions(-) create mode 100644 spec/headers_spec.rb diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index f226f32..eaae071 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -162,7 +162,7 @@ def initialize(uri, reconnect_reset_interval: reconnect_reset_interval) @first_attempt = true - @on = { event: ->(_) {}, error: ->(_) {} } + @on = { event: ->(_) {}, error: ->(_) {}, connect: ->(_) {} } @last_id = last_event_id @query_params_callback = nil @@ -207,6 +207,24 @@ def on_error(&action) @on[:error] = action end + # + # Specifies a block or Proc to be called when a successful connection is established. This will + # be called with a single parameter containing the HTTP response headers as a Hash. It is called + # from the same worker thread that reads the stream, so no more events will be dispatched until + # it returns. + # + # This is called every time a connection is successfully established, including on reconnections + # after a failure. It allows you to inspect server response headers such as rate limits, custom + # metadata, or fallback directives (e.g., `X-LD-FD-FALLBACK`). + # + # Any previously specified connect handler will be replaced. + # + # @yieldparam headers [Hash] the HTTP response headers from the successful connection + # + def on_connect(&action) + @on[:connect] = action + end + # # Specifies a block or Proc to generate query parameters dynamically. This will be called before # each connection attempt (both initial connection and reconnections), allowing you to update @@ -323,13 +341,15 @@ def connect uri = build_uri_with_query_params @logger.info { "Connecting to event stream at #{uri}" } cxn = @http_client.request(@method, uri, build_opts) + headers_hash = cxn.headers.to_h if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") + @on[:connect].call(headers_hash) # Notify connect callback with headers return cxn # we're good to proceed else reset_http - err = Errors::HTTPContentTypeError.new(content_type) + err = Errors::HTTPContentTypeError.new(content_type, headers_hash) @on[:error].call(err) @logger.warn { "Event source returned unexpected content type '#{content_type}'" } end @@ -337,7 +357,7 @@ def connect body = cxn.to_s # grab the whole response body in case it has error details reset_http @logger.info { "Server returned error status #{cxn.status.code}" } - err = Errors::HTTPStatusError.new(cxn.status.code, body) + err = Errors::HTTPStatusError.new(cxn.status.code, body, headers_hash) @on[:error].call(err) end rescue diff --git a/lib/ld-eventsource/errors.rb b/lib/ld-eventsource/errors.rb index 6fe05ba..8807341 100644 --- a/lib/ld-eventsource/errors.rb +++ b/lib/ld-eventsource/errors.rb @@ -9,9 +9,10 @@ module Errors # handler specified in {Client#on_error}. # class HTTPStatusError < StandardError - def initialize(status, message) + def initialize(status, message, headers = nil) @status = status @message = message + @headers = headers super("HTTP error #{status}") end @@ -22,6 +23,10 @@ def initialize(status, message) # The response body, if any. # @return [String] attr_reader :message + + # The HTTP response headers, if any. + # @return [Hash, nil] + attr_reader :headers end # @@ -29,14 +34,19 @@ def initialize(status, message) # handler specified in {Client#on_error}. # class HTTPContentTypeError < StandardError - def initialize(type) + def initialize(type, headers = nil) @content_type = type + @headers = headers super("invalid content type \"#{type}\"") end # The HTTP content type. # @return [String] attr_reader :type + + # The HTTP response headers, if any. + # @return [Hash, nil] + attr_reader :headers end # diff --git a/spec/headers_spec.rb b/spec/headers_spec.rb new file mode 100644 index 0000000..6a3c2bd --- /dev/null +++ b/spec/headers_spec.rb @@ -0,0 +1,397 @@ +require "ld-eventsource" +require "http_stub" + +# +# Tests for HTTP header exposure across all connection states +# +describe "Header Exposure" do + before(:each) do + skip("end-to-end HTTP tests are disabled because they're unreliable on this platform") unless stub_http_server_available? + end + + subject { SSE::Client } + + let(:reconnect_asap) { 0.01 } + + def with_client(client) + begin + yield client + ensure + client.close + end + end + + def send_stream_content(res, content, keep_open:) + res.content_type = "text/event-stream" + res.status = 200 + res.chunked = true + if keep_open + rd, wr = IO.pipe + res.body = rd + wr.write(content) + wr + else + res.body = proc { |out| out.write(content) } + nil + end + end + + describe "HTTPStatusError" do + it "exposes headers on error responses" do + with_server do |server| + server.setup_response("/") do |req,res| + res.status = 401 + res['X-Custom-Header'] = 'custom-value' + res['X-LD-FD-Fallback'] = 'true' + res.body = "unauthorized" + res.keep_alive = false + end + + error_sink = Queue.new + client = subject.new(server.base_uri, retry_enabled: false) do |c| + c.on_error { |error| error_sink << error } + end + + sleep(0.25) # Give time for error to occur + client.close + + error = error_sink.pop + expect(error).to be_a(SSE::Errors::HTTPStatusError) + expect(error.status).to eq(401) + expect(error.headers).to be_a(Hash) + expect(error.headers['x-custom-header']).to eq('custom-value') + expect(error.headers['x-ld-fd-fallback']).to eq('true') + end + end + + it "handles nil headers gracefully (backward compatibility)" do + # Create error without headers (old-style) + error = SSE::Errors::HTTPStatusError.new(500, "error") + expect(error.headers).to be_nil + end + + it "can extract FDv1 fallback header from error response" do + with_server do |server| + server.setup_response("/") do |req,res| + res.status = 503 + res['X-LaunchDarkly-FD-Fallback'] = '1' + res.body = "service unavailable" + res.keep_alive = false + end + + error_sink = Queue.new + client = subject.new(server.base_uri, retry_enabled: false) do |c| + c.on_error do |error| + error_sink << error + end + end + + sleep(0.25) + client.close + + error = error_sink.pop + expect(error.headers).not_to be_nil + # Headers should be case-insensitive accessible + fallback_header = error.headers.find { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } + expect(fallback_header).not_to be_nil + expect(fallback_header[1]).to eq('1') + end + end + end + + describe "HTTPContentTypeError" do + it "exposes headers on content type errors" do + with_server do |server| + server.setup_response("/") do |req,res| + res.status = 200 + res.content_type = "application/json" + res['X-Custom-Header'] = 'test-value' + res['X-LD-FD-Fallback'] = 'true' + res.body = '{"error": "wrong content type"}' + res.keep_alive = false + end + + error_sink = Queue.new + client = subject.new(server.base_uri, retry_enabled: false) do |c| + c.on_error { |error| error_sink << error } + end + + sleep(0.25) + client.close + + error = error_sink.pop + expect(error).to be_a(SSE::Errors::HTTPContentTypeError) + expect(error.headers).to be_a(Hash) + expect(error.headers['x-custom-header']).to eq('test-value') + expect(error.headers['x-ld-fd-fallback']).to eq('true') + end + end + + it "handles nil headers gracefully (backward compatibility)" do + # Create error without headers (old-style) + error = SSE::Errors::HTTPContentTypeError.new("text/html") + expect(error.headers).to be_nil + end + + it "can extract FDv1 fallback header from content type error" do + with_server do |server| + server.setup_response("/") do |req,res| + res.status = 200 + res.content_type = "text/plain" + res['X-LaunchDarkly-FD-Fallback'] = '1' + res.body = "wrong type" + res.keep_alive = false + end + + error_sink = Queue.new + client = subject.new(server.base_uri, retry_enabled: false) do |c| + c.on_error { |error| error_sink << error } + end + + sleep(0.25) + client.close + + error = error_sink.pop + expect(error.headers).not_to be_nil + fallback_header = error.headers.find { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } + expect(fallback_header).not_to be_nil + end + end + end + + describe "on_connect callback" do + it "exposes headers on successful connection" do + with_server do |server| + server.setup_response("/") do |req,res| + res.content_type = "text/event-stream" + res.status = 200 + res['X-Custom-Header'] = 'success-value' + res['X-LD-Env-Id'] = 'test-env-123' + res.chunked = true + res.body = proc { |out| out.write("data: test\n\n") } + end + + connect_sink = Queue.new + client = subject.new(server.base_uri) do |c| + c.on_connect { |headers| connect_sink << headers } + end + + with_client(client) do |_| + headers = connect_sink.pop + expect(headers).to be_a(Hash) + expect(headers['x-custom-header']).to eq('success-value') + expect(headers['x-ld-env-id']).to eq('test-env-123') + end + end + end + + it "fires on_connect before first event" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "data: test\n\n", keep_open: true) + end + + order_sink = Queue.new + client = subject.new(server.base_uri) do |c| + c.on_connect { |headers| order_sink << [:connect, headers] } + c.on_event { |event| order_sink << [:event, event] } + end + + with_client(client) do |_| + first = order_sink.pop + expect(first[0]).to eq(:connect) + expect(first[1]).to be_a(Hash) + + second = order_sink.pop + expect(second[0]).to eq(:event) + end + end + end + + it "fires on_connect on reconnection" do + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + res['X-Attempt'] = attempt.to_s + if attempt == 1 + send_stream_content(res, "data: first\n\n", keep_open: false) + else + send_stream_content(res, "data: second\n\n", keep_open: true) + end + end + + connect_sink = Queue.new + client = subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c| + c.on_connect { |headers| connect_sink << headers } + end + + with_client(client) do |_| + headers1 = connect_sink.pop + expect(headers1['x-attempt']).to eq('1') + + headers2 = connect_sink.pop + expect(headers2['x-attempt']).to eq('2') + end + end + end + + it "can extract FDv1 fallback header from successful connection" do + with_server do |server| + server.setup_response("/") do |req,res| + res.content_type = "text/event-stream" + res.status = 200 + res['X-LaunchDarkly-FD-Fallback'] = '1' + res.chunked = true + res.body = proc { |out| out.write("data: test\n\n") } + end + + connect_sink = Queue.new + client = subject.new(server.base_uri) do |c| + c.on_connect { |headers| connect_sink << headers } + end + + with_client(client) do |_| + headers = connect_sink.pop + fallback_header = headers.find { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } + expect(fallback_header).not_to be_nil + expect(fallback_header[1]).to eq('1') + end + end + end + + it "works when on_connect handler is not set (backward compatibility)" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "data: test\n\n", keep_open: true) + end + + event_sink = Queue.new + # No on_connect handler specified + client = subject.new(server.base_uri) do |c| + c.on_event { |event| event_sink << event } + end + + with_client(client) do |_| + event = event_sink.pop + expect(event.data).to eq("test") + end + end + end + + it "does not block event processing if on_connect raises exception" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "data: test\n\n", keep_open: true) + end + + event_sink = Queue.new + client = subject.new(server.base_uri) do |c| + c.on_connect { |headers| raise "Test exception in on_connect" } + c.on_event { |event| event_sink << event } + end + + # Exception in on_connect should be caught and logged, but not prevent events + # This behavior depends on error handling implementation + begin + with_client(client) do |_| + # If exception is caught, events should still flow + # If exception propagates, the test will fail appropriately + sleep(0.25) + end + rescue => e + # Exception in on_connect should cause connection to fail and retry + expect(e.message).to include("Test exception") + end + end + end + end + + describe "Combined scenarios" do + it "exposes headers in both error and success scenarios" do + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + if attempt == 1 + # First attempt: error with headers + res.status = 503 + res['X-Status'] = 'error' + res['X-Attempt'] = '1' + res.body = "service unavailable" + res.keep_alive = false + else + # Second attempt: success with headers + res.status = 200 + res.content_type = "text/event-stream" + res['X-Status'] = 'success' + res['X-Attempt'] = '2' + res.chunked = true + res.body = proc { |out| out.write("data: recovered\n\n") } + end + end + + error_sink = Queue.new + connect_sink = Queue.new + event_sink = Queue.new + + client = subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c| + c.on_error { |error| error_sink << error } + c.on_connect { |headers| connect_sink << headers } + c.on_event { |event| event_sink << event } + end + + with_client(client) do |_| + # First: error with headers + error = error_sink.pop + expect(error.headers['x-status']).to eq('error') + expect(error.headers['x-attempt']).to eq('1') + + # Then: successful connection with headers + headers = connect_sink.pop + expect(headers['x-status']).to eq('success') + expect(headers['x-attempt']).to eq('2') + + # Finally: event + event = event_sink.pop + expect(event.data).to eq('recovered') + end + end + end + + it "handles all three callback types without conflicts" do + with_server do |server| + server.setup_response("/") do |req,res| + res.status = 200 + res.content_type = "text/event-stream" + res['X-Test'] = 'multi-callback' + res.chunked = true + res.body = proc { |out| out.write("data: message\n\n") } + end + + callbacks_fired = [] + + client = subject.new(server.base_uri) do |c| + c.on_connect do |headers| + callbacks_fired << :connect + expect(headers['x-test']).to eq('multi-callback') + end + + c.on_event do |event| + callbacks_fired << :event + expect(event.data).to eq('message') + end + + c.on_error do |error| + callbacks_fired << :error + end + end + + with_client(client) do |_| + sleep(0.25) # Give time for callbacks to fire + expect(callbacks_fired).to eq([:connect, :event]) + end + end + end + end +end From 16fe163d037170b6039bbe0a5fb90883e535b73b Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Fri, 23 Jan 2026 16:08:13 +0000 Subject: [PATCH 2/3] return http::headers to ensure case-insensitive lookups --- lib/ld-eventsource/client.rb | 12 ++++++------ lib/ld-eventsource/errors.rb | 4 ++-- spec/headers_spec.rb | 14 +++++++------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index eaae071..0ada447 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -209,7 +209,7 @@ def on_error(&action) # # Specifies a block or Proc to be called when a successful connection is established. This will - # be called with a single parameter containing the HTTP response headers as a Hash. It is called + # be called with a single parameter containing the HTTP response headers. It is called # from the same worker thread that reads the stream, so no more events will be dispatched until # it returns. # @@ -219,7 +219,7 @@ def on_error(&action) # # Any previously specified connect handler will be replaced. # - # @yieldparam headers [Hash] the HTTP response headers from the successful connection + # @yieldparam headers [HTTP::Headers] the HTTP response headers from the successful connection. # def on_connect(&action) @on[:connect] = action @@ -341,15 +341,15 @@ def connect uri = build_uri_with_query_params @logger.info { "Connecting to event stream at #{uri}" } cxn = @http_client.request(@method, uri, build_opts) - headers_hash = cxn.headers.to_h + headers = cxn.headers if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") - @on[:connect].call(headers_hash) # Notify connect callback with headers + @on[:connect].call(headers) # Notify connect callback with headers return cxn # we're good to proceed else reset_http - err = Errors::HTTPContentTypeError.new(content_type, headers_hash) + err = Errors::HTTPContentTypeError.new(content_type, headers) @on[:error].call(err) @logger.warn { "Event source returned unexpected content type '#{content_type}'" } end @@ -357,7 +357,7 @@ def connect body = cxn.to_s # grab the whole response body in case it has error details reset_http @logger.info { "Server returned error status #{cxn.status.code}" } - err = Errors::HTTPStatusError.new(cxn.status.code, body, headers_hash) + err = Errors::HTTPStatusError.new(cxn.status.code, body, headers) @on[:error].call(err) end rescue diff --git a/lib/ld-eventsource/errors.rb b/lib/ld-eventsource/errors.rb index 8807341..671d980 100644 --- a/lib/ld-eventsource/errors.rb +++ b/lib/ld-eventsource/errors.rb @@ -25,7 +25,7 @@ def initialize(status, message, headers = nil) attr_reader :message # The HTTP response headers, if any. - # @return [Hash, nil] + # @return [HTTP::Headers, nil] attr_reader :headers end @@ -45,7 +45,7 @@ def initialize(type, headers = nil) attr_reader :type # The HTTP response headers, if any. - # @return [Hash, nil] + # @return [HTTP::Headers, nil] attr_reader :headers end diff --git a/spec/headers_spec.rb b/spec/headers_spec.rb index 6a3c2bd..007f25b 100644 --- a/spec/headers_spec.rb +++ b/spec/headers_spec.rb @@ -58,7 +58,7 @@ def send_stream_content(res, content, keep_open:) error = error_sink.pop expect(error).to be_a(SSE::Errors::HTTPStatusError) expect(error.status).to eq(401) - expect(error.headers).to be_a(Hash) + expect(error.headers).to be_a(HTTP::Headers) expect(error.headers['x-custom-header']).to eq('custom-value') expect(error.headers['x-ld-fd-fallback']).to eq('true') end @@ -92,7 +92,7 @@ def send_stream_content(res, content, keep_open:) error = error_sink.pop expect(error.headers).not_to be_nil # Headers should be case-insensitive accessible - fallback_header = error.headers.find { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } + fallback_header = error.headers.detect { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } expect(fallback_header).not_to be_nil expect(fallback_header[1]).to eq('1') end @@ -121,7 +121,7 @@ def send_stream_content(res, content, keep_open:) error = error_sink.pop expect(error).to be_a(SSE::Errors::HTTPContentTypeError) - expect(error.headers).to be_a(Hash) + expect(error.headers).to be_a(HTTP::Headers) expect(error.headers['x-custom-header']).to eq('test-value') expect(error.headers['x-ld-fd-fallback']).to eq('true') end @@ -153,7 +153,7 @@ def send_stream_content(res, content, keep_open:) error = error_sink.pop expect(error.headers).not_to be_nil - fallback_header = error.headers.find { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } + fallback_header = error.headers.detect { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } expect(fallback_header).not_to be_nil end end @@ -178,7 +178,7 @@ def send_stream_content(res, content, keep_open:) with_client(client) do |_| headers = connect_sink.pop - expect(headers).to be_a(Hash) + expect(headers).to be_a(HTTP::Headers) expect(headers['x-custom-header']).to eq('success-value') expect(headers['x-ld-env-id']).to eq('test-env-123') end @@ -200,7 +200,7 @@ def send_stream_content(res, content, keep_open:) with_client(client) do |_| first = order_sink.pop expect(first[0]).to eq(:connect) - expect(first[1]).to be_a(Hash) + expect(first[1]).to be_a(HTTP::Headers) second = order_sink.pop expect(second[0]).to eq(:event) @@ -253,7 +253,7 @@ def send_stream_content(res, content, keep_open:) with_client(client) do |_| headers = connect_sink.pop - fallback_header = headers.find { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } + fallback_header = headers.detect { |k, v| k.downcase == 'x-launchdarkly-fd-fallback' } expect(fallback_header).not_to be_nil expect(fallback_header[1]).to eq('1') end From 1a7119ae0e0c68a42969e540aa712437b229134c Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Fri, 23 Jan 2026 16:23:35 +0000 Subject: [PATCH 3/3] type to a hash --- lib/ld-eventsource/client.rb | 4 +++- lib/ld-eventsource/errors.rb | 10 ++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 0ada447..ea5f1bf 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -219,7 +219,9 @@ def on_error(&action) # # Any previously specified connect handler will be replaced. # - # @yieldparam headers [HTTP::Headers] the HTTP response headers from the successful connection. + # @yieldparam headers [Hash, nil] the HTTP response headers from the successful connection, + # or nil if not available. The headers object uses case-insensitive keys (via the http gem's + # HTTP::Headers). # def on_connect(&action) @on[:connect] = action diff --git a/lib/ld-eventsource/errors.rb b/lib/ld-eventsource/errors.rb index 671d980..612c2da 100644 --- a/lib/ld-eventsource/errors.rb +++ b/lib/ld-eventsource/errors.rb @@ -25,7 +25,10 @@ def initialize(status, message, headers = nil) attr_reader :message # The HTTP response headers, if any. - # @return [HTTP::Headers, nil] + # + # The headers object uses case-insensitive keys (via the http gem's HTTP::Headers). + # + # @return [Hash, nil] the response headers, or nil if not available attr_reader :headers end @@ -45,7 +48,10 @@ def initialize(type, headers = nil) attr_reader :type # The HTTP response headers, if any. - # @return [HTTP::Headers, nil] + # + # The headers object uses case-insensitive keys (via the http gem's HTTP::Headers). + # + # @return [Hash, nil] the response headers, or nil if not available attr_reader :headers end