From c5e79645c29684e422ed9d60bde170eadda6be1e Mon Sep 17 00:00:00 2001 From: Daisuke MAKIUCHI Date: Wed, 17 Jun 2026 13:06:56 +0900 Subject: [PATCH 1/3] =?UTF-8?q?=E5=86=8D=E6=8E=A5=E7=B6=9A=E6=99=82?= =?UTF-8?q?=E3=81=AB=E5=8F=A4=E3=81=84Sender=E3=81=8CmsgPool=E3=82=92?= =?UTF-8?q?=E6=8E=B4=E3=82=93=E3=81=A7=E3=81=97=E3=81=BE=E3=81=86=E3=82=B1?= =?UTF-8?q?=E3=83=BC=E3=82=B9=E3=81=8C=E3=81=82=E3=82=8B=E3=81=AE=E3=81=A7?= =?UTF-8?q?=E5=AF=BE=E7=AD=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Assets/WSNet2/Scripts/Core/Connection.cs | 6 +++++- .../Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs | 13 ++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs index f8c5a00e..5c15a506 100644 --- a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs +++ b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs @@ -358,7 +358,11 @@ private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct) { while (true) { - msgPool.Wait(ct); + if (!msgPool.Wait(ct)) + { + // ctがキャンセルされているとき falseが返るので終了 + return; + } ArraySegment? msg; while ((msg = msgPool.Take(seqNum)).HasValue) diff --git a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs index 3d654957..cca0f0fc 100644 --- a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs +++ b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs @@ -56,14 +56,25 @@ public MsgPool(int poolSize, int initialBufSize, HMAC hmac) /// /// Msgが来るまで待つ /// + /// + /// ctがキャンセルされている古いSenderから呼ばれていたときはfalse + /// /// /// /// スレッドをブロックする /// /// - public void Wait(CancellationToken ct) + public bool Wait(CancellationToken ct) { _ = hasMsg.Take(ct); + + if (ct.IsCancellationRequested) { + // 古いConnectionから呼ばれていたとき、hasMsgを戻してfalseを返す + hasMsg.TryAdd(true); + return false; + } + + return true; } /// From 57143cdfa02619fa15410c4321bebba2ed5c04d6 Mon Sep 17 00:00:00 2001 From: Daisuke MAKIUCHI Date: Tue, 23 Jun 2026 12:05:55 +0900 Subject: [PATCH 2/3] =?UTF-8?q?client:=20Connection=E3=81=AEsendSemaphore,?= =?UTF-8?q?=20closed=E3=82=92WebSocket=E6=8E=A5=E7=B6=9A=E6=AF=8E=E3=81=AB?= =?UTF-8?q?=E6=8C=81=E3=81=A4=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Assets/WSNet2/Scripts/Core/Connection.cs | 61 ++++++++++++------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs index 5c15a506..95a96a4b 100644 --- a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs +++ b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs @@ -38,8 +38,6 @@ class Connection volatile int pingInterval; volatile uint lastPingTime; CancellationTokenSource pingerDelayCanceller; - SemaphoreSlim sendSemaphore; - bool closed; TaskCompletionSource senderTaskSource; TaskCompletionSource pingerTaskSource; @@ -49,6 +47,25 @@ class Connection Logger logger; + class WebSockConn : IDisposable + { + public ClientWebSocket client; + public SemaphoreSlim sendSemaphore; + public bool closed; + + public WebSockConn(ClientWebSocket client) + { + this.client = client; + this.sendSemaphore = new SemaphoreSlim(1, 1); + this.closed = false; + } + + public void Dispose() + { + client.Dispose(); + } + } + /// /// コンストラクタ /// @@ -66,8 +83,6 @@ public Connection(Room room, string clientId, HMAC hmac, JoinedRoom joined, Logg this.hmac = hmac; this.pingInterval = calcPingInterval(room.ClientDeadline); this.pingerDelayCanceller = new CancellationTokenSource(); - this.sendSemaphore = new SemaphoreSlim(1, 1); - this.closed = false; this.evSeqNum = 0; this.evBufPool = new BlockingCollection( @@ -214,7 +229,7 @@ public void UpdatePingInterval(uint deadline) /// /// Websocketで接続する /// - private async Task Connect(CancellationToken ct) + private async Task Connect(CancellationToken ct) { var ws = new ClientWebSocket(); var authdata = authgen.GenerateBearer(authKey, clientId); @@ -232,13 +247,13 @@ private async Task Connect(CancellationToken ct) SetTcpNoDelay(ws); logger?.Info("connected"); - return ws; + return new WebSockConn(ws); } /// /// Event受信ループ /// - private async Task Receiver(ClientWebSocket ws, CancellationToken ct) + private async Task Receiver(WebSockConn ws, CancellationToken ct) { try { @@ -295,7 +310,7 @@ private async Task Receiver(ClientWebSocket ws, CancellationToken ct) /// /// Eventの受信 /// - private async Task ReceiveEvent(ClientWebSocket ws, CancellationToken ct) + private async Task ReceiveEvent(WebSockConn ws, CancellationToken ct) { var buf = evBufPool.Take(ct); try @@ -304,7 +319,7 @@ private async Task ReceiveEvent(ClientWebSocket ws, CancellationToken ct) while (true) { var seg = new ArraySegment(buf, pos, buf.Length - pos); - var ret = await ws.ReceiveAsync(seg, ct); + var ret = await ws.client.ReceiveAsync(seg, ct); if (ret.MessageType == WebSocketMessageType.Close) { @@ -354,7 +369,7 @@ private async Task ReceiveEvent(ClientWebSocket ws, CancellationToken ct) /// /// 開始Msg通し番号 /// ループ停止するトークン - private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct) + private async Task Sender(WebSockConn ws, int seqNum, CancellationToken ct) { while (true) { @@ -381,7 +396,7 @@ private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct) /// /// Ping送信ループ /// - private async Task Pinger(ClientWebSocket ws, CancellationToken ct) + private async Task Pinger(WebSockConn ws, CancellationToken ct) { var msg = new MsgPing(hmac); @@ -415,51 +430,51 @@ private async Task Pinger(ClientWebSocket ws, CancellationToken ct) /// /// websocketメッセージを送信 /// - private async Task Send(ClientWebSocket ws, ArraySegment msg, CancellationToken ct) + private async Task Send(WebSockConn ws, ArraySegment msg, CancellationToken ct) { - if (closed) + if (ws.closed) { // SendCloseがsemaphore握りっぱなしになる対策で先にチェック // ここすり抜けてsemaphore待ってしまったらご愁傷さま…… return; } - await sendSemaphore.WaitAsync(ct); + await ws.sendSemaphore.WaitAsync(ct); try { - if (closed) + if (ws.closed) { return; } NetworkInformer.OnRoomSend(room, msg); - await ws.SendAsync(msg, WebSocketMessageType.Binary, true, ct); + await ws.client.SendAsync(msg, WebSocketMessageType.Binary, true, ct); } finally { - sendSemaphore.Release(); + ws.sendSemaphore.Release(); } } - private async Task SendClose(ClientWebSocket ws, string msg, CancellationToken ct) + private async Task SendClose(WebSockConn ws, string msg, CancellationToken ct) { - await sendSemaphore.WaitAsync(ct); + await ws.sendSemaphore.WaitAsync(ct); try { - if (closed) + if (ws.closed) { return; } - closed = true; + ws.closed = true; var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); cts.CancelAfter(SendCloseTimeout); - await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token); + await ws.client.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token); } finally { - sendSemaphore.Release(); + ws.sendSemaphore.Release(); } } From 6ac0bca9de143a07c962adc6aab8c12cda861c47 Mon Sep 17 00:00:00 2001 From: Daisuke MAKIUCHI Date: Tue, 23 Jun 2026 12:39:34 +0900 Subject: [PATCH 3/3] =?UTF-8?q?client:=20WebSockConn=E3=81=AE=E3=83=A1?= =?UTF-8?q?=E3=83=B3=E3=83=90=E3=82=92private=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Assets/WSNet2/Scripts/Core/Connection.cs | 119 +++++++++--------- 1 file changed, 61 insertions(+), 58 deletions(-) diff --git a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs index 95a96a4b..112bc95d 100644 --- a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs +++ b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs @@ -49,9 +49,9 @@ class Connection class WebSockConn : IDisposable { - public ClientWebSocket client; - public SemaphoreSlim sendSemaphore; - public bool closed; + ClientWebSocket client; + SemaphoreSlim sendSemaphore; + bool closed; public WebSockConn(ClientWebSocket client) { @@ -64,6 +64,58 @@ public void Dispose() { client.Dispose(); } + + public Task ReceiveAsync(ArraySegment seg, CancellationToken ct) + { + return client.ReceiveAsync(seg, ct); + } + + public async Task Send(ArraySegment msg, CancellationToken ct) + { + if (closed) + { + // SendCloseがsemaphore握りっぱなしになる対策で先にチェック + // ここすり抜けてsemaphore待ってしまったらご愁傷さま…… + return; + } + + await sendSemaphore.WaitAsync(ct); + try + { + if (closed) + { + return; + } + + await client.SendAsync(msg, WebSocketMessageType.Binary, true, ct); + } + finally + { + sendSemaphore.Release(); + } + } + + public async Task SendClose(string msg, CancellationToken ct) + { + await sendSemaphore.WaitAsync(ct); + try + { + if (closed) + { + return; + } + + closed = true; + + var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + cts.CancelAfter(SendCloseTimeout); + await client.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token); + } + finally + { + sendSemaphore.Release(); + } + } } /// @@ -319,13 +371,13 @@ private async Task ReceiveEvent(WebSockConn ws, CancellationToken ct) while (true) { var seg = new ArraySegment(buf, pos, buf.Length - pos); - var ret = await ws.client.ReceiveAsync(seg, ct); + var ret = await ws.ReceiveAsync(seg, ct); if (ret.MessageType == WebSocketMessageType.Close) { // iOSでごく稀にws.CloseAsync()が返ってこないことがあるので別Taskで実行 // Semaphoreを握りっぱなしになるけどこの接続は終了するので基本的には問題ない - Task.Run(async () => await SendClose(ws, ret.CloseStatusDescription, ct)); + Task.Run(async () => await ws.SendClose(ret.CloseStatusDescription, ct)); switch (ret.CloseStatus) { @@ -387,7 +439,8 @@ private async Task Sender(WebSockConn ws, int seqNum, CancellationToken ct) return; // ctのキャンセルで終了 } - await Send(ws, msg.Value, ct); + NetworkInformer.OnRoomSend(room, msg.Value); + await ws.Send(msg.Value, ct); seqNum++; } } @@ -410,7 +463,8 @@ private async Task Pinger(WebSockConn ws, CancellationToken ct) var interval = Task.Delay(pingInterval, pingerDelayCanceller.Token); var time = (uint)msg.SetTimestamp(); lastPingTime = time; - await Send(ws, msg.Value, ct); + NetworkInformer.OnRoomSend(room, msg.Value); + await ws.Send(msg.Value, ct); try { await interval; @@ -427,57 +481,6 @@ private async Task Pinger(WebSockConn ws, CancellationToken ct) } } - /// - /// websocketメッセージを送信 - /// - private async Task Send(WebSockConn ws, ArraySegment msg, CancellationToken ct) - { - if (ws.closed) - { - // SendCloseがsemaphore握りっぱなしになる対策で先にチェック - // ここすり抜けてsemaphore待ってしまったらご愁傷さま…… - return; - } - - await ws.sendSemaphore.WaitAsync(ct); - try - { - if (ws.closed) - { - return; - } - - NetworkInformer.OnRoomSend(room, msg); - await ws.client.SendAsync(msg, WebSocketMessageType.Binary, true, ct); - } - finally - { - ws.sendSemaphore.Release(); - } - } - - private async Task SendClose(WebSockConn ws, string msg, CancellationToken ct) - { - await ws.sendSemaphore.WaitAsync(ct); - try - { - if (ws.closed) - { - return; - } - - ws.closed = true; - - var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); - cts.CancelAfter(SendCloseTimeout); - await ws.client.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token); - } - finally - { - ws.sendSemaphore.Release(); - } - } - /// /// Pong受信時の処理 ///