forum.boolean.name

forum.boolean.name (http://forum.boolean.name/index.php)
-   С# (http://forum.boolean.name/forumdisplay.php?f=128)
-   -   Асинхронная очередь (http://forum.boolean.name/showthread.php?t=19717)

pax 28.02.2015 20:21

Асинхронная очередь
 
Помогите сделать асинхронную очередь. У меня что-то не особо получается.

PHP код:

private objject _thisLocker = new object();
        private 
IAsyncResult _writingAR;
        
readonly Queue<NetMessage_sendQueue = new Queue<NetMessage>();

        
// посылаю сообщение
        
public void Send(NetMessage msg)
        {
            if (
msg.sended) throw new InvalidOperationException("Message already sended!");

            
msg.sended true;

            
// добавляю сообщение в очередь
            
Enqueue(msg);
        }

        
// добавление в очередь
        
internal void Enqueue(NetMessage msg)
        {
            
lock (_thisLocker// лок, чтобы никто сюда не вошел
            
{
                
_sendQueue.Enqueue(msg);

                
// если не равно null, то идет отправка
                
if (_writingAR != null)
                {
                    return;
                }
                
                
// начинаю отправку, если в очереди есть сообщения
                
if (_sendQueue.Count 0)
                {
                    var 
nextMessage _sendQueue.Dequeue();
                    
// ?????????????????? почему в данный момент _writingAR != null ???????????????????
                    
_writingAR NetworkStream.BeginWrite(nextMessage.data0nextMessage.Size NetMessage.HeaderSizeOnEndWritenextMessage);
                }
            }
        }

        
// завершение отправки
        
private void OnEndWrite(IAsyncResult ar)
        {
            
NetworkStream.EndWrite(ar);

            
lock (_thisLocker// лок, чтобы никто сюда не вошел
            
{
                
// если в очереди есть еще сообщения, начинаю отправку следующих сообщений, если они есть
                
if (_sendQueue.Count 0)
                {
                    var 
nextMessage _sendQueue.Dequeue();
                    
_writingAR NetworkStream.BeginWrite(nextMessage.data0nextMessage.Size NetMessage.HeaderSizeOnEndWritenextMessage);
                }
                else
                {
                    
_writingAR null;
                }
            }
        } 

Пробовал лочить, но не получается. Отправка сообщений происходит не по порядку, т.е. несколько отправок одновременно.

h1dd3n 01.03.2015 11:54

Ответ: Асинхронная очередь
 
Неправильный подход у тебя.

Тебе нужна очередь в которую сообщение будет при отправке добавляться, и отдельный асинхронный воркер который будет забирать из очереди и отправлять.

И отдельный асинхронный воркер, проще всего в отдельном потоке запилить.



ConcurrentQueue


Где-то в коде:
Код:

private void Initialize() {
        this._queue = new ConcurrentQueue<NetMessage>();

        var asyncWorker = new Thread(SomeWorker);
        asyncWorker.Start();
}

Код:

public void Send(NetMessage msg) {
        if (msg.sended) throw new InvalidOperationException("Message already sended!");

        msg.sended = true;

        this._queue.Enqueue(msg);
}

Код:

private void SomeWorker() {
        while (true) {
                NetMessage msg;
                while (!this._queue.TryDequeue(out msg)) {
                       
                }

                NetworkStream.Write(nextMessage.data, 0, nextMessage.Size + NetMessage.HeaderSize);
        }
}

Кстати, какая у тебя целевая платформа ? Если .NET 4.5, то советую вместо BeginWrite/EndWrite использовать await WriteAsync.

pax 01.03.2015 13:13

Ответ: Асинхронная очередь
 
Мне вроде удалось решить мою задачу с помощью Interlocked.Exchange
PHP код:

private int _writing;
        
readonly Queue<NetMessage_sendQueue = new Queue<NetMessage>();

        
// посылаю сообщение
        
public void Send(NetMessage msg)
        {
            if (
msg.sended) throw new InvalidOperationException("Message already sended!");

            
msg.sended true;

            
// добавляю сообщение в очередь
            
Enqueue(msg);

        }

        
// добавление в очередь
        
internal void Enqueue(NetMessage msg)
        {
            
lock (_sendQueue)
            {
                
_sendQueue.Enqueue(msg);
            }

            
SendNext();
        }

        private 
void OnEndWrite(IAsyncResult ar)
        {
            
NetworkStream.EndWrite(ar);
            
Interlocked.Exchange(ref _writing0);
            
SendNext();
        }

        
byte[] _sendBuffer = new byte[1024];
        
readonly List<NetMessage_messagesToSend = new List<NetMessage>();

        
internal bool SendNext()
        {
            var 
current Interlocked.Exchange(ref _writing1);

            if (
current == 0// если нет отправки, отправляю
            
{
                
int countBytesToSend 0;
                
int countMesagesToSend 0;

                
lock (_sendQueue)
                {
                    
_messagesToSend.Clear();

                    while (
_sendQueue.Count 0)
                    {
                        var 
msg _sendQueue.Dequeue();
                        
_messagesToSend.Add(msg);
                        
countMesagesToSend++;
                        
countBytesToSend += msg.SizeToSend;
                        if (
countBytesToSend 1024 32)
                        {
                            break;
                        }
                    }
                }


                if (
countBytesToSend 0)
                {
                    if (
_sendBuffer.Length countBytesToSend)
                    {
                        Array.
Resize(ref _sendBuffercountBytesToSend);
                    }

                    var 
pos 0;

                    for (
int i 0_messagesToSend.Counti++)
                    {
                        var 
msg _messagesToSend[i];
                        Array.
Copy(msg.data0_sendBufferposmsg.SizeToSend);
                        
pos += msg.SizeToSend;
                    }
                    
NetworkStream.BeginWrite(_sendBuffer0posOnEndWritenull);
                }
                else
                {
                    
// нечего слать, значит ничего не отправляю и отправки нет
                    
Interlocked.Exchange(ref _writing0);
                }
            }

            return 
false;
        } 



Часовой пояс GMT +4, время: 14:24.

vBulletin® Version 3.6.5.
Copyright ©2000 - 2024, Jelsoft Enterprises Ltd.
Перевод: zCarot