 |
С# Средство разработки на платформе .Net |
28.02.2015, 20:21
|
#1
|
Unity/C# кодер
Регистрация: 03.10.2005
Адрес: Россия, Рязань
Сообщений: 7,568
Написано 3,006 полезных сообщений (для 5,323 пользователей)
|
Асинхронная очередь
Помогите сделать асинхронную очередь. У меня что-то не особо получается.

private objject _thisLocker = new object();
private IAsyncResult _writingAR;
readonly Queue<NetMessage> _sendQueue = new Queue<NetMessage>();
// посылаю сообщение
public void Send(NetMessage msg)
{
if (msg.sended) throw new InvalidOperationException("Message already sended!");
msg.sended = true;
// добавляю сообщение в очередь
Enqueue(msg);
}
// добавление в очередь
internal void Enqueue(NetMessage msg)
{
lock (_thisLocker) // лок, чтобы никто сюда не вошел
{
_sendQueue.Enqueue(msg);
// если не равно null, то идет отправка
if (_writingAR != null)
{
return;
}
// начинаю отправку, если в очереди есть сообщения
if (_sendQueue.Count > 0)
{
var nextMessage = _sendQueue.Dequeue();
// ?????????????????? почему в данный момент _writingAR != null ???????????????????
_writingAR = NetworkStream.BeginWrite(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize, OnEndWrite, nextMessage);
}
}
}
// завершение отправки
private void OnEndWrite(IAsyncResult ar)
{
NetworkStream.EndWrite(ar);
lock (_thisLocker) // лок, чтобы никто сюда не вошел
{
// если в очереди есть еще сообщения, начинаю отправку следующих сообщений, если они есть
if (_sendQueue.Count > 0)
{
var nextMessage = _sendQueue.Dequeue();
_writingAR = NetworkStream.BeginWrite(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize, OnEndWrite, nextMessage);
}
else
{
_writingAR = null;
}
}
}
Пробовал лочить, но не получается. Отправка сообщений происходит не по порядку, т.е. несколько отправок одновременно.
|
(Offline)
|
|
01.03.2015, 11:54
|
#2
|
Бывалый
Регистрация: 19.06.2008
Сообщений: 679
Написано 264 полезных сообщений (для 450 пользователей)
|
Ответ: Асинхронная очередь
Неправильный подход у тебя.
Тебе нужна очередь в которую сообщение будет при отправке добавляться, и отдельный асинхронный воркер который будет забирать из очереди и отправлять.
И отдельный асинхронный воркер, проще всего в отдельном потоке запилить.
ConcurrentQueue
Где-то в коде:
private void Initialize() {
this._queue = new ConcurrentQueue<NetMessage>();
var asyncWorker = new Thread(SomeWorker);
asyncWorker.Start();
}
public void Send(NetMessage msg) {
if (msg.sended) throw new InvalidOperationException("Message already sended!");
msg.sended = true;
this._queue.Enqueue(msg);
}
private void SomeWorker() {
while (true) {
NetMessage msg;
while (!this._queue.TryDequeue(out msg)) {
}
NetworkStream.Write(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize);
}
}
Кстати, какая у тебя целевая платформа ? Если .NET 4.5, то советую вместо BeginWrite/EndWrite использовать await WriteAsync.
__________________
Последний раз редактировалось h1dd3n, 01.03.2015 в 13:03.
|
(Offline)
|
|
Сообщение было полезно следующим пользователям:
|
|
01.03.2015, 13:13
|
#3
|
Unity/C# кодер
Регистрация: 03.10.2005
Адрес: Россия, Рязань
Сообщений: 7,568
Написано 3,006 полезных сообщений (для 5,323 пользователей)
|
Ответ: Асинхронная очередь
Мне вроде удалось решить мою задачу с помощью Interlocked.Exchange

private int _writing; readonly Queue<NetMessage> _sendQueue = new Queue<NetMessage>();
// посылаю сообщение public void Send(NetMessage msg) { if (msg.sended) throw new InvalidOperationException("Message already sended!");
msg.sended = true;
// добавляю сообщение в очередь Enqueue(msg);
}
// добавление в очередь internal void Enqueue(NetMessage msg) { lock (_sendQueue) { _sendQueue.Enqueue(msg); }
SendNext(); }
private void OnEndWrite(IAsyncResult ar) { NetworkStream.EndWrite(ar); Interlocked.Exchange(ref _writing, 0); SendNext(); }
byte[] _sendBuffer = new byte[1024]; readonly List<NetMessage> _messagesToSend = new List<NetMessage>();
internal bool SendNext() { var current = Interlocked.Exchange(ref _writing, 1);
if (current == 0) // если нет отправки, отправляю { int countBytesToSend = 0; int countMesagesToSend = 0;
lock (_sendQueue) { _messagesToSend.Clear();
while (_sendQueue.Count > 0) { var msg = _sendQueue.Dequeue(); _messagesToSend.Add(msg); countMesagesToSend++; countBytesToSend += msg.SizeToSend; if (countBytesToSend > 1024 * 32) { break; } } }
if (countBytesToSend > 0) { if (_sendBuffer.Length < countBytesToSend) { Array.Resize(ref _sendBuffer, countBytesToSend); }
var pos = 0;
for (int i = 0; i < _messagesToSend.Count; i++) { var msg = _messagesToSend[i]; Array.Copy(msg.data, 0, _sendBuffer, pos, msg.SizeToSend); pos += msg.SizeToSend; } NetworkStream.BeginWrite(_sendBuffer, 0, pos, OnEndWrite, null); } else { // нечего слать, значит ничего не отправляю и отправки нет Interlocked.Exchange(ref _writing, 0); } }
return false; }
|
(Offline)
|
|
Эти 3 пользователя(ей) сказали Спасибо pax за это полезное сообщение:
|
|
Ваши права в разделе
|
Вы не можете создавать темы
Вы не можете отвечать на сообщения
Вы не можете прикреплять файлы
Вы не можете редактировать сообщения
HTML код Выкл.
|
|
|
Часовой пояс GMT +4, время: 05:21.
|