From e53a9bd6e9fd9635b87e2844f7c9f50e703eb816 Mon Sep 17 00:00:00 2001 From: Yoshi Automation Bot Date: Mon, 22 Jun 2026 23:30:07 +0000 Subject: [PATCH 1/3] feat(pubsub): implement streaming keep-alive logic --- .../cloud/pubsub/message_listener/stream.rb | 84 +++++++++- .../message_listener/acknowledge_test.rb | 6 +- .../message_listener/bug_regression_test.rb | 73 +++++++++ .../pubsub/message_listener/inventory_test.rb | 6 +- .../pubsub/message_listener/keepalive_test.rb | 146 ++++++++++++++++++ .../modify_ack_deadline_test.rb | 6 +- .../pubsub/message_listener/nack_test.rb | 6 +- 7 files changed, 311 insertions(+), 16 deletions(-) create mode 100644 google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb create mode 100644 google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index fa563dd17f57..61cb481d051c 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -48,6 +48,7 @@ class Stream ## # @private exactly_once_delivery_enabled. attr_reader :exactly_once_delivery_enabled + attr_accessor :keepalive_interval, :pong_deadline ## # @private Create an empty Subscriber::Stream object. @@ -68,17 +69,49 @@ def initialize subscriber @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads + @keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30) + @pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15) + @last_ping_at = nil + @last_pong_at = nil + @stream_opened = false + @reconnect_delay = nil + @stream_keepalive_task = Concurrent::TimerTask.new( - execution_interval: 30 + execution_interval: @keepalive_interval ) do - # push empty request every 30 seconds to keep stream alive - unless inventory.empty? - subscriber.service.logger.log :info, "subscriber-streams" do - "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + synchronize do + # @request_queue feeds client requests (initial pull request and keep-alive pings) into gRPC. + # Note: ACKs are sent via unary RPCs (TimedUnaryBuffer), not over this stream. + # Check that @request_queue is initialized (not nil) before pushing unconditional keep-alive pings. + if @stream_opened && !@stopped && @request_queue + subscriber.service.logger.log :info, "subscriber-streams" do + "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + end + @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at + push Google::Cloud::PubSub::V1::StreamingPullRequest.new end - push Google::Cloud::PubSub::V1::StreamingPullRequest.new end - end.execute + end + + @pong_monitor_task = Concurrent::TimerTask.new( + execution_interval: [@keepalive_interval / 5.0, 0.01].max + ) do + synchronize do + # Do not check pong deadline if @paused (client flow control inventory full). + # When @paused, background_run waits on condition variable and stops calling enum.next, + # so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated. + if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at + subscriber.service.logger.log :error, "subscriber-streams" do + "Keep-alive pong not received within #{@pong_deadline}s; restarting stream." + end + @stream_opened = false + @background_thread&.raise RestartStream + end + end + end + end end def start @@ -86,6 +119,8 @@ def start break if @background_thread @inventory.start + @stream_keepalive_task.execute + @pong_monitor_task.execute start_streaming! end @@ -108,6 +143,9 @@ def stop @stopped = true @pause_cond.broadcast + @stream_keepalive_task.shutdown + @pong_monitor_task.shutdown + # Now that the reception thread is stopped, immediately stop the # callback thread pool. All queued callbacks will see the stream # is stopped and perform a noop. @@ -219,6 +257,13 @@ class RestartStream < StandardError; end # rubocop:disable all + def backoff_and_wait! + @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0 + synchronize do + @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped + end + end + def background_run synchronize do # Don't allow a stream to restart if already stopped @@ -245,11 +290,21 @@ def background_run # Call the StreamingPull API to get the response enumerator options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } + synchronize do + @stream_opened = false + end enum = @subscriber.service.streaming_pull @request_queue.each, options subscriber.service.logger.log :info, "subscriber-streams" do "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" end + synchronize do + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @last_ping_at = now + @last_pong_at = now + @stream_opened = true + end + loop do synchronize do if @paused && !@stopped @@ -264,8 +319,17 @@ def background_run begin # Cannot synchronize the enumerator, causes deadlock response = enum.next - new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled + synchronize do + @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + # Reset backoff delay only after successfully reading a frame from enum.next. + # If the connection drops immediately upon reading, @reconnect_delay is preserved. + @reconnect_delay = nil + end received_messages = response.received_messages + # Skip processing properties and inventory on Pong frames (empty received_messages). + # Subscription properties on keep-alive Pongs are not valid. + next if received_messages.empty? + new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled # Use synchronize so changes happen atomically synchronize do @@ -310,11 +374,13 @@ def background_run "#{status_code}; will be retried." end # Restart the stream with an incremental back for a retriable error. + backoff_and_wait! retry rescue RestartStream subscriber.service.logger.log :info, "subscriber-streams" do "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." end + backoff_and_wait! retry rescue StandardError => e subscriber.service.logger.log :error, "subscriber-streams" do @@ -322,6 +388,7 @@ def background_run end @subscriber.error! e + backoff_and_wait! retry end @@ -443,6 +510,7 @@ def initial_input_request req.client_id = @subscriber.service.client_id req.max_outstanding_messages = @inventory.limit req.max_outstanding_bytes = @inventory.bytesize + req.protocol_version = 1 end end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb index 05b8eeb4ccb8..4f282be9dac1 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb @@ -71,7 +71,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -132,7 +133,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb new file mode 100644 index 000000000000..e6fecbfa56b3 --- /dev/null +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb @@ -0,0 +1,73 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "helper" + +describe Google::Cloud::PubSub::MessageListener, :bug_regression, :mock_pubsub do + let(:topic_name) { "topic-name-goes-here" } + let(:sub_name) { "subscription-name-goes-here" } + let(:sub_hash) { subscription_hash topic_name, sub_name } + let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) } + let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service } + + it "b/528401453: waits for exponential backoff before retrying on GRPC::Unavailable" do + attempts = [] + pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] + response_groups = [[GRPC::Unavailable.new("simulated disconnect")], [pull_res]] + stub = StreamingPullStub.new response_groups + def stub.streaming_pull_internal request, options = nil + @attempts ||= [] + @attempts << Process.clock_gettime(Process::CLOCK_MONOTONIC) + super + end + stub.instance_variable_set(:@attempts, attempts) + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + + retries = 0 + until attempts.count >= 2 + fail "stream did not retry" if retries > 200 + retries += 1 + sleep 0.05 + end + + listener.stop + listener.wait! + + elapsed = attempts[1] - attempts[0] + puts "\n[b/528401453 Test] Elapsed delay between attempts: #{elapsed.round(3)}s" + _(elapsed).must_be :>=, 1.0 + end + + it "b/528404815: shuts down keepalive TimerTask when stream is stopped" do + pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] + stub = StreamingPullStub.new [[pull_res]] + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + sleep 0.1 + listener.stop + listener.wait! + + stream = listener.instance_variable_get(:@stream_pool).first + keepalive_task = stream.instance_variable_get(:@stream_keepalive_task) + puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task.running?}" + _(keepalive_task.running?).must_equal false + end +end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb index 2d16c67b0364..bdb3a202c879 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb @@ -76,7 +76,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -141,7 +142,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb new file mode 100644 index 000000000000..ccdef5193081 --- /dev/null +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb @@ -0,0 +1,146 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "helper" + +describe Google::Cloud::PubSub::MessageListener, :keepalive, :mock_pubsub do + let(:topic_name) { "topic-name-goes-here" } + let(:sub_name) { "subscription-name-goes-here" } + let(:sub_hash) { subscription_hash topic_name, sub_name } + let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) } + let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service } + let(:rec_msg1_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ + rec_message_hash("rec_message1-msg-goes-here", 1111) } + + before do + ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05" + ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05" + end + + after do + ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL" + ENV.delete "PUBSUB_TEST_PONG_DEADLINE" + end + + it "sends protocol_version = 1 in initial streaming pull request" do + pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + stub = StreamingPullStub.new [[pull_res1]] + subscriber.service.mocked_subscription_admin = stub + + called = false + listener = subscriber.listen streams: 1 do |msg| + called = true + end + listener.start + + listener_retries = 0 + until called + fail "callback was not called" if listener_retries > 100 + listener_retries += 1 + sleep 0.01 + end + + listener.stop + listener.wait! + + initial_req = stub.requests.first.to_a.first + _(initial_req.protocol_version).must_equal 1 + end + + it "sends keep-alive pings periodically even when inventory is empty" do + q = StreamingPullStub::RaisableEnumeratorQueue.new + stub = StreamingPullStub.new [[]] + def stub.streaming_pull_internal req, opt = nil + @requests << req + @my_q.each + end + stub.instance_variable_set(:@my_q, q) + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + + pong_thread = Thread.new do + 10.times do + sleep 0.02 + q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: []) + end + end + + sleep 0.18 + pong_thread.join + + listener.stop + listener.wait! + + reqs = stub.requests.first.to_a + _(reqs.count).must_be :>=, 2 + end + + it "restarts stream when keep-alive pong deadline is exceeded" do + pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + stub = StreamingPullStub.new [[], [pull_res2]] + subscriber.service.mocked_subscription_admin = stub + + called = false + listener = subscriber.listen streams: 1 do |msg| + called = true + end + listener.start + + listener_retries = 0 + until called + fail "stream did not restart and deliver message" if listener_retries > 200 + listener_retries += 1 + sleep 0.01 + end + + listener.stop + listener.wait! + + _(stub.requests.count).must_equal 2 + end + + it "does not restart stream when actively receiving keep-alive pongs" do + q = StreamingPullStub::RaisableEnumeratorQueue.new + stub = StreamingPullStub.new [[]] + def stub.streaming_pull_internal req, opt = nil + @requests << req + @my_q.each + end + stub.instance_variable_set(:@my_q, q) + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + + pong_sender = Thread.new do + 8.times do + sleep 0.02 + empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] + q.push empty_pong + end + end + + sleep 0.15 + pong_sender.join + + listener.stop + listener.wait! + + _(stub.requests.count).must_equal 1 + end +end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb index 190bbe8d6a78..acd1476f81cc 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb @@ -70,7 +70,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -126,7 +127,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb index 5b0744ef97de..9f6200a0126a 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb @@ -70,7 +70,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -126,7 +127,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] From 24055fd9c872c8ee9d0cb5c05e9d5bb30e8b289b Mon Sep 17 00:00:00 2001 From: Yoshi Automation Bot Date: Tue, 30 Jun 2026 15:46:48 +0000 Subject: [PATCH 2/3] fix(pubsub): remediate keep-alive unpause race and timer testability - Wrap unpause_streaming! in synchronize block and reset @last_pong_at to prevent false-positive stream restarts upon unblocking flow control. - Extract send_keepalive_ping! and check_liveness! helper methods on Stream and re-create TimerTask instances on start/stop to support clean restarts and deterministic unit testing. - Overhaul keepalive_test.rb with synchronous unit test coverage for timer helpers, unpause race condition, and TimerTask re-creation, while increasing CI polling timeouts to prevent flakiness. --- .../cloud/pubsub/message_listener/stream.rb | 93 ++++++------ .../message_listener/bug_regression_test.rb | 4 +- .../pubsub/message_listener/keepalive_test.rb | 135 ++++++++++++------ 3 files changed, 142 insertions(+), 90 deletions(-) diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index 61cb481d051c..e6e242dd4f7a 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -76,42 +76,8 @@ def initialize subscriber @stream_opened = false @reconnect_delay = nil - @stream_keepalive_task = Concurrent::TimerTask.new( - execution_interval: @keepalive_interval - ) do - synchronize do - # @request_queue feeds client requests (initial pull request and keep-alive pings) into gRPC. - # Note: ACKs are sent via unary RPCs (TimedUnaryBuffer), not over this stream. - # Check that @request_queue is initialized (not nil) before pushing unconditional keep-alive pings. - if @stream_opened && !@stopped && @request_queue - subscriber.service.logger.log :info, "subscriber-streams" do - "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" - end - @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at - push Google::Cloud::PubSub::V1::StreamingPullRequest.new - end - end - end - - @pong_monitor_task = Concurrent::TimerTask.new( - execution_interval: [@keepalive_interval / 5.0, 0.01].max - ) do - synchronize do - # Do not check pong deadline if @paused (client flow control inventory full). - # When @paused, background_run waits on condition variable and stops calling enum.next, - # so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated. - if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused - now = Process.clock_gettime(Process::CLOCK_MONOTONIC) - if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at - subscriber.service.logger.log :error, "subscriber-streams" do - "Keep-alive pong not received within #{@pong_deadline}s; restarting stream." - end - @stream_opened = false - @background_thread&.raise RestartStream - end - end - end - end + @stream_keepalive_task = nil + @pong_monitor_task = nil end def start @@ -119,7 +85,14 @@ def start break if @background_thread @inventory.start + @stream_keepalive_task = Concurrent::TimerTask.new( + execution_interval: @keepalive_interval + ) { send_keepalive_ping! } @stream_keepalive_task.execute + + @pong_monitor_task = Concurrent::TimerTask.new( + execution_interval: [@keepalive_interval / 5.0, 0.01].max + ) { check_liveness! } @pong_monitor_task.execute start_streaming! @@ -143,8 +116,10 @@ def stop @stopped = true @pause_cond.broadcast - @stream_keepalive_task.shutdown - @pong_monitor_task.shutdown + @stream_keepalive_task&.shutdown + @stream_keepalive_task = nil + @pong_monitor_task&.shutdown + @pong_monitor_task = nil # Now that the reception thread is stopped, immediately stop the # callback thread pool. All queued callbacks will see the stream @@ -483,15 +458,45 @@ def pause_streaming? @inventory.full? end + def send_keepalive_ping! + synchronize do + if @stream_opened && !@stopped && @request_queue + subscriber.service.logger.log :info, "subscriber-streams" do + "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + end + @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at + push Google::Cloud::PubSub::V1::StreamingPullRequest.new + end + end + end + + def check_liveness! + synchronize do + if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at + subscriber.service.logger.log :error, "subscriber-streams" do + "Keep-alive pong not received within #{@pong_deadline}s; restarting stream." + end + @stream_opened = false + @background_thread&.raise RestartStream + end + end + end + end + def unpause_streaming! - return unless unpause_streaming? + synchronize do + return unless unpause_streaming? - @paused = nil - subscriber.service.logger.log :info, "subscriber-flow-control" do - "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" + @paused = nil + @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + subscriber.service.logger.log :info, "subscriber-flow-control" do + "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" + end + # signal to the background thread that we are unpaused + @pause_cond.broadcast end - # signal to the background thread that we are unpaused - @pause_cond.broadcast end def unpause_streaming? diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb index e6fecbfa56b3..cda585f176e1 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/bug_regression_test.rb @@ -67,7 +67,7 @@ def stub.streaming_pull_internal request, options = nil stream = listener.instance_variable_get(:@stream_pool).first keepalive_task = stream.instance_variable_get(:@stream_keepalive_task) - puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task.running?}" - _(keepalive_task.running?).must_equal false + puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task&.running? || false}" + _(keepalive_task.nil? || !keepalive_task.running?).must_equal true end end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb index ccdef5193081..667d03704261 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb @@ -58,29 +58,40 @@ _(initial_req.protocol_version).must_equal 1 end - it "sends keep-alive pings periodically even when inventory is empty" do - q = StreamingPullStub::RaisableEnumeratorQueue.new - stub = StreamingPullStub.new [[]] - def stub.streaming_pull_internal req, opt = nil - @requests << req - @my_q.each - end - stub.instance_variable_set(:@my_q, q) + it "restarts stream when keep-alive pong deadline is exceeded" do + pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + stub = StreamingPullStub.new [[], [pull_res2]] subscriber.service.mocked_subscription_admin = stub + called = false listener = subscriber.listen streams: 1 do |msg| + called = true end listener.start - pong_thread = Thread.new do - 10.times do - sleep 0.02 - q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: []) - end + listener_retries = 0 + until called + fail "stream did not restart and deliver message" if listener_retries > 500 + listener_retries += 1 + sleep 0.01 end - sleep 0.18 - pong_thread.join + listener.stop + listener.wait! + + _(stub.requests.count).must_equal 2 + end + + it "sends keep-alive ping synchronously when stream is open and queue exists" do + stub = StreamingPullStub.new [[]] + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + + stream = listener.instance_variable_get(:@stream_pool).first + stream.send(:send_keepalive_ping!) listener.stop listener.wait! @@ -89,58 +100,94 @@ def stub.streaming_pull_internal req, opt = nil _(reqs.count).must_be :>=, 2 end - it "restarts stream when keep-alive pong deadline is exceeded" do - pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] - stub = StreamingPullStub.new [[], [pull_res2]] + it "does not restart stream when check_liveness! runs under active pongs" do + stub = StreamingPullStub.new [[]] subscriber.service.mocked_subscription_admin = stub - called = false listener = subscriber.listen streams: 1 do |msg| - called = true end listener.start - listener_retries = 0 - until called - fail "stream did not restart and deliver message" if listener_retries > 200 - listener_retries += 1 - sleep 0.01 - end + stream = listener.instance_variable_get(:@stream_pool).first + stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) + stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) + + stream.send(:check_liveness!) + _(stream.instance_variable_get(:@stream_opened)).must_equal true listener.stop listener.wait! - - _(stub.requests.count).must_equal 2 end - it "does not restart stream when actively receiving keep-alive pongs" do - q = StreamingPullStub::RaisableEnumeratorQueue.new + it "does not trigger false restarts on unpausing even if pause exceeded deadline" do stub = StreamingPullStub.new [[]] - def stub.streaming_pull_internal req, opt = nil - @requests << req - @my_q.each - end - stub.instance_variable_set(:@my_q, q) subscriber.service.mocked_subscription_admin = stub listener = subscriber.listen streams: 1 do |msg| end listener.start - pong_sender = Thread.new do - 8.times do - sleep 0.02 - empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] - q.push empty_pong - end + stream = listener.instance_variable_get(:@stream_pool).first + stream.instance_variable_set :@stream_opened, true + stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 30.0 + stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 35.0 + stream.instance_variable_set :@paused, true + stream.instance_variable_set :@stopped, false + + # 1. When paused, liveness checker should skip verification. + stream.send(:check_liveness!) + _(stream.instance_variable_get(:@stream_opened)).must_equal true + + # 2. Unpausing must reset @last_pong_at to prevent immediate restart. + stream.send(:unpause_streaming!) + + # 3. Direct liveness check immediately after unpausing (simulates the monitor thread racing the background thread). + # It should NOT close the stream because our reset made @last_pong_at newer than @last_ping_at. + stream.send(:check_liveness!) + _(stream.instance_variable_get(:@stream_opened)).must_equal true + + listener.stop + listener.wait! + end + + it "re-creates and executes timer tasks if stopped and restarted" do + stub = StreamingPullStub.new [[]] + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| end + listener.start + + stream = listener.instance_variable_get(:@stream_pool).first + + # 1. Timers should be active on a started stream + refute_nil stream.instance_variable_get(:@stream_keepalive_task) + refute_nil stream.instance_variable_get(:@pong_monitor_task) - sleep 0.15 - pong_sender.join + first_keepalive = stream.instance_variable_get(:@stream_keepalive_task) + first_monitor = stream.instance_variable_get(:@pong_monitor_task) + # 2. Stopping the stream must shutdown and nilify the timers listener.stop listener.wait! + assert_nil stream.instance_variable_get(:@stream_keepalive_task) + assert_nil stream.instance_variable_get(:@pong_monitor_task) + assert first_keepalive.shutdown? + assert first_monitor.shutdown? + + # 3. Simulating a restart should create completely new timer instances + stream.instance_variable_set :@background_thread, nil + stream.instance_variable_set :@stopped, false + stream.start + + new_keepalive = stream.instance_variable_get(:@stream_keepalive_task) + new_monitor = stream.instance_variable_get(:@pong_monitor_task) + + refute_nil new_keepalive + refute_nil new_monitor + refute_equal first_keepalive, new_keepalive + refute_equal first_monitor, new_monitor - _(stub.requests.count).must_equal 1 + stream.stop end end From 541eb9d51ea7a5f8ca827935b526eaef4b84572f Mon Sep 17 00:00:00 2001 From: Yoshi Automation Bot Date: Tue, 30 Jun 2026 16:03:38 +0000 Subject: [PATCH 3/3] fix(pubsub): make keep-alive unit tests deterministic without starting real-time threads - Remove listener.start from synchronous unit tests (test_0003, test_0004, test_0005) so they execute on standalone Stream instances without spawning background threads or real-time TimerTask loops. - Isolate ENV['PUBSUB_TEST_PONG_DEADLINE'] = '0.05' from global before block so short 50ms timeouts only apply during asynchronous integration tests, eliminating flakiness on CI in slow build environments (Ubuntu Ruby 3.2 and Windows Ruby 4.0). --- .../pubsub/message_listener/keepalive_test.rb | 98 +++++++++---------- 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb index 667d03704261..4f7108f6e4c7 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb @@ -23,16 +23,6 @@ let(:rec_msg1_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ rec_message_hash("rec_message1-msg-goes-here", 1111) } - before do - ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05" - ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05" - end - - after do - ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL" - ENV.delete "PUBSUB_TEST_PONG_DEADLINE" - end - it "sends protocol_version = 1 in initial streaming pull request" do pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] stub = StreamingPullStub.new [[pull_res1]] @@ -59,27 +49,34 @@ end it "restarts stream when keep-alive pong deadline is exceeded" do - pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] - stub = StreamingPullStub.new [[], [pull_res2]] - subscriber.service.mocked_subscription_admin = stub - - called = false - listener = subscriber.listen streams: 1 do |msg| - called = true - end - listener.start - - listener_retries = 0 - until called - fail "stream did not restart and deliver message" if listener_retries > 500 - listener_retries += 1 - sleep 0.01 + ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05" + ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05" + begin + pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + stub = StreamingPullStub.new [[], [pull_res2]] + subscriber.service.mocked_subscription_admin = stub + + called = false + listener = subscriber.listen streams: 1 do |msg| + called = true + end + listener.start + + listener_retries = 0 + until called + fail "stream did not restart and deliver message" if listener_retries > 500 + listener_retries += 1 + sleep 0.01 + end + + listener.stop + listener.wait! + + _(stub.requests.count).must_equal 2 + ensure + ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL" + ENV.delete "PUBSUB_TEST_PONG_DEADLINE" end - - listener.stop - listener.wait! - - _(stub.requests.count).must_equal 2 end it "sends keep-alive ping synchronously when stream is open and queue exists" do @@ -88,16 +85,17 @@ listener = subscriber.listen streams: 1 do |msg| end - listener.start - stream = listener.instance_variable_get(:@stream_pool).first - stream.send(:send_keepalive_ping!) - listener.stop - listener.wait! + queue = Google::Cloud::PubSub::MessageListener::EnumeratorQueue.new stream + stream.instance_variable_set :@request_queue, queue + stream.instance_variable_set :@stream_opened, true + stream.instance_variable_set :@stopped, false + stream.instance_variable_set :@last_ping_at, 0.0 + stream.instance_variable_set :@last_pong_at, 1.0 - reqs = stub.requests.first.to_a - _(reqs.count).must_be :>=, 2 + stream.send(:send_keepalive_ping!) + _(stream.instance_variable_get(:@last_ping_at)).must_be :>, 0.0 end it "does not restart stream when check_liveness! runs under active pongs" do @@ -106,17 +104,17 @@ listener = subscriber.listen streams: 1 do |msg| end - listener.start - stream = listener.instance_variable_get(:@stream_pool).first - stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) + + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + stream.instance_variable_set :@stream_opened, true + stream.instance_variable_set :@stopped, false + stream.instance_variable_set :@paused, false + stream.instance_variable_set :@last_ping_at, now - 5.0 + stream.instance_variable_set :@last_pong_at, now - 1.0 stream.send(:check_liveness!) _(stream.instance_variable_get(:@stream_opened)).must_equal true - - listener.stop - listener.wait! end it "does not trigger false restarts on unpausing even if pause exceeded deadline" do @@ -125,12 +123,12 @@ listener = subscriber.listen streams: 1 do |msg| end - listener.start - stream = listener.instance_variable_get(:@stream_pool).first + + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) stream.instance_variable_set :@stream_opened, true - stream.instance_variable_set :@last_ping_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 30.0 - stream.instance_variable_set :@last_pong_at, Process.clock_gettime(Process::CLOCK_MONOTONIC) - 35.0 + stream.instance_variable_set :@last_ping_at, now - 30.0 + stream.instance_variable_set :@last_pong_at, now - 35.0 stream.instance_variable_set :@paused, true stream.instance_variable_set :@stopped, false @@ -140,14 +138,12 @@ # 2. Unpausing must reset @last_pong_at to prevent immediate restart. stream.send(:unpause_streaming!) + _(stream.instance_variable_get(:@last_pong_at)).must_be :>=, now # 3. Direct liveness check immediately after unpausing (simulates the monitor thread racing the background thread). # It should NOT close the stream because our reset made @last_pong_at newer than @last_ping_at. stream.send(:check_liveness!) _(stream.instance_variable_get(:@stream_opened)).must_equal true - - listener.stop - listener.wait! end it "re-creates and executes timer tasks if stopped and restarted" do