Skip to content

Unitree connection module refactor#2643

Open
mustafab0 wants to merge 8 commits into
mainfrom
mustafa/unitree-connection-refactor
Open

Unitree connection module refactor#2643
mustafab0 wants to merge 8 commits into
mainfrom
mustafa/unitree-connection-refactor

Conversation

@mustafab0

Copy link
Copy Markdown
Contributor

Problem

Multiple connection.py files exist in the robot/unitree folder each doing its thing + a lot of dependency on each other.
Closes DIM-XXX

Solution

We rename files for less confusion and also refactor to make holistic connection module for go2 and g1

How to Test

all unitree-go2 and g1 blueprints should run without any problem

Contributor License Agreement

  • [ x] I have read and approved the CLA.

@greptile-apps

greptile-apps Bot commented Jun 28, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR refactors the Unitree connection layer by splitting the monolithic connection.py into three files: a generic UnitreeWebRTCConnection transport base (unitree_webrtc.py) and per-robot subclasses Go2WebRTCConnection (go2/go2_webrtc.py) and G1WebRTCConnection (g1/g1_webrtc.py). All callers are updated to import from the new locations.

  • New TwistMode enum lets blueprints choose between true SPORT Move velocity commands and legacy joystick-axis mode; rage mode automatically overrides to joystick when active.
  • Auto-stop timer is migrated from threading.Timer to asyncio.call_later on the event loop, eliminating a per-command thread.
  • publish() helper centralises thread-safe datachannel sends with correct on-loop / off-loop detection; disconnect() is removed and stop() now handles full teardown.
  • Test suite is substantially expanded to cover twist-mode dispatch, rage-mode state tracking, auto-stop behaviour, and safety-zero on stop().

Confidence Score: 4/5

The refactor is structurally sound and most paths are safe, but the subclass stop() overrides introduce a deadlock if stop() is called more than once.

Both Go2WebRTCConnection.stop() and G1WebRTCConnection.stop() call publish() before the base class guards run. When the event loop is already stopped, publish() reaches asyncio.run_coroutine_threadsafe(...).result() with no timeout; the future is never resolved and the calling thread hangs indefinitely. The base class stop() was specifically designed and tested to be idempotent, but the subclass overrides break that property on every double-stop path.

dimos/robot/unitree/go2/go2_webrtc.py and dimos/robot/unitree/g1/g1_webrtc.py — the stop() overrides in both files need a self.loop.is_running() guard before calling publish().

Important Files Changed

Filename Overview
dimos/robot/unitree/unitree_webrtc.py New base transport class: clean loop+thread lifecycle, thread-safe publish/subscribe API. Minor docstring inaccuracy on publish(); the off-loop path blocks without a timeout.
dimos/robot/unitree/go2/go2_webrtc.py Go2-specific WebRTC subclass with TwistMode, rage-mode tracking, and loop-timer auto-stop. stop() override calls publish() before checking loop state, which deadlocks on a second stop() call.
dimos/robot/unitree/g1/g1_webrtc.py G1-specific WebRTC subclass. Contains the same stop() idempotency issue as Go2WebRTCConnection — _publish_joystick is called before loop-is-running is checked.
dimos/robot/unitree/go2/connection.py Module layer updated to use Go2WebRTCConnection + TwistMode; ReplayConnection.connect() is a no-op so the base init never runs and loop/conn are never set — safe because stop() calls CompositeResource.stop() directly.
dimos/robot/unitree/go2/test_connection.py Significantly expanded test suite covering TwistMode dispatch, rage-mode state tracking, auto-stop timer behaviour, and safety zero on stop().
dimos/robot/unitree/test_unitree_webrtc.py Renamed from test_connection.py; adds publish/publish-with-msg_type and idempotent-stop tests. Removes test_empty_string_key_forwarded_as_falsy, which had no functional replacement.

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class Resource {
        +start()
        +stop()
    }
    class UnitreeWebRTCConnection {
        +ip: str
        +mode: str
        +conn: LegionConnection
        +loop: asyncio.EventLoop
        +thread: Thread
        +connect()
        +stop()
        +publish(topic, data, msg_type)
        +publish_request(topic, data)
        +subscribe(topic_name)
    }
    class Go2WebRTCConnection {
        +twist_mode: TwistMode
        +_move_seq: int
        +_rage_active: bool
        +_stop_handle: TimerHandle
        +move(twist, duration)
        +stop()
        +set_rage_mode(enable)
        +raw_video_stream()
        +lidar_stream()
        +odom_stream()
    }
    class G1WebRTCConnection {
        +_stop_handle: TimerHandle
        +move(twist, duration)
        +stop()
        +standup()
        +liedown()
    }
    class ReplayConnection {
        +dataset: str
        +connect()
        +stop()
        +lidar_stream()
        +odom_stream()
    }
    class CompositeResource {
        +stop()
    }

    Resource <|-- UnitreeWebRTCConnection
    UnitreeWebRTCConnection <|-- Go2WebRTCConnection
    UnitreeWebRTCConnection <|-- G1WebRTCConnection
    Go2WebRTCConnection <|-- ReplayConnection
    CompositeResource <|-- ReplayConnection
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
classDiagram
    class Resource {
        +start()
        +stop()
    }
    class UnitreeWebRTCConnection {
        +ip: str
        +mode: str
        +conn: LegionConnection
        +loop: asyncio.EventLoop
        +thread: Thread
        +connect()
        +stop()
        +publish(topic, data, msg_type)
        +publish_request(topic, data)
        +subscribe(topic_name)
    }
    class Go2WebRTCConnection {
        +twist_mode: TwistMode
        +_move_seq: int
        +_rage_active: bool
        +_stop_handle: TimerHandle
        +move(twist, duration)
        +stop()
        +set_rage_mode(enable)
        +raw_video_stream()
        +lidar_stream()
        +odom_stream()
    }
    class G1WebRTCConnection {
        +_stop_handle: TimerHandle
        +move(twist, duration)
        +stop()
        +standup()
        +liedown()
    }
    class ReplayConnection {
        +dataset: str
        +connect()
        +stop()
        +lidar_stream()
        +odom_stream()
    }
    class CompositeResource {
        +stop()
    }

    Resource <|-- UnitreeWebRTCConnection
    UnitreeWebRTCConnection <|-- Go2WebRTCConnection
    UnitreeWebRTCConnection <|-- G1WebRTCConnection
    Go2WebRTCConnection <|-- ReplayConnection
    CompositeResource <|-- ReplayConnection
Loading

Comments Outside Diff (1)

  1. dimos/robot/unitree/go2/go2_webrtc.py, line 925-932 (link)

    P1 stop() deadlocks on second call — loop-stopped path blocks forever

    _send_twist(0, 0, 0)_publish_joystickpublish()asyncio.run_coroutine_threadsafe(_acoro(), self.loop).result() (line 129 of unitree_webrtc.py). When stop() is called a second time the loop is already stopped but not closed, so call_soon_threadsafe queues the coroutine without draining it and .result() (which has no timeout) blocks forever. The try/except Exception wrapper cannot catch an infinite wait. The base class UnitreeWebRTCConnection.stop() is intentionally idempotent (if self.loop.is_running() guards all loop access), but this override breaks that guarantee by touching publish() before calling super().stop(). The same pattern is present in G1WebRTCConnection.stop() (line 638 of g1_webrtc.py).

    To fix, guard the pre-stop send with a self.loop.is_running() check, consistent with the base class.

Reviews (4): Last reviewed commit: "cleanup after rebase" | Re-trigger Greptile

Comment thread dimos/robot/unitree/unitree_webrtc.py Outdated
Comment thread dimos/robot/unitree/unitree_webrtc.py Outdated
Comment thread dimos/robot/unitree/go2/go2_webrtc.py Outdated
Comment thread dimos/robot/unitree/unitree_webrtc.py
@codecov

codecov Bot commented Jun 28, 2026

