Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 87 additions & 65 deletions wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
volatile int pingInterval;
volatile uint lastPingTime;
CancellationTokenSource pingerDelayCanceller;
SemaphoreSlim sendSemaphore;
bool closed;

TaskCompletionSource<Task> senderTaskSource;
TaskCompletionSource<Task> pingerTaskSource;
Expand All @@ -49,6 +47,77 @@

Logger logger;

class WebSockConn : IDisposable
{
ClientWebSocket client;
SemaphoreSlim sendSemaphore;
bool closed;

public WebSockConn(ClientWebSocket client)
{
this.client = client;
this.sendSemaphore = new SemaphoreSlim(1, 1);
this.closed = false;
}

public void Dispose()
{
client.Dispose();
}

public Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> seg, CancellationToken ct)
{
return client.ReceiveAsync(seg, ct);
}

public async Task Send(ArraySegment<byte> msg, CancellationToken ct)
{
if (closed)
{
// SendCloseがsemaphore握りっぱなしになる対策で先にチェック
// ここすり抜けてsemaphore待ってしまったらご愁傷さま……
return;
}

await sendSemaphore.WaitAsync(ct);
try
{
if (closed)
{
return;
}

await client.SendAsync(msg, WebSocketMessageType.Binary, true, ct);
}
finally
{
sendSemaphore.Release();
}
}

public async Task SendClose(string msg, CancellationToken ct)
{
await sendSemaphore.WaitAsync(ct);
try
{
if (closed)
{
return;
}

closed = true;

var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(SendCloseTimeout);
await client.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token);
}
finally
{
sendSemaphore.Release();
}
}
}

/// <summary>
/// コンストラクタ
/// </summary>
Expand All @@ -66,8 +135,6 @@
this.hmac = hmac;
this.pingInterval = calcPingInterval(room.ClientDeadline);
this.pingerDelayCanceller = new CancellationTokenSource();
this.sendSemaphore = new SemaphoreSlim(1, 1);
this.closed = false;

