Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Stream
##
# @private exactly_once_delivery_enabled.
attr_reader :exactly_once_delivery_enabled
attr_accessor :keepalive_interval, :pong_deadline

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please look at other examples in our libraries, but I believe we should split these into multiple lines. It would be great to add docstrings as well.


##
# @private Create an empty Subscriber::Stream object.
Expand All @@ -68,24 +69,31 @@ def initialize subscriber

@callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads

@stream_keepalive_task = Concurrent::TimerTask.new(
execution_interval: 30
) do
# push empty request every 30 seconds to keep stream alive
unless inventory.empty?
subscriber.service.logger.log :info, "subscriber-streams" do
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
end
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
end
end.execute
@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be exposing these test-only environment variables (e.g. PUBSUB_TEST_KEEPALIVE_INTERVAL) in production code.

Stream already has attr_accessor :keepalive_interval, :pong_deadline, so we can customize these attributes directly on the stream objects in our test before calling listener.start.

@pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15)
@last_ping_at = nil
@last_pong_at = nil
@stream_opened = false
@reconnect_delay = nil

@stream_keepalive_task = nil
@pong_monitor_task = nil
end

def start
synchronize do
break if @background_thread

@inventory.start
@stream_keepalive_task = Concurrent::TimerTask.new(
execution_interval: @keepalive_interval
) { send_keepalive_ping! }
@stream_keepalive_task.execute

@pong_monitor_task = Concurrent::TimerTask.new(
execution_interval: [@keepalive_interval / 5.0, 0.01].max

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have some super light comments on logic / why we use certain values.

) { check_liveness! }
@pong_monitor_task.execute

start_streaming!
end
Expand All @@ -108,6 +116,11 @@ def stop
@stopped = true
@pause_cond.broadcast

@stream_keepalive_task&.shutdown
@stream_keepalive_task = nil
@pong_monitor_task&.shutdown
@pong_monitor_task = nil

# Now that the reception thread is stopped, immediately stop the
# callback thread pool. All queued callbacks will see the stream
# is stopped and perform a noop.
Expand Down Expand Up @@ -219,6 +232,13 @@ class RestartStream < StandardError; end

# rubocop:disable all

def backoff_and_wait!
@reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
synchronize do
@pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a subtle timing gotcha here. When the stream disconnects due to a network error (e.g. GRPC::BadStatus raised inside the background_run loop), @pause_cond.wait is called here with @stream_opened still true.

If the @pong_monitor_task fires during this backoff sleep, it will call check_liveness! (L95), evaluate the long condition on L477 as true (since no pongs are arriving), log the warning, and raise RestartStream (L482).

This asynchronous exception immediately interrupts the sleep. The background thread wakes up early, exits the current rescue block, gets rescued by rescue RestartStream on L355, and enters a second backoff sleep (which multiplies @reconnect_delay again).

To prevent the monitor task from interrupting our backoff delays and causing double-backoffs, we should set @stream_opened = false under synchronization just above this line:

# Waits before the next reconnect attempt, growing the delay on every call.
#
# The delay starts at 1.0s, is multiplied by 1.5 on each later call and is
# capped at 60.0s. A random 0.0-0.5s jitter is added to the wait itself (it is
# not stored in @reconnect_delay). The wait is skipped when @stopped is set.
def backoff_and_wait!
  @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
  synchronize do
    @stream_opened = false # <--- Disable liveness checker during backoff
    # Timed wait on the pause condition instead of a plain sleep.
    # NOTE(review): presumably so a broadcast on @pause_cond can end the
    # backoff early - confirm against the callers that broadcast on it.
    @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
  end
end

end
end

def background_run
synchronize do
# Don't allow a stream to restart if already stopped
Expand All @@ -245,11 +265,21 @@ def background_run

# Call the StreamingPull API to get the response enumerator
options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } }
synchronize do
@stream_opened = false
end
enum = @subscriber.service.streaming_pull @request_queue.each, options
subscriber.service.logger.log :info, "subscriber-streams" do
"rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened"
end

synchronize do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@last_ping_at = now
@last_pong_at = now
@stream_opened = true
end

loop do
synchronize do
if @paused && !@stopped
Expand All @@ -264,8 +294,17 @@ def background_run
begin
# Cannot synchronize the enumerator, causes deadlock
response = enum.next
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
synchronize do
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Reset backoff delay only after successfully reading a frame from enum.next.
# If the connection drops immediately upon reading, @reconnect_delay is preserved.
@reconnect_delay = nil
end
received_messages = response.received_messages
# Skip processing properties and inventory on Pong frames (empty received_messages).
# Subscription properties on keep-alive Pongs are not valid.
next if received_messages.empty?
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled

# Use synchronize so changes happen atomically
synchronize do
Expand Down Expand Up @@ -310,18 +349,21 @@ def background_run
"#{status_code}; will be retried."
end
# Restart the stream with an incremental backoff for a retriable error.
backoff_and_wait!
retry
rescue RestartStream
subscriber.service.logger.log :info, "subscriber-streams" do
"Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried."
end
backoff_and_wait!
retry
rescue StandardError => e
subscriber.service.logger.log :error, "subscriber-streams" do
"error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}"
end
@subscriber.error! e

backoff_and_wait!
retry
end

Expand Down Expand Up @@ -416,15 +458,45 @@ def pause_streaming?
@inventory.full?
end

##
# @private Pushes an empty StreamingPullRequest onto the stream as a
# keep-alive ping.
#
# Does nothing unless the stream is marked open, has not been stopped and
# has a request queue. All state is read and written under the stream's lock.
def send_keepalive_ping!
  synchronize do
    # Skip the ping entirely when there is no live stream to send it on.
    next unless @stream_opened && !@stopped && @request_queue

    subscriber.service.logger.log :info, "subscriber-streams" do
      "sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
    end

    # Restamp the ping time only when the previous ping has been answered, so
    # the timestamp keeps pointing at the oldest ping still awaiting a pong.
    previous_ping_answered = @last_pong_at >= @last_ping_at
    @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if previous_ping_answered

    push Google::Cloud::PubSub::V1::StreamingPullRequest.new
  end
end

def check_liveness!
synchronize do
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may have lost a good multi-line comment that was above this long conditional. I'm always happy to see comments in concurrent code, explaining what is happening and why. In my opinion you can sprinkle more "play by play" comments throughout this production code.

now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
subscriber.service.logger.log :error, "subscriber-streams" do
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
end
@stream_opened = false
@background_thread&.raise RestartStream
end
end
end
end

def unpause_streaming!
return unless unpause_streaming?
synchronize do
return unless unpause_streaming?

@paused = nil
subscriber.service.logger.log :info, "subscriber-flow-control" do
"subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
@paused = nil
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
subscriber.service.logger.log :info, "subscriber-flow-control" do
"subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control"
end
# signal to the background thread that we are unpaused

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit, but capitalize like other comments.

@pause_cond.broadcast
end
# signal to the background thread that we are unpaused
@pause_cond.broadcast
end

def unpause_streaming?
Expand All @@ -443,6 +515,7 @@ def initial_input_request
req.client_id = @subscriber.service.client_id
req.max_outstanding_messages = @inventory.limit
req.max_outstanding_bytes = @inventory.bytesize
req.protocol_version = 1
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -132,7 +133,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "helper"

describe Google::Cloud::PubSub::MessageListener, :bug_regression, :mock_pubsub do
let(:topic_name) { "topic-name-goes-here" }
let(:sub_name) { "subscription-name-goes-here" }
let(:sub_hash) { subscription_hash topic_name, sub_name }
let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }

it "b/528401453: waits for exponential backoff before retrying on GRPC::Unavailable" do

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not leave the bug IDs in unless needed. We should really only use them as reminders for follow-ups.

attempts = []
pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
response_groups = [[GRPC::Unavailable.new("simulated disconnect")], [pull_res]]
stub = StreamingPullStub.new response_groups
def stub.streaming_pull_internal request, options = nil
@attempts ||= []
@attempts << Process.clock_gettime(Process::CLOCK_MONOTONIC)
super
end
stub.instance_variable_set(:@attempts, attempts)
subscriber.service.mocked_subscription_admin = stub

listener = subscriber.listen streams: 1 do |msg|
end
listener.start

retries = 0
until attempts.count >= 2
fail "stream did not retry" if retries > 200
retries += 1
sleep 0.05
end

listener.stop
listener.wait!

elapsed = attempts[1] - attempts[0]
puts "\n[b/528401453 Test] Elapsed delay between attempts: #{elapsed.round(3)}s"
_(elapsed).must_be :>=, 1.0
end

# Regression spec: stopping the listener must shut down the stream's
# keep-alive TimerTask rather than leaving it running in the background.
it "shuts down keepalive TimerTask when stream is stopped" do
  pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
  stub = StreamingPullStub.new [[pull_res]]
  subscriber.service.mocked_subscription_admin = stub

  listener = subscriber.listen streams: 1 do |_msg|
  end
  listener.start
  sleep 0.1

  # Capture the task while the stream is still running. Stream#stop clears
  # @stream_keepalive_task, so reading it only after the stop would always
  # yield nil and the spec could not detect a task that was never shut down.
  stream = listener.instance_variable_get(:@stream_pool).first
  keepalive_task = stream.instance_variable_get(:@stream_keepalive_task)
  _(keepalive_task).wont_be_nil

  listener.stop
  listener.wait!

  _(keepalive_task.running?).must_equal false
  _(stream.instance_variable_get(:@stream_keepalive_task)).must_be_nil
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -141,7 +142,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Loading
Loading