Copy link
Copy Markdown

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
2394 1 2393 67
View the top 1 failed test(s) by shortest run time
dimos.protocol.pubsub.test_spec::test_high_volume_messages[ros_context-topic1-values1]
Stack Traces | 3.95s run time
pubsub_context = <function ros_context at 0x75f6dc58a520>
topic = RawROSTopic(topic='/test_ros_topic', ros_type=<class 'geometry_msgs.msg._vector3.Vector3'>, qos=<rclpy.qos.QoSProfile object at 0x75f6dc5a2960>)
values = [geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=4.0, y=5.0, z=6.0), geometry_msgs.msg.Vector3(x=7.0, y=8.0, z=9.0)]

    @pytest.mark.self_hosted
    @pytest.mark.skipif_macos_bug
    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    def test_high_volume_messages(
        pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
    ) -> None:
        """Test that all 5k messages are received correctly.
        Limited to 5k because ros transport cannot handle more.
        Might want to have separate expectations per transport later
        """
        with pubsub_context() as x:
            # Create a list to capture received messages
            received_messages: list[Any] = []
            last_message_time = [time.time()]  # Use list to allow modification in callback
    
            # Define callback function
            def callback(message: Any, topic: Any) -> None:
                received_messages.append(message)
                last_message_time[0] = time.time()
    
            # Subscribe to the topic
            x.subscribe(topic, callback)
    
            # Publish 5000 messages
            num_messages = 5000
            for _ in range(num_messages):
                x.publish(topic, values[0])
    
            # Wait until no messages received for 0.5 seconds
            timeout = 2.0  # Maximum time to wait
            stable_duration = 0.1  # Time without new messages to consider done
            start_time = time.time()
    
            while time.time() - start_time < timeout:
                if time.time() - last_message_time[0] >= stable_duration:
                    break
                time.sleep(0.1)
    
            # Capture count and clear list to avoid printing huge list on failure
            received_len = len(received_messages)
            received_messages.clear()
>           assert received_len == num_messages, f"Expected {num_messages} messages, got {received_len}"
E           AssertionError: Expected 5000 messages, got 4566
E           assert 4566 == 5000

_          = 4999
callback   = <function test_high_volume_messages.<locals>.callback at 0x75f6a700df80>
last_message_time = [1782960820.0023646]
num_messages = 5000
pubsub_context = <function ros_context at 0x75f6dc58a520>
received_len = 4566
received_messages = [geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vec....0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), ...]
stable_duration = 0.1
start_time = 1782960817.8046172
timeout    = 2.0
topic      = RawROSTopic(topic='/test_ros_topic', ros_type=<class 'geometry_msgs.msg._vector3.Vector3'>, qos=<rclpy.qos.QoSProfile object at 0x75f6dc5a2960>)
values     = [geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=4.0, y=5.0, z=6.0), geometry_msgs.msg.Vector3(x=7.0, y=8.0, z=9.0)]
x          = <dimos.protocol.pubsub.impl.rospubsub.RawROS object at 0x75f6ac2b9b80>

.../protocol/pubsub/test_spec.py:325: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Comment thread dimos/robot/unitree/g1/effectors/high_level/webrtc.py
Comment thread dimos/robot/unitree/unitree_webrtc.py
Comment thread dimos/robot/unitree/go2/go2_webrtc.py Outdated
Comment thread dimos/robot/unitree/g1/g1_webrtc.py
Comment thread dimos/robot/unitree/unitree_webrtc.py
Comment thread dimos/robot/unitree/unitree_webrtc.py Outdated
Comment thread dimos/robot/unitree/go2/go2_webrtc.py Outdated
Comment thread dimos/robot/unitree/g1/g1_webrtc.py Outdated
@mustafab0 mustafab0 force-pushed the mustafa/unitree-connection-refactor branch 2 times, most recently from 5db442f to 33f904f Compare July 1, 2026 18:06
@mustafab0 mustafab0 added the backport:skip Skip creating a backport to any release branches label Jul 1, 2026
@mustafab0 mustafab0 force-pushed the mustafa/unitree-connection-refactor branch from 33f904f to 2a29be8 Compare July 2, 2026 02:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport:skip Skip creating a backport to any release branches

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants