feat(pubsub): implement streaming keep-alive logic #34653
robertvoinescu-work
left a comment
LGTM from a functional perspective. Just a few minor comments.
Great PR! ❤️ Three things for you to look into:

1. Unpause Race Condition

In the monitor task check, you disable the liveness checker when flow control pauses the stream ( However, there is a potential race condition when the stream is unpaused:

Assuming a background thread wake-up and gRPC read delay of around 10ms alongside a 6-second monitor interval, there is roughly a 0.17% chance of encountering this race on any given unpause event. In a busy production environment where streams pause and unpause frequently, this could potentially scale up to thousands of false-positive stream restarts daily in aggregate. That in turn might lead to duplicate message redeliveries, CPU overhead from renegotiations, and unnecessary warning logs.

Suggested Fix: Inside

def unpause_streaming!
  synchronize do
    return unless unpause_streaming?
    @paused = nil
    @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    subscriber.service.logger.log :info, "subscriber-flow-control" do
      "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
    end
    # signal to the background thread that we are unpaused
    @pause_cond.broadcast
  end
end

This lock-safe reset ensures that the "last pong" appears newer than the stale ping sent before/during the pause, preventing immediate false-positive restarts while the background thread wakes up to read the buffered pongs. If the stream was indeed dead, the detection will proceed as follows:
This delay of at most 15–45 seconds in detecting a dead connection after a long pause is a negligible trade-off to eliminate false-positive restarts.

2. TimerTask Lifecycle & Testability

Currently, There are two main issues with this setup:

Suggested Fix: To resolve both issues:

For example:

def start
  synchronize do
    break if @background_thread
    @inventory.start
    @stream_keepalive_task = Concurrent::TimerTask.new(
      execution_interval: @keepalive_interval
    ) { send_keepalive_ping! }
    @stream_keepalive_task.execute
    @pong_monitor_task = Concurrent::TimerTask.new(
      execution_interval: [@keepalive_interval / 5.0, 0.01].max
    ) { check_liveness! }
    @pong_monitor_task.execute
    start_streaming!
  end
  self
end

def stop
  synchronize do
    break if @stopped
    # ... other stop logic ...
    @stream_keepalive_task&.shutdown
    @stream_keepalive_task = nil
    @pong_monitor_task&.shutdown
    @pong_monitor_task = nil
  end
end

def send_keepalive_ping!
  synchronize do
    if @stream_opened && !@stopped && @request_queue
      subscriber.service.logger.log :info, "subscriber-streams" do
        "sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
      end
      @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
      push Google::Cloud::PubSub::V1::StreamingPullRequest.new
    end
  end
end

def check_liveness!
  synchronize do
    if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
        subscriber.service.logger.log :error, "subscriber-streams" do
          "Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
        end
        @stream_opened = false
        @background_thread&.raise RestartStream
      end
    end
  end
end

By extracting the execution logic to private methods

3. Test Coverage & CI Flakiness

The current tests in

Suggested testing strategy:
Additionally, we currently have no coverage for the edge cases described above (the unpause race condition, or the reusability of the timer tasks). Once the logic is refactored to the

Here is an example for the unpause race condition:

it "does not trigger false restarts on unpausing even if pause exceeded deadline" do
  # Setup: stream opened, ping sent 30s ago, pong received 35s ago (stale), paused.
  stream = listener.stream_pool.first
  stream.instance_variable_set :@stream_opened, true
  stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 30.0
  stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 35.0
  stream.instance_variable_set :@paused, true
  stream.instance_variable_set :@stopped, false

  # 1. When paused, liveness checker should skip verification.
  stream.check_liveness!
  assert stream.instance_variable_get(:@stream_opened) # Stream remains open

  # 2. Unpausing must reset @last_pong_at to prevent immediate restart.
  stream.unpause_streaming!

  # 3. Direct liveness check immediately after unpausing (simulates the monitor thread racing the background thread).
  # It should NOT close the stream because our reset made @last_pong_at newer than @last_ping_at.
  stream.check_liveness!
  assert stream.instance_variable_get(:@stream_opened) # Stream remains open, no restart!
end

And here is an example for the

it "re-creates and executes timer tasks if stopped and restarted" do
  stream = listener.stream_pool.first

  # 1. Timers should be active on a started stream
  refute_nil stream.instance_variable_get(:@stream_keepalive_task)
  refute_nil stream.instance_variable_get(:@pong_monitor_task)
  first_keepalive = stream.instance_variable_get(:@stream_keepalive_task)
  first_monitor = stream.instance_variable_get(:@pong_monitor_task)

  # 2. Stopping the stream must shutdown and nilify the timers
  stream.stop
  assert_nil stream.instance_variable_get(:@stream_keepalive_task)
  assert_nil stream.instance_variable_get(:@pong_monitor_task)
  assert first_keepalive.shutdown?
  assert first_monitor.shutdown?

  # 3. Simulating a restart should create completely new timer instances
  stream.instance_variable_set :@background_thread, nil
  stream.instance_variable_set :@stopped, false
  stream.start
  new_keepalive = stream.instance_variable_get(:@stream_keepalive_task)
  new_monitor = stream.instance_variable_get(:@pong_monitor_task)
  refute_nil new_keepalive
  refute_nil new_monitor
  refute_equal first_keepalive, new_keepalive
  refute_equal first_monitor, new_monitor
end
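The unpause race from point 1 of this review can also be reasoned about without threads or gRPC. The following is a minimal single-threaded sketch, assuming a hypothetical `LivenessModel` class (not the library's `Stream`) that takes timestamps as plain arguments; only the ping guard and the liveness condition are copied from the code quoted in the review, everything else is illustrative.

```ruby
# Hypothetical, single-threaded model of the liveness check. It is not the
# library's Stream class and performs no I/O; timestamps are passed in.
class LivenessModel
  attr_reader :restarts

  def initialize pong_deadline:, reset_on_unpause:
    @pong_deadline = pong_deadline
    @reset_on_unpause = reset_on_unpause
    @paused = false
    @restarts = 0
    @last_ping_at = 0.0
    @last_pong_at = 0.0
  end

  def ping! now
    # Same guard as quoted in the review: never overwrite an unanswered ping.
    @last_ping_at = now if @last_pong_at >= @last_ping_at
  end

  def pong! now
    @last_pong_at = now
  end

  def pause!
    @paused = true
  end

  def unpause! now
    @paused = false
    # The suggested fix: make the last pong look newer than any stale ping.
    @last_pong_at = now if @reset_on_unpause
  end

  def check_liveness! now
    return if @paused
    overdue = now - @last_ping_at >= @pong_deadline
    @restarts += 1 if overdue && @last_pong_at < @last_ping_at
  end
end

# Replays one pause/unpause cycle and returns the number of restarts.
def run_scenario reset_on_unpause
  model = LivenessModel.new pong_deadline: 15.0, reset_on_unpause: reset_on_unpause
  model.pong! 5.0             # last pong read before the pause
  model.ping! 10.0            # ping sent; its pong stays buffered while paused
  model.pause!
  model.check_liveness! 30.0  # skipped because the stream is paused
  model.unpause! 40.0
  model.check_liveness! 40.0  # monitor fires before the buffered pong is read
  model.restarts
end

puts run_scenario(false) # without the reset: 1 false-positive restart
puts run_scenario(true)  # with the reset: 0 restarts
```

In this model the only difference between the two runs is the reset of `@last_pong_at` on unpause, which is what turns the stale ping from "overdue" into "answered" until the next real ping goes out.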
- Wrap unpause_streaming! in synchronize block and reset @last_pong_at to prevent false-positive stream restarts upon unblocking flow control.
- Extract send_keepalive_ping! and check_liveness! helper methods on Stream and re-create TimerTask instances on start/stop to support clean restarts and deterministic unit testing.
- Overhaul keepalive_test.rb with synchronous unit test coverage for timer helpers, unpause race condition, and TimerTask re-creation, while increasing CI polling timeouts to prevent flakiness.
…g real-time threads

- Remove listener.start from synchronous unit tests (test_0003, test_0004, test_0005) so they execute on standalone Stream instances without spawning background threads or real-time TimerTask loops.
- Isolate ENV['PUBSUB_TEST_PONG_DEADLINE'] = '0.05' from global before block so short 50ms timeouts only apply during asynchronous integration tests, eliminating flakiness on CI in slow build environments (Ubuntu Ruby 3.2 and Windows Ruby 4.0).
@quartzmo Thanks a ton for the thorough review; you identified several areas where I was able to make significant reliability improvements and clarify the keep-alive API. I addressed all 3 points that you identified; of note, I refactored the ping and pong logic into two private methods and added tests on those private methods explicitly in 'keepalive_test.rb', reset '@last_pong_at' when we unpause, and initialize the timerTasks as instance variables that we set/clear in
    push Google::Cloud::PubSub::V1::StreamingPullRequest.new
  end
end.execute
@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)
I don't think we should be exposing these test-only environment variables (e.g. PUBSUB_TEST_KEEPALIVE_INTERVAL) in production code.
Stream already has attr_accessor :keepalive_interval, :pong_deadline, so we can customize these attributes directly on the stream objects in our tests before calling listener.start.
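A minimal sketch of the test setup this comment points at, assuming a hypothetical `FakeStream` struct that only mimics the two accessors named above; in the real tests the objects would come from the listener's stream pool, and the default values shown are the ones stated in the PR description.

```ruby
# Hypothetical stand-in exposing the same two accessors as Stream.
FakeStream = Struct.new :keepalive_interval, :pong_deadline

# Defaults from the PR description: 30 s keep-alive, 15 s pong deadline.
streams = Array.new(2) { FakeStream.new 30.0, 15.0 }

# In a test, shorten the timers on each stream before starting the listener,
# instead of reading PUBSUB_TEST_* variables inside production code.
streams.each do |stream|
  stream.keepalive_interval = 0.05
  stream.pong_deadline = 0.05
end

streams.each { |stream| puts stream.to_a.inspect }
```

Keeping the override in the test means the production constructor needs no knowledge of test-only configuration.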
@@ -48,6 +48,7 @@ class Stream
##
# @private exactly_once_delivery_enabled.
attr_reader :exactly_once_delivery_enabled
attr_accessor :keepalive_interval, :pong_deadline
Please look at other examples in our libraries, but I believe we should split these into multiple lines. It would be great to add docstrings as well.
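One possible shape for that split, as a sketch only: the `StreamSketch` class name and the docstring wording are made up for illustration, the `##` / `# @private` comment style is copied from the `exactly_once_delivery_enabled` line in the diff above, and the defaults are the ones given in the PR description.

```ruby
# Hypothetical stand-in for Stream, used only to illustrate the layout.
class StreamSketch
  ##
  # @private Seconds between keep-alive pings sent on the stream.
  attr_accessor :keepalive_interval

  ##
  # @private Seconds to wait for a pong before the stream is restarted.
  attr_accessor :pong_deadline

  def initialize
    @keepalive_interval = 30.0 # default stated in the PR description
    @pong_deadline = 15.0      # default stated in the PR description
  end
end

stream = StreamSketch.new
stream.keepalive_interval = 0.05
puts stream.keepalive_interval
puts stream.pong_deadline
```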
let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }

it "b/528401453: waits for exponential backoff before retrying on GRPC::Unavailable" do
I'd rather not leave the bug IDs in unless needed. We should really only use them as reminders for follow-ups.
@stream_keepalive_task.execute

@pong_monitor_task = Concurrent::TimerTask.new(
  execution_interval: [@keepalive_interval / 5.0, 0.01].max
It would be great to have some super light comments on logic / why we use certain values.
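To make the value in that expression concrete, here is a small sketch that evaluates it for two inputs. The `monitor_interval` helper name is hypothetical; the formula is copied from the diff line above, and the comments describe what the arithmetic does rather than the author's stated intent.

```ruby
# Same expression as in the diff: poll at one fifth of the keep-alive
# interval, but never more often than every 10 ms.
def monitor_interval keepalive_interval
  [keepalive_interval / 5.0, 0.01].max
end

puts monitor_interval(30)   # default 30 s keep-alive: 6.0
puts monitor_interval(0.02) # very short test interval: clamped to 0.01
```

With the 30-second default this gives the 6-second monitor interval assumed in the race-condition estimate earlier in the thread.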
subscriber.service.logger.log :info, "subscriber-flow-control" do
  "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
end
# signal to the background thread that we are unpaused
Super nit, but capitalize like other comments.
def check_liveness!
  synchronize do
    if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
I think we may have lost a good multi-line comment that was above this long conditional. I'm always happy to see comments in concurrent code, explaining what is happening and why. In my opinion you can sprinkle more "play by play" comments throughout this production code.
def backoff_and_wait!
  @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
  synchronize do
    @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
There is a subtle timing gotcha here. When the stream disconnects due to a network error (e.g. GRPC::BadStatus raised inside the background_run loop), @pause_cond.wait is called here with @stream_opened still true.
If the @pong_monitor_task fires during this backoff sleep, it will call check_liveness! (L95), evaluate the long condition on L477 as true (since no pongs are arriving), log the warning, and raise RestartStream (L482).
This asynchronous exception immediately interrupts the sleep. The background thread wakes up early, exits the current rescue block, gets rescued by rescue RestartStream on L355, and enters a second backoff sleep (which multiplies @reconnect_delay again).
To prevent the monitor task from interrupting our backoff delays and causing double-backoffs, we should set @stream_opened = false under synchronization just above this line:
def backoff_and_wait!
  @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
  synchronize do
    @stream_opened = false # <--- Disable liveness checker during backoff
    @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
  end
end
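The effect of the double backoff on the delay schedule can be seen by iterating the growth rule on its own. This is a sketch under stated assumptions: `next_delay` is a hypothetical helper that copies only the `@reconnect_delay` update from `backoff_and_wait!`, leaving out the sleep, the jitter, and all threading.

```ruby
# Same growth rule as backoff_and_wait!, without the wait or the jitter.
def next_delay current
  current ? [current * 1.5, 60.0].min : 1.0
end

# Normal schedule: one backoff per disconnect.
delays = []
delay = nil
12.times do
  delay = next_delay delay
  delays << delay
end
puts delays.inspect

# Double backoff: the monitor interrupts the first sleep, so a single
# disconnect advances the schedule twice before the stream reconnects.
interrupted = next_delay(next_delay(nil))
puts interrupted # 1.5 instead of 1.0 after the first failure
```

The schedule grows by a factor of 1.5 and is capped at 60 seconds, so every interrupted sleep skips one step of it.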
Overview

Implements proactive streaming keep-alive logic and connection health monitoring in Google::Cloud::PubSub::MessageListener::Stream, mirroring the design implemented in the .NET Pub/Sub client (dotnet#15649).

Long-running bi-directional gRPC streaming pull connections (StreamingPull) can experience silent TCP drops, intermediary network timeouts, or read deadlocks during periods of low message volume. This change introduces background timer tasks to push regular keep-alive requests and actively monitor server Pong timestamps.

Key Changes

- protocol_version = 1 on the initial StreamingPullRequest protobuf to enable bi-directional stream keep-alive support.
- @stream_keepalive_task) to dispatch empty StreamingPullRequest pings at regular intervals (default 30 seconds), regardless of current lease inventory volume.
- @pong_monitor_task to inspect timestamps (@last_ping_at, @last_pong_at). If a keep-alive response is overdue by more than pong_deadline seconds (default 15 seconds), the monitor raises RestartStream to safely recycle the connection and back off.
- @last_ping_at = now if @last_pong_at >= @last_ping_at) to ensure consecutive un-ponged pings cannot overwrite the timestamp of an overdue request.

Testing & Validation

- keepalive_test.rb): Added targeted unit test coverage asserting protocol version flags, timer intervals, deadline timeouts, and non-disruptive Pong handling.
- helical-zone-771) across simulated TCP socket hangs, sub-millisecond deadline starvation, and post-recovery downstream message delivery.

Fixes b/427319802