|
С# Средство разработки на платформе .Net |
28.02.2015, 20:21
|
#1
|
Unity/C# кодер
Регистрация: 03.10.2005
Адрес: Россия, Рязань
Сообщений: 7,568
Написано 3,006 полезных сообщений (для 5,323 пользователей)
|
Асинхронная очередь
Помогите сделать асинхронную очередь. У меня что-то не особо получается.
private objject _thisLocker = new object(); private IAsyncResult _writingAR; readonly Queue<NetMessage> _sendQueue = new Queue<NetMessage>(); // посылаю сообщение public void Send(NetMessage msg) { if (msg.sended) throw new InvalidOperationException("Message already sended!"); msg.sended = true; // добавляю сообщение в очередь Enqueue(msg); } // добавление в очередь internal void Enqueue(NetMessage msg) { lock (_thisLocker) // лок, чтобы никто сюда не вошел { _sendQueue.Enqueue(msg); // если не равно null, то идет отправка if (_writingAR != null) { return; } // начинаю отправку, если в очереди есть сообщения if (_sendQueue.Count > 0) { var nextMessage = _sendQueue.Dequeue(); // ?????????????????? почему в данный момент _writingAR != null ??????????????????? _writingAR = NetworkStream.BeginWrite(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize, OnEndWrite, nextMessage); } } } // завершение отправки private void OnEndWrite(IAsyncResult ar) { NetworkStream.EndWrite(ar); lock (_thisLocker) // лок, чтобы никто сюда не вошел { // если в очереди есть еще сообщения, начинаю отправку следующих сообщений, если они есть if (_sendQueue.Count > 0) { var nextMessage = _sendQueue.Dequeue(); _writingAR = NetworkStream.BeginWrite(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize, OnEndWrite, nextMessage); } else { _writingAR = null; } } }
Пробовал лочить, но не получается. Отправка сообщений происходит не по порядку, т.е. несколько отправок одновременно.
|
(Offline)
|
|
01.03.2015, 11:54
|
#2
|
Бывалый
Регистрация: 19.06.2008
Сообщений: 679
Написано 264 полезных сообщений (для 450 пользователей)
|
Ответ: Асинхронная очередь
Неправильный подход у тебя.
Тебе нужна очередь в которую сообщение будет при отправке добавляться, и отдельный асинхронный воркер который будет забирать из очереди и отправлять.
И отдельный асинхронный воркер, проще всего в отдельном потоке запилить.
ConcurrentQueue
Где-то в коде:
private void Initialize() {
this._queue = new ConcurrentQueue<NetMessage>();
var asyncWorker = new Thread(SomeWorker);
asyncWorker.Start();
}
public void Send(NetMessage msg) {
if (msg.sended) throw new InvalidOperationException("Message already sended!");
msg.sended = true;
this._queue.Enqueue(msg);
}
private void SomeWorker() {
while (true) {
NetMessage msg;
while (!this._queue.TryDequeue(out msg)) {
}
NetworkStream.Write(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize);
}
}
Кстати, какая у тебя целевая платформа ? Если .NET 4.5, то советую вместо BeginWrite/EndWrite использовать await WriteAsync.
__________________
Последний раз редактировалось h1dd3n, 01.03.2015 в 13:03.
|
(Offline)
|
|
Сообщение было полезно следующим пользователям:
|
|
01.03.2015, 13:13
|
#3
|
Unity/C# кодер
Регистрация: 03.10.2005
Адрес: Россия, Рязань
Сообщений: 7,568
Написано 3,006 полезных сообщений (для 5,323 пользователей)
|
Ответ: Асинхронная очередь
Мне вроде удалось решить мою задачу с помощью Interlocked.Exchange
private int _writing; readonly Queue<NetMessage> _sendQueue = new Queue<NetMessage>();
// посылаю сообщение public void Send(NetMessage msg) { if (msg.sended) throw new InvalidOperationException("Message already sended!");
msg.sended = true;
// добавляю сообщение в очередь Enqueue(msg);
}
// добавление в очередь internal void Enqueue(NetMessage msg) { lock (_sendQueue) { _sendQueue.Enqueue(msg); }
SendNext(); }
private void OnEndWrite(IAsyncResult ar) { NetworkStream.EndWrite(ar); Interlocked.Exchange(ref _writing, 0); SendNext(); }
byte[] _sendBuffer = new byte[1024]; readonly List<NetMessage> _messagesToSend = new List<NetMessage>();
internal bool SendNext() { var current = Interlocked.Exchange(ref _writing, 1);
if (current == 0) // если нет отправки, отправляю { int countBytesToSend = 0; int countMesagesToSend = 0;
lock (_sendQueue) { _messagesToSend.Clear();
while (_sendQueue.Count > 0) { var msg = _sendQueue.Dequeue(); _messagesToSend.Add(msg); countMesagesToSend++; countBytesToSend += msg.SizeToSend; if (countBytesToSend > 1024 * 32) { break; } } }
if (countBytesToSend > 0) { if (_sendBuffer.Length < countBytesToSend) { Array.Resize(ref _sendBuffer, countBytesToSend); }
var pos = 0;
for (int i = 0; i < _messagesToSend.Count; i++) { var msg = _messagesToSend[i]; Array.Copy(msg.data, 0, _sendBuffer, pos, msg.SizeToSend); pos += msg.SizeToSend; } NetworkStream.BeginWrite(_sendBuffer, 0, pos, OnEndWrite, null); } else { // нечего слать, значит ничего не отправляю и отправки нет Interlocked.Exchange(ref _writing, 0); } }
return false; }
|
(Offline)
|
|
Эти 3 пользователя(ей) сказали Спасибо pax за это полезное сообщение:
|
|
Ваши права в разделе
|
Вы не можете создавать темы
Вы не можете отвечать на сообщения
Вы не можете прикреплять файлы
Вы не можете редактировать сообщения
HTML код Выкл.
|
|
|
Часовой пояс GMT +4, время: 05:56.
|