-
Notifications
You must be signed in to change notification settings - Fork 570
feat(pubsub): implement streaming keep-alive logic #34653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e53a9bd
24055fd
541eb9d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,7 @@ class Stream | |
| ## | ||
| # @private exactly_once_delivery_enabled. | ||
| attr_reader :exactly_once_delivery_enabled | ||
| attr_accessor :keepalive_interval, :pong_deadline | ||
|
|
||
| ## | ||
| # @private Create an empty Subscriber::Stream object. | ||
|
|
@@ -68,24 +69,31 @@ def initialize subscriber | |
|
|
||
| @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads | ||
|
|
||
| @stream_keepalive_task = Concurrent::TimerTask.new( | ||
| execution_interval: 30 | ||
| ) do | ||
| # push empty request every 30 seconds to keep stream alive | ||
| unless inventory.empty? | ||
| subscriber.service.logger.log :info, "subscriber-streams" do | ||
| "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" | ||
| end | ||
| push Google::Cloud::PubSub::V1::StreamingPullRequest.new | ||
| end | ||
| end.execute | ||
| @keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should be exposing these test-only environment variables (e.g.
|
||
| @pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15) | ||
| @last_ping_at = nil | ||
| @last_pong_at = nil | ||
| @stream_opened = false | ||
| @reconnect_delay = nil | ||
|
|
||
| @stream_keepalive_task = nil | ||
| @pong_monitor_task = nil | ||
| end | ||
|
|
||
| def start | ||
| synchronize do | ||
| break if @background_thread | ||
|
|
||
| @inventory.start | ||
| @stream_keepalive_task = Concurrent::TimerTask.new( | ||
| execution_interval: @keepalive_interval | ||
| ) { send_keepalive_ping! } | ||
| @stream_keepalive_task.execute | ||
|
|
||
| @pong_monitor_task = Concurrent::TimerTask.new( | ||
| execution_interval: [@keepalive_interval / 5.0, 0.01].max | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great to have some super light comments on logic / why we use certain values. |
||
| ) { check_liveness! } | ||
| @pong_monitor_task.execute | ||
|
|
||
| start_streaming! | ||
| end | ||
|
|
@@ -108,6 +116,11 @@ def stop | |
| @stopped = true | ||
| @pause_cond.broadcast | ||
|
|
||
| @stream_keepalive_task&.shutdown | ||
| @stream_keepalive_task = nil | ||
| @pong_monitor_task&.shutdown | ||
| @pong_monitor_task = nil | ||
|
|
||
| # Now that the reception thread is stopped, immediately stop the | ||
| # callback thread pool. All queued callbacks will see the stream | ||
| # is stopped and perform a noop. | ||
|
|
@@ -219,6 +232,13 @@ class RestartStream < StandardError; end | |
|
|
||
| # rubocop:disable all | ||
|
|
||
| def backoff_and_wait! | ||
| @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0 | ||
| synchronize do | ||
| @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a subtle timing gotcha here. When the stream disconnects due to a network error (e.g. If the This asynchronous exception immediately interrupts the sleep. The background thread wakes up early, exits the current rescue block, gets rescued by To prevent the monitor task from interrupting our backoff delays and causing double-backoffs, we should set def backoff_and_wait!
@reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0
synchronize do
@stream_opened = false # <--- Disable liveness checker during backoff
@pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
end
end |
||
| end | ||
| end | ||
|
|
||
| def background_run | ||
| synchronize do | ||
| # Don't allow a stream to restart if already stopped | ||
|
|
@@ -245,11 +265,21 @@ def background_run | |
|
|
||
| # Call the StreamingPull API to get the response enumerator | ||
| options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } | ||
| synchronize do | ||
| @stream_opened = false | ||
| end | ||
| enum = @subscriber.service.streaming_pull @request_queue.each, options | ||
| subscriber.service.logger.log :info, "subscriber-streams" do | ||
| "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" | ||
| end | ||
|
|
||
| synchronize do | ||
| now = Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
| @last_ping_at = now | ||
| @last_pong_at = now | ||
| @stream_opened = true | ||
| end | ||
|
|
||
| loop do | ||
| synchronize do | ||
| if @paused && !@stopped | ||
|
|
@@ -264,8 +294,17 @@ def background_run | |
| begin | ||
| # Cannot synchronize the enumerator, causes deadlock | ||
| response = enum.next | ||
| new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled | ||
| synchronize do | ||
| @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
| # Reset backoff delay only after successfully reading a frame from enum.next. | ||
| # If the connection drops immediately upon reading, @reconnect_delay is preserved. | ||
| @reconnect_delay = nil | ||
| end | ||
| received_messages = response.received_messages | ||
| # Skip processing properties and inventory on Pong frames (empty received_messages). | ||
| # Subscription properties on keep-alive Pongs are not valid. | ||
| next if received_messages.empty? | ||
| new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled | ||
|
|
||
| # Use synchronize so changes happen atomically | ||
| synchronize do | ||
|
|
@@ -310,18 +349,21 @@ def background_run | |
| "#{status_code}; will be retried." | ||
| end | ||
| # Restart the stream with an incremental back for a retriable error. | ||
| backoff_and_wait! | ||
| retry | ||
| rescue RestartStream | ||
| subscriber.service.logger.log :info, "subscriber-streams" do | ||
| "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." | ||
| end | ||
| backoff_and_wait! | ||
| retry | ||
| rescue StandardError => e | ||
| subscriber.service.logger.log :error, "subscriber-streams" do | ||
| "error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}" | ||
| end | ||
| @subscriber.error! e | ||
|
|
||
| backoff_and_wait! | ||
| retry | ||
| end | ||
|
|
||
|
|
@@ -416,15 +458,45 @@ def pause_streaming? | |
| @inventory.full? | ||
| end | ||
|
|
||
| def send_keepalive_ping! | ||
| synchronize do | ||
| if @stream_opened && !@stopped && @request_queue | ||
| subscriber.service.logger.log :info, "subscriber-streams" do | ||
| "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" | ||
| end | ||
| @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at | ||
| push Google::Cloud::PubSub::V1::StreamingPullRequest.new | ||
| end | ||
| end | ||
| end | ||
|
|
||
| def check_liveness! | ||
| synchronize do | ||
| if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we may have lost a good multi-line comment that was above this long conditional. I'm always happy to see comments in concurrent code, explaining what is happening and why. In my opinion you can sprinkle more "play by play" comments throughout this production code. |
||
| now = Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
| if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at | ||
| subscriber.service.logger.log :error, "subscriber-streams" do | ||
| "Keep-alive pong not received within #{@pong_deadline}s; restarting stream." | ||
| end | ||
| @stream_opened = false | ||
| @background_thread&.raise RestartStream | ||
| end | ||
| end | ||
| end | ||
| end | ||
|
|
||
| def unpause_streaming! | ||
| return unless unpause_streaming? | ||
| synchronize do | ||
| return unless unpause_streaming? | ||
|
|
||
| @paused = nil | ||
| subscriber.service.logger.log :info, "subscriber-flow-control" do | ||
| "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" | ||
| @paused = nil | ||
| @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
| subscriber.service.logger.log :info, "subscriber-flow-control" do | ||
| "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" | ||
| end | ||
| # signal to the background thread that we are unpaused | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Super nit, but capitalize like other comments. |
||
| @pause_cond.broadcast | ||
| end | ||
| # signal to the background thread that we are unpaused | ||
| @pause_cond.broadcast | ||
| end | ||
|
|
||
| def unpause_streaming? | ||
|
|
@@ -443,6 +515,7 @@ def initial_input_request | |
| req.client_id = @subscriber.service.client_id | ||
| req.max_outstanding_messages = @inventory.limit | ||
| req.max_outstanding_bytes = @inventory.bytesize | ||
| req.protocol_version = 1 | ||
| end | ||
| end | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| # Copyright 2026 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| require "helper" | ||
|
|
||
| describe Google::Cloud::PubSub::MessageListener, :bug_regression, :mock_pubsub do | ||
| let(:topic_name) { "topic-name-goes-here" } | ||
| let(:sub_name) { "subscription-name-goes-here" } | ||
| let(:sub_hash) { subscription_hash topic_name, sub_name } | ||
| let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) } | ||
| let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service } | ||
|
|
||
| it "b/528401453: waits for exponential backoff before retrying on GRPC::Unavailable" do | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather not leave the bug IDs unless needed. We should only really use as reminder for follow-ups. |
||
| attempts = [] | ||
| pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] | ||
| response_groups = [[GRPC::Unavailable.new("simulated disconnect")], [pull_res]] | ||
| stub = StreamingPullStub.new response_groups | ||
| def stub.streaming_pull_internal request, options = nil | ||
| @attempts ||= [] | ||
| @attempts << Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
| super | ||
| end | ||
| stub.instance_variable_set(:@attempts, attempts) | ||
| subscriber.service.mocked_subscription_admin = stub | ||
|
|
||
| listener = subscriber.listen streams: 1 do |msg| | ||
| end | ||
| listener.start | ||
|
|
||
| retries = 0 | ||
| until attempts.count >= 2 | ||
| fail "stream did not retry" if retries > 200 | ||
| retries += 1 | ||
| sleep 0.05 | ||
| end | ||
|
|
||
| listener.stop | ||
| listener.wait! | ||
|
|
||
| elapsed = attempts[1] - attempts[0] | ||
| puts "\n[b/528401453 Test] Elapsed delay between attempts: #{elapsed.round(3)}s" | ||
| _(elapsed).must_be :>=, 1.0 | ||
| end | ||
|
|
||
| it "b/528404815: shuts down keepalive TimerTask when stream is stopped" do | ||
| pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] | ||
| stub = StreamingPullStub.new [[pull_res]] | ||
| subscriber.service.mocked_subscription_admin = stub | ||
|
|
||
| listener = subscriber.listen streams: 1 do |msg| | ||
| end | ||
| listener.start | ||
| sleep 0.1 | ||
| listener.stop | ||
| listener.wait! | ||
|
|
||
| stream = listener.instance_variable_get(:@stream_pool).first | ||
| keepalive_task = stream.instance_variable_get(:@stream_keepalive_task) | ||
| puts "\n[b/528404815 Test] Keepalive task running state after stop: #{keepalive_task&.running? || false}" | ||
| _(keepalive_task.nil? || !keepalive_task.running?).must_equal true | ||
| end | ||
| end | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please look at other examples in our libraries, but I believe we should split these into multiple lines. It would be great to add docstrings as well.