this.evSeqNum = 0;
this.evBufPool = new BlockingCollection<byte[]>(
Expand Down Expand Up @@ -214,7 +281,7 @@
/// <summary>
/// Websocketで接続する
/// </summary>
private async Task<ClientWebSocket> Connect(CancellationToken ct)
private async Task<WebSockConn> Connect(CancellationToken ct)
{
var ws = new ClientWebSocket();
var authdata = authgen.GenerateBearer(authKey, clientId);
Expand All @@ -232,13 +299,13 @@
SetTcpNoDelay(ws);

logger?.Info("connected");
return ws;
return new WebSockConn(ws);
}

/// <summary>
/// Event受信ループ
/// </summary>
private async Task Receiver(ClientWebSocket ws, CancellationToken ct)
private async Task Receiver(WebSockConn ws, CancellationToken ct)
{
try
{
Expand Down Expand Up @@ -295,7 +362,7 @@
/// <summary>
/// Eventの受信
/// </summary>
private async Task<Event> ReceiveEvent(ClientWebSocket ws, CancellationToken ct)
private async Task<Event> ReceiveEvent(WebSockConn ws, CancellationToken ct)
{
var buf = evBufPool.Take(ct);
try
Expand All @@ -310,7 +377,7 @@
{
// iOSでごく稀にws.CloseAsync()が返ってこないことがあるので別Taskで実行
// Semaphoreを握りっぱなしになるけどこの接続は終了するので基本的には問題ない
Task.Run(async () => await SendClose(ws, ret.CloseStatusDescription, ct));
Task.Run(async () => await ws.SendClose(ret.CloseStatusDescription, ct));

Check warning on line 380 in wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs

View workflow job for this annotation

GitHub Actions / dotnet

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 380 in wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs

View workflow job for this annotation

GitHub Actions / dotnet

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 380 in wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs

View workflow job for this annotation

GitHub Actions / dotnet

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 380 in wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs

View workflow job for this annotation

GitHub Actions / dotnet

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 380 in wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs

View workflow job for this annotation

GitHub Actions / dotnet

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 380 in wsnet2-unity/Assets/WSNet2/Scripts/Core/Connection.cs

View workflow job for this annotation

GitHub Actions / dotnet

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

switch (ret.CloseStatus)
{
Expand Down Expand Up @@ -354,11 +421,15 @@
/// </summary>
/// <param name="seqNum">開始Msg通し番号</param>
/// <param name="ct">ループ停止するトークン</param>
private async Task Sender(ClientWebSocket ws, int seqNum, CancellationToken ct)
private async Task Sender(WebSockConn ws, int seqNum, CancellationToken ct)
{
while (true)
{
msgPool.Wait(ct);
if (!msgPool.Wait(ct))
{
// ctがキャンセルされているとき falseが返るので終了
return;
}

ArraySegment<byte>? msg;
while ((msg = msgPool.Take(seqNum)).HasValue)
Expand All @@ -368,7 +439,8 @@
return; // ctのキャンセルで終了
}

await Send(ws, msg.Value, ct);
NetworkInformer.OnRoomSend(room, msg.Value);
await ws.Send(msg.Value, ct);
seqNum++;
}
}
Expand All @@ -377,7 +449,7 @@
/// <summary>
/// Ping送信ループ
/// </summary>
private async Task Pinger(ClientWebSocket ws, CancellationToken ct)
private async Task Pinger(WebSockConn ws, CancellationToken ct)
{
var msg = new MsgPing(hmac);

Expand All @@ -391,7 +463,8 @@
var interval = Task.Delay(pingInterval, pingerDelayCanceller.Token);
var time = (uint)msg.SetTimestamp();
lastPingTime = time;
await Send(ws, msg.Value, ct);
NetworkInformer.OnRoomSend(room, msg.Value);
await ws.Send(msg.Value, ct);
try
{
await interval;
Expand All @@ -408,57 +481,6 @@
}
}

/// <summary>
/// websocketメッセージを送信
/// </summary>
private async Task Send(ClientWebSocket ws, ArraySegment<byte> msg, CancellationToken ct)
{
if (closed)
{
// SendCloseがsemaphore握りっぱなしになる対策で先にチェック
// ここすり抜けてsemaphore待ってしまったらご愁傷さま……
return;
}

await sendSemaphore.WaitAsync(ct);
try
{
if (closed)
{
return;
}

NetworkInformer.OnRoomSend(room, msg);
await ws.SendAsync(msg, WebSocketMessageType.Binary, true, ct);
}
finally
{
sendSemaphore.Release();
}
}

private async Task SendClose(ClientWebSocket ws, string msg, CancellationToken ct)
{
await sendSemaphore.WaitAsync(ct);
try
{
if (closed)
{
return;
}

closed = true;

var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(SendCloseTimeout);
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, msg, cts.Token);
}
finally
{
sendSemaphore.Release();
}
}

/// <summary>
/// Pong受信時の処理
/// </summary>
Expand Down
13 changes: 12 additions & 1 deletion wsnet2-unity/Assets/WSNet2/Scripts/Core/Msg/MsgPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,25 @@ public MsgPool(int poolSize, int initialBufSize, HMAC hmac)
/// <summary>
/// Msgが来るまで待つ
/// </summary>
/// <return>
/// ctがキャンセルされている古いSenderから呼ばれていたときはfalse
/// </return>
/// <remarks>
/// <para>
/// スレッドをブロックする
/// </para>
/// </remarks>
public void Wait(CancellationToken ct)
public bool Wait(CancellationToken ct)
{
_ = hasMsg.Take(ct);

if (ct.IsCancellationRequested) {
// 古いConnectionから呼ばれていたとき、hasMsgを戻してfalseを返す
hasMsg.TryAdd(true);
return false;
}

return true;
}

/// <summary>
Expand Down
Loading