Skip to content

feat(pubsub): implement streaming keep-alive logic#34653

Open
torreypayne wants to merge 3 commits into
mainfrom
pubsub-streaming-keepalive
Open

feat(pubsub): implement streaming keep-alive logic#34653
torreypayne wants to merge 3 commits into
mainfrom
pubsub-streaming-keepalive

Conversation

@torreypayne

@torreypayne torreypayne commented Jun 22, 2026

Copy link
Copy Markdown
Member

Overview

Implements proactive streaming keep-alive logic and connection health monitoring in Google::Cloud::PubSub::MessageListener::Stream, mirroring the design implemented in the .NET Pub/Sub client (dotnet#15649).

Long-running bi-directional gRPC streaming pull connections (StreamingPull) can experience silent TCP drops, intermediary network timeouts, or read deadlocks during periods of low message volume. This change introduces background timer tasks to push regular keep-alive requests and actively monitor server Pong timestamps.

Key Changes

  • Protocol Version Initialization: Explicitly initializes protocol_version = 1 on the initial StreamingPullRequest protobuf to enable bi-directional stream keep-alive support.
  • Unconditional Keep-Alive Pings: Configures a background timer task (@stream_keepalive_task) to dispatch empty StreamingPullRequest pings at regular intervals (default 30 seconds), regardless of current lease inventory volume.
  • Pong Monitoring & Automatic Reconnection: Introduces @pong_monitor_task to inspect timestamps (@last_ping_at, @last_pong_at). If a keep-alive response is overdue by more than pong_deadline seconds (default 15 seconds), the monitor raises RestartStream to safely recycle the connection and back off.
  • Concurrency Timestamp Guard: Guards ping timestamp updates (@last_ping_at = now if @last_pong_at >= @last_ping_at) to ensure consecutive un-ponged pings cannot overwrite the timestamp of an overdue request.

Testing & Validation

  • Unit Test Suite (keepalive_test.rb): Added targeted unit test coverage asserting protocol version flags, timer intervals, deadline timeouts, and non-disruptive Pong handling.
  • Resiliency & Robustness Suite: Validated against live GCP test instances (helical-zone-771) across simulated TCP socket hangs, sub-millisecond deadline starvation, and post-recovery downstream message delivery.

Fixes b/427319802

@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch 2 times, most recently from cf5df9b to b1acc8a Compare June 23, 2026 04:52
@torreypayne torreypayne marked this pull request as ready for review June 23, 2026 15:54
@torreypayne torreypayne requested review from a team and yoshi-approver as code owners June 23, 2026 15:54
@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch 2 times, most recently from 94e9f14 to ad59cd2 Compare June 23, 2026 17:29

@robertvoinescu-work robertvoinescu-work left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM from a functional perspective. Just a few minor comments.

Comment thread google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb Outdated
Comment thread google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb Outdated
@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch from ad59cd2 to ca925d3 Compare June 25, 2026 17:06
@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch from ca925d3 to e53a9bd Compare June 26, 2026 20:41
@quartzmo

Copy link
Copy Markdown
Member

@torreypayne

Great PR! ❤️

Three things for you to look into:

1. Unpause Race Condition

In the monitor task check, you disable the liveness checker when flow control pauses the stream (!@paused). The read path is blocked and incoming pongs will sit in the gRPC buffers.

However, there is a potential race condition when the stream is unpaused:

  1. While paused, the keepalive task still sends pings but @last_ping_at is frozen at the first unponged ping.
  2. When unpause_streaming! is called, it sets @paused = nil and signals @pause_cond.
  3. If the @pong_monitor_task thread checks the state (every 6s) after @paused becomes nil but before the background thread has had time to wake up, call enum.next, read the buffered pong, and update @last_pong_at, it will detect a false-positive timeout as follows:
    • @paused is nil (satisfying the !@paused condition at stream.rb#L103).
    • The elapsed time check (now - @last_ping_at >= @pong_deadline) is true (at stream.rb#L105) because now - @last_ping_at is large (representing the pause duration).
    • @last_pong_at < @last_ping_at is true (at stream.rb#L105).
  4. This will trigger an immediate, false-positive stream restart.

Assuming a background thread wake-up and gRPC read delay of around 10ms alongside a 6-second monitor interval, there is roughly a 0.17% chance of encountering this race on any given unpause event. In a busy production environment where streams pause and unpause frequently, this could potentially scale up to thousands of false-positive stream restarts daily in aggregate. That in turn might lead to duplicate message redeliveries, CPU overhead from renegotiations, and unnecessary warning logs.

Suggested Fix:

Inside Stream#unpause_streaming!, reset @last_pong_at to the current monotonic time. Additionally, since the method broadcasts to the @pause_cond condition variable, we should wrap its body in a synchronize block to guarantee thread safety (and to avoid ThreadError: current fiber not owner when calling it from tests):

def unpause_streaming!
  synchronize do
    return unless unpause_streaming?

    @paused = nil
    @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    subscriber.service.logger.log :info, "subscriber-flow-control" do
      "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
    end
    # signal to the background thread that we are unpaused
    @pause_cond.broadcast
  end
end

This lock-safe reset ensures that the "last pong" appears newer than the stale ping sent before/during the pause, preventing immediate false-positive restarts while the background thread wakes up to read the buffered pongs.

If the stream was indeed dead, the detection will proceed as follows:

  1. At the next keepalive task execution (line 90 in stream.rb), since @last_pong_at >= @last_ping_at is true (due to our reset), @last_ping_at is updated to the current time, and a new ping is sent.
  2. Because the connection is dead, no pong will arrive, leaving @last_pong_at stale.
  3. Once pong_deadline (15s) has passed, the monitor task (line 105 in stream.rb) will evaluate now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at as true.
  4. It will then call @background_thread&.raise RestartStream (line 110 in stream.rb), which restarts the stream in background_run's rescue block (line 379 in stream.rb).

This delay of at most 15–45 seconds in detecting a dead connection after a long pause is a negligible trade-off to eliminate false-positive restarts.

2. TimerTask Lifecycle & Testability

Currently, @stream_keepalive_task and @pong_monitor_task are initialized inside the constructor initialize, executed in start, and shut down in stop.

There are two main issues with this setup:

  1. Reusability: Calling Concurrent::TimerTask#shutdown permanently terminates its executor thread pool. Once shut down, the task cannot be re-run by calling .execute again. If the stream is ever stopped and subsequently restarted, calling start again will fail to execute the timers.
  2. Testability: The core logic for sending pings and monitoring liveness is embedded directly inside anonymous blocks passed to Concurrent::TimerTask.new, where it is very difficult to test in isolation. There is an opportunity here to add fine-grained unit tests in stream_test.rb to cover this logic.

Suggested Fix:

To resolve both issues:

  1. Extract the timer block contents into two private, synchronous helper methods on Stream: send_keepalive_ping! and check_liveness!.
  2. Initialize both timer instance variables to nil in the constructor.
  3. In start, instantiate and run the timers directly inline.
  4. In stop, call shutdown and set both variables back to nil.

For example:

def start
  synchronize do
    break if @background_thread

    @inventory.start

    @stream_keepalive_task = Concurrent::TimerTask.new(
      execution_interval: @keepalive_interval
    ) { send_keepalive_ping! }
    @stream_keepalive_task.execute

    @pong_monitor_task = Concurrent::TimerTask.new(
      execution_interval: [@keepalive_interval / 5.0, 0.01].max
    ) { check_liveness! }
    @pong_monitor_task.execute

    start_streaming!
  end

  self
end

def stop
  synchronize do
    break if @stopped

    # ... other stop logic ...

    @stream_keepalive_task&.shutdown
    @stream_keepalive_task = nil
    @pong_monitor_task&.shutdown
    @pong_monitor_task = nil
  end
end

def send_keepalive_ping!
  synchronize do
    if @stream_opened && !@stopped && @request_queue
      subscriber.service.logger.log :info, "subscriber-streams" do
        "sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
      end
      @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
      push Google::Cloud::PubSub::V1::StreamingPullRequest.new
    end
  end
end

def check_liveness!
  synchronize do
    if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
        subscriber.service.logger.log :error, "subscriber-streams" do
          "Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
        end
        @stream_opened = false
        @background_thread&.raise RestartStream
      end
    end
  end
end

By extracting the execution logic to private methods send_keepalive_ping! and check_liveness!, you can now write direct, synchronous unit tests. In your unit tests, you can instantiate a mock stream, set up specific states (e.g., setting @last_ping_at, @last_pong_at, and @paused to various values), and invoke check_liveness! or send_keepalive_ping! synchronously to verify their behavior directly, without needing to manage real-time timer threads.

3. Test Coverage & CI Flakiness

The current tests in keepalive_test.rb are a mix of serialization checks and timing-based integration tests.

Suggested testing strategy:

  1. Retain serialization checks: The test verifying that protocol_version = 1 is sent on the initial request is valuable and does not rely on timing or sleeps. This should be kept as a high-level integration test.
  2. Retain one end-to-end integration test for stream restart: The test restarts stream when keep-alive pong deadline is exceeded is valuable because it verifies the async timer scheduling, deadline check, and asynchronous thread interruption (@background_thread&.raise RestartStream). However, we should increase the polling timeout (e.g., changing 200 to 500 at keepalive_test.rb#L105) to wait up to 5 seconds instead of 2 seconds, preventing false-positive test flakes in slow CI environments.
  3. Replace the other timing-dependent tests: The remaining two timing-dependent integration tests (sends keep-alive pings periodically... and does not restart stream when actively receiving...) can be replaced with direct, synchronous unit tests on the new private methods. This allows us to verify ping frequency logic and normal liveness checks deterministically without relying on real-time sleeps and thread scheduling.

Additionally, we currently have no coverage for the edge cases described above (the unpause race condition, or the reusability of the timer tasks).

Once the logic is refactored to the send_keepalive_ping! and check_liveness! methods, you can cover them with unit tests.

Here is an example for the unpause race condition:

it "does not trigger false restarts on unpausing even if pause exceeded deadline" do
  # Setup: stream opened, ping sent 30s ago, pong received 35s ago (stale), paused.
  stream = listener.stream_pool.first
  stream.instance_variable_set :@stream_opened, true
  stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 30.0
  stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 35.0
  stream.instance_variable_set :@paused, true
  stream.instance_variable_set :@stopped, false
  
  # 1. When paused, liveness checker should skip verification.
  stream.check_liveness!
  assert stream.instance_variable_get(:@stream_opened) # Stream remains open
  
  # 2. Unpausing must reset @last_pong_at to prevent immediate restart.
  stream.unpause_streaming!
  
  # 3. Direct liveness check immediately after unpausing (simulates the monitor thread racing the background thread).
  # It should NOT close the stream because our reset made @last_pong_at newer than @last_ping_at.
  stream.check_liveness!
  assert stream.instance_variable_get(:@stream_opened) # Stream remains open, no restart!
end

And here is an example for the TimerTask lifecycle:

it "re-creates and executes timer tasks if stopped and restarted" do
  stream = listener.stream_pool.first
  
  # 1. Timers should be active on a started stream
  refute_nil stream.instance_variable_get(:@stream_keepalive_task)
  refute_nil stream.instance_variable_get(:@pong_monitor_task)
  
  first_keepalive = stream.instance_variable_get(:@stream_keepalive_task)
  first_monitor = stream.instance_variable_get(:@pong_monitor_task)
  
  # 2. Stopping the stream must shutdown and nilify the timers
  stream.stop
  assert_nil stream.instance_variable_get(:@stream_keepalive_task)
  assert_nil stream.instance_variable_get(:@pong_monitor_task)
  assert first_keepalive.shutdown?
  assert first_monitor.shutdown?
  
  # 3. Simulating a restart should create completely new timer instances
  stream.instance_variable_set :@background_thread, nil
  stream.instance_variable_set :@stopped, false
  stream.start
  
  new_keepalive = stream.instance_variable_get(:@stream_keepalive_task)
  new_monitor = stream.instance_variable_get(:@pong_monitor_task)
  
  refute_nil new_keepalive
  refute_nil new_monitor
  refute_equal first_keepalive, new_keepalive
  refute_equal first_monitor, new_monitor
end

- Wrap unpause_streaming! in synchronize block and reset @last_pong_at to prevent false-positive stream restarts upon unblocking flow control.
- Extract send_keepalive_ping! and check_liveness! helper methods on Stream and re-create TimerTask instances on start/stop to support clean restarts and deterministic unit testing.
- Overhaul keepalive_test.rb with synchronous unit test coverage for timer helpers, unpause race condition, and TimerTask re-creation, while increasing CI polling timeouts to prevent flakiness.
@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch from 26bcbb1 to 24055fd Compare June 30, 2026 15:53
…g real-time threads

- Remove listener.start from synchronous unit tests (test_0003, test_0004, test_0005) so they execute on standalone Stream instances without spawning background threads or real-time TimerTask loops.
- Isolate ENV['PUBSUB_TEST_PONG_DEADLINE'] = '0.05' from global before block so short 50ms timeouts only apply during asynchronous integration tests, eliminating flakiness on CI in slow build environments (Ubuntu Ruby 3.2 and Windows Ruby 4.0).
@torreypayne torreypayne added kokoro:run Add this label to force Kokoro to re-run the tests. kokoro:force-run Add this label to force Kokoro to re-run the tests. labels Jun 30, 2026
@yoshi-kokoro yoshi-kokoro removed kokoro:run Add this label to force Kokoro to re-run the tests. kokoro:force-run Add this label to force Kokoro to re-run the tests. labels Jun 30, 2026
@torreypayne

torreypayne commented Jul 1, 2026

Copy link
Copy Markdown
Member Author

@torreypayne

Great PR! ❤️

Three things for you to look into:
...

@quartzmo Thanks a ton for the thorough review; you identified several areas where I was able to make significant reliability improvements and clarify the keep-alive API. I addressed all 3 points that you identified; of note, I refactored the ping and pong logic into two private methods and added tests on those private methods explicitly in 'keepalive_test.rb', reset '@last_pong_at' when we unpause, and initialize the timerTasks as instance variables that we set/clear in #start and #shutdown. Please take another look when you have a moment and LMK how this looks.

push Google::Cloud::PubSub::V1::StreamingPullRequest.new
end
end.execute
@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be exposing these test-only environment variables (e.g. PUBSUB_TEST_KEEPALIVE_INTERVA) in production code.

Stream already has attr_accessor :keepalive_interval, :pong_deadline, we can customize these attributes directly on the stream objects in our test before calling listener.start

@@ -48,6 +48,7 @@ class Stream
##
# @private exactly_once_delivery_enabled.
attr_reader :exactly_once_delivery_enabled
attr_accessor :keepalive_interval, :pong_deadline

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please look at other examples in our libraries, but I believe we should split these into multiple lines. It would be great to add docstrings as well.

let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }

it "b/528401453: waits for exponential backoff before retrying on GRPC::Unavailable" do

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not leave the bug IDs unless needed. We should only really use as reminder for follow-ups.

@stream_keepalive_task.execute

@pong_monitor_task = Concurrent::TimerTask.new(
execution_interval: [@keepalive_interval / 5.0, 0.01].max

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have some super light comments on logic / why we use certain values.

subscriber.service.logger.log :info, "subscriber-flow-control" do
"subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
end
# signal to the background thread that we are unpaused

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit, but capitalize like other comments.


def check_liveness!
synchronize do
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may have lost a good multi-line comment that was above this long conditional. I'm always happy to see comments in concurrent code, explaining what is happening and why. In my opinion you can sprinkle more "play by play" comments throughout this production code.

def backoff_and_wait!
@reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
synchronize do
@pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a subtle timing gotcha here. When the stream disconnects due to a network error (e.g. GRPC::BadStatus raised inside the background_run loop), @pause_cond.wait is called here with @stream_opened still true.

If the @pong_monitor_task fires during this backoff sleep, it will call check_liveness! (L95), evaluate the long condition on L477 as true (since no pongs are arriving), log the warning, and raise RestartStream (L482).

This asynchronous exception immediately interrupts the sleep. The background thread wakes up early, exits the current rescue block, gets rescued by rescue RestartStream on L355, and enters a second backoff sleep (which multiplies @reconnect_delay again).

To prevent the monitor task from interrupting our backoff delays and causing double-backoffs, we should set @stream_opened = false under synchronization just above this line:

def backoff_and_wait!
  @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
  synchronize do
    @stream_opened = false # <--- Disable liveness checker during backoff
    @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
  end
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants