diff --git a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs index f8c5a00e..112bc95d 100644 --- a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs +++ b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs @@ -38,8 +38,6 @@ class Connection volatile int pingInterval; volatile uint lastPingTime; CancellationTokenSource pingerDelayCanceller; - SemaphoreSlim sendSemaphore; - bool closed; TaskCompletionSource senderTaskSource; TaskCompletionSource pingerTaskSource; @@ -49,6 +47,77 @@ class Connection Logger logger; + class WebSockConn : IDisposable + { + ClientWebSocket client; + SemaphoreSlim sendSemaphore; + bool closed; + + public WebSockConn(ClientWebSocket client) + { + this.client = client; + this.sendSemaphore = new SemaphoreSlim(1, 1); + this.closed = false; + } + + public void Dispose() + { + client.Dispose(); + } + + public Task ReceiveAsync(ArraySegment seg, CancellationToken ct) + { + return client.ReceiveAsync(seg, ct); + } + + public async Task Send(ArraySegment msg, CancellationToken ct) + { + if (closed) + { + // SendCloseがsemaphore握りっぱなしになる対策で先にチェック + // ここすり抜けてsemaphore待ってしまったらご愁傷さま…… + return; + } + + await sendSemaphore.WaitAsync(ct); + try + { + if (closed) + { + return; + } + + await client.SendAsync(msg, WebSocketMessageType.Binary, true, ct); + } + finally + { + sendSemaphore.Release(); + } + } + + public async Task SendClose(string msg, CancellationToken ct) + { + await sendSemaphore.WaitAsync(ct); + try + { + if (closed) + { + return; + } + + closed = true; + + var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + cts.CancelAfter(SendCloseTimeout); + await client.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token); + } + finally + { + sendSemaphore.Release(); + } + } + } + /// /// コンストラクタ /// @@ -66,8 +135,6 @@ public Connection(Room room, string clientId, HMAC hmac, JoinedRoom joined, Logg this.hmac = hmac; this.pingInterval = calcPingInterval(room.ClientDeadline); this.pingerDelayCanceller = new CancellationTokenSource(); - this.sendSemaphore = new SemaphoreSlim(1, 1); - this.closed = false; this.evSeqNum = 0; this.evBufPool = new BlockingCollection( @@ -214,7 +281,7 @@ public void UpdatePingInterval(uint deadline) /// /// Websocketで接続する /// - private async Task Connect(CancellationToken ct) + private async Task Connect(CancellationToken ct) { var ws = new ClientWebSocket(); var authdata = authgen.GenerateBearer(authKey, clientId); @@ -232,13 +299,13 @@ private async Task Connect(CancellationToken ct) SetTcpNoDelay(ws); logger?.Info("connected"); - return ws; + return new WebSockConn(ws); } /// /// Event受信ループ /// - private async Task Receiver(ClientWebSocket ws, CancellationToken ct) + private async Task Receiver(WebSockConn ws, CancellationToken ct) { try { @@ -295,7 +362,7 @@ private async Task Receiver(ClientWebSocket ws, CancellationToken ct) /// /// Eventの受信 /// - private async Task ReceiveEvent(ClientWebSocket ws, CancellationToken ct) + private async Task ReceiveEvent(WebSockConn ws, CancellationToken ct) { var buf = evBufPool.Take(ct); try @@ -310,7 +377,7 @@ private async Task ReceiveEvent(ClientWebSocket ws, CancellationToken ct) { // iOSでごく稀にws.CloseAsync()が返ってこないことがあるので別Taskで実行 // Semaphoreを握りっぱなしになるけどこの接続は終了するので基本的には問題ない - Task.Run(async () => await SendClose(ws, ret.CloseStatusDescription, ct)); + Task.Run(async () => await ws.SendClose(ret.CloseStatusDescription, ct)); switch (ret.CloseStatus) { @@ -354,11 +421,15 @@ private async Task ReceiveEvent(ClientWebSocket ws, CancellationToken ct) /// /// 開始Msg通し番号 /// ループ停止するトークン - private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct) + private async Task Sender(WebSockConn ws, int seqNum, CancellationToken ct) { while (true) { - msgPool.Wait(ct); + if (!msgPool.Wait(ct)) + { + // ctがキャンセルされているとき falseが返るので終了 + return; + } ArraySegment? msg; while ((msg = msgPool.Take(seqNum)).HasValue) @@ -368,7 +439,8 @@ private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct) return; // ctのキャンセルで終了 } - await Send(ws, msg.Value, ct); + NetworkInformer.OnRoomSend(room, msg.Value); + await ws.Send(msg.Value, ct); seqNum++; } } @@ -377,7 +449,7 @@ private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct) /// /// Ping送信ループ /// - private async Task Pinger(ClientWebSocket ws, CancellationToken ct) + private async Task Pinger(WebSockConn ws, CancellationToken ct) { var msg = new MsgPing(hmac); @@ -391,7 +463,8 @@ private async Task Pinger(ClientWebSocket ws, CancellationToken ct) var interval = Task.Delay(pingInterval, pingerDelayCanceller.Token); var time = (uint)msg.SetTimestamp(); lastPingTime = time; - await Send(ws, msg.Value, ct); + NetworkInformer.OnRoomSend(room, msg.Value); + await ws.Send(msg.Value, ct); try { await interval; @@ -408,57 +481,6 @@ private async Task Pinger(ClientWebSocket ws, CancellationToken ct) } } - /// - /// websocketメッセージを送信 - /// - private async Task Send(ClientWebSocket ws, ArraySegment msg, CancellationToken ct) - { - if (closed) - { - // SendCloseがsemaphore握りっぱなしになる対策で先にチェック - // ここすり抜けてsemaphore待ってしまったらご愁傷さま…… - return; - } - - await sendSemaphore.WaitAsync(ct); - try - { - if (closed) - { - return; - } - - NetworkInformer.OnRoomSend(room, msg); - await ws.SendAsync(msg, WebSocketMessageType.Binary, true, ct); - } - finally - { - sendSemaphore.Release(); - } - } - - private async Task SendClose(ClientWebSocket ws, string msg, CancellationToken ct) - { - await sendSemaphore.WaitAsync(ct); - try - { - if (closed) - { - return; - } - - closed = true; - - var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); - cts.CancelAfter(SendCloseTimeout); - await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token); - } - finally - { - sendSemaphore.Release(); - } - } - /// /// Pong受信時の処理 /// diff --git a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs index 3d654957..cca0f0fc 100644 --- a/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs +++ b/wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs @@ -56,14 +56,25 @@ public MsgPool(int poolSize, int initialBufSize, HMAC hmac) /// /// Msgが来るまで待つ /// + /// + /// ctがキャンセルされている古いSenderから呼ばれていたときはfalse + /// /// /// /// スレッドをブロックする /// /// - public void Wait(CancellationToken ct) + public bool Wait(CancellationToken ct) { _ = hasMsg.Take(ct); + + if (ct.IsCancellationRequested) { + // 古いConnectionから呼ばれていたとき、hasMsgを戻してfalseを返す + hasMsg.TryAdd(true); + return false; + } + + return true; } ///