Мне вроде удалось решить мою задачу с помощью Interlocked.Exchange
private int _writing;
readonly Queue<NetMessage> _sendQueue = new Queue<NetMessage>();
// посылаю сообщение
public void Send(NetMessage msg)
{
if (msg.sended) throw new InvalidOperationException("Message already sended!");
msg.sended = true;
// добавляю сообщение в очередь
Enqueue(msg);
}
// добавление в очередь
internal void Enqueue(NetMessage msg)
{
lock (_sendQueue)
{
_sendQueue.Enqueue(msg);
}
SendNext();
}
private void OnEndWrite(IAsyncResult ar)
{
NetworkStream.EndWrite(ar);
Interlocked.Exchange(ref _writing, 0);
SendNext();
}
byte[] _sendBuffer = new byte[1024];
readonly List<NetMessage> _messagesToSend = new List<NetMessage>();
internal bool SendNext()
{
var current = Interlocked.Exchange(ref _writing, 1);
if (current == 0) // если нет отправки, отправляю
{
int countBytesToSend = 0;
int countMesagesToSend = 0;
lock (_sendQueue)
{
_messagesToSend.Clear();
while (_sendQueue.Count > 0)
{
var msg = _sendQueue.Dequeue();
_messagesToSend.Add(msg);
countMesagesToSend++;
countBytesToSend += msg.SizeToSend;
if (countBytesToSend > 1024 * 32)
{
break;
}
}
}
if (countBytesToSend > 0)
{
if (_sendBuffer.Length < countBytesToSend)
{
Array.Resize(ref _sendBuffer, countBytesToSend);
}
var pos = 0;
for (int i = 0; i < _messagesToSend.Count; i++)
{
var msg = _messagesToSend[i];
Array.Copy(msg.data, 0, _sendBuffer, pos, msg.SizeToSend);
pos += msg.SizeToSend;
}
NetworkStream.BeginWrite(_sendBuffer, 0, pos, OnEndWrite, null);
}
else
{
// нечего слать, значит ничего не отправляю и отправки нет
Interlocked.Exchange(ref _writing, 0);
}
}
return false;
}