博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
C#高性能大容量SOCKET并发(十):SocketAsyncEventArgs线程模型
阅读量:6840 次
发布时间:2019-06-26

本文共 4553 字,大约阅读时间需要 15 分钟。

原文:

线程模型

SocketAsyncEventArgs编程模式不支持设置同时工作线程个数,使用的NET的IO线程,由NET底层提供,这点和直接使用完成端口API编程不同。NET底层IO线程也是每个异步事件都是由不同的线程返回到Completed事件,因此在Completed事件需要对用户对象进行加锁,避免同一个用户对象同时触发两个Completed事件。

void IO_Completed(object sender, SocketAsyncEventArgs asyncEventArgs)        {            AsyncSocketUserToken userToken = asyncEventArgs.UserToken as AsyncSocketUserToken;            userToken.ActiveDateTime = DateTime.Now;            try            {                                lock (userToken) //避免同一个userToken同时有多个线程操作                {                    if (asyncEventArgs.LastOperation == SocketAsyncOperation.Receive)                        ProcessReceive(asyncEventArgs);                    else if (asyncEventArgs.LastOperation == SocketAsyncOperation.Send)                        ProcessSend(asyncEventArgs);                    else                        throw new ArgumentException("The last operation completed on the socket was not a receive or send");                }               }            catch (Exception E)            {                Program.Logger.ErrorFormat("IO_Completed {0} error, message: {1}", userToken.ConnectSocket, E.Message);                Program.Logger.Error(E.StackTrace);            }                             }
使用ProceXP可以看到服务端在比较忙的时候,服务的线程会越多。在Completed事件加锁有好处是后续逻辑处理都是串行的,可以不用考虑线程同步。还有一个地方需要注意的是断开超时连接,由于超时连接会调用Shutdown函数来强行中断SOCKET,因此在守护线程调用时,也需要锁住userToken对象。

public void DaemonThreadStart()        {            while (m_thread.IsAlive)            {                AsyncSocketUserToken[] userTokenArray = null;                m_asyncSocketServer.AsyncSocketUserTokenList.CopyList(ref userTokenArray);                for (int i = 0; i < userTokenArray.Length; i++)                {                    if (!m_thread.IsAlive)                        break;                    try                    {                        if ((DateTime.Now - userTokenArray[i].ActiveDateTime).Milliseconds > m_asyncSocketServer.SocketTimeOutMS) //超时Socket断开                        {                            lock (userTokenArray[i])                            {                                m_asyncSocketServer.CloseClientSocket(userTokenArray[i]);                            }                        }                    }                                        catch (Exception E)                    {                        Program.Logger.ErrorFormat("Daemon thread check timeout socket error, message: {0}", E.Message);                        Program.Logger.Error(E.StackTrace);                    }                }                for (int i = 0; i < 60 * 1000 / 10; i++) //每分钟检测一次                {                    if (!m_thread.IsAlive)                        break;                    Thread.Sleep(10);                }            }        }
在CloseClientSocket方法中,对m_asyncSocketUserTokenPool和m_asyncSocketUserTokenList进行处理的时候都有加锁,代码如下:

public void CloseClientSocket(AsyncSocketUserToken userToken)        {            if (userToken.ConnectSocket == null)                return;            string socketInfo = string.Format("Local Address: {0} Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,                userToken.ConnectSocket.RemoteEndPoint);            Program.Logger.InfoFormat("Client connection disconnected. {0}", socketInfo);            try            {                userToken.ConnectSocket.Shutdown(SocketShutdown.Both);            }            catch (Exception E)             {                Program.Logger.ErrorFormat("CloseClientSocket Disconnect client {0} error, message: {1}", socketInfo, E.Message);            }            userToken.ConnectSocket.Close();            userToken.ConnectSocket = null; //释放引用,并清理缓存,包括释放协议对象等资源            m_maxNumberAcceptedClients.Release();            m_asyncSocketUserTokenPool.Push(userToken);            m_asyncSocketUserTokenList.Remove(userToken);        }

public void Push(AsyncSocketUserToken item)        {            if (item == null)            {                throw new ArgumentException("Items added to a AsyncSocketUserToken cannot be null");            }            lock (m_pool)            {                m_pool.Push(item);            }        }
public void Remove(AsyncSocketUserToken userToken)        {            lock (m_list)            {                m_list.Remove(userToken);            }        }
在有些性能要求更高的系统,特别是在一些C++写的完成端口中,会使用原子操作来代替锁,这样做的好处是不用进行系统内核和用户态切换,性能会高。不过技术比较偏门,不易维护,而且实际表现需要进行多方面测试,这类优化更建议优化业务逻辑,并尽量减少内存分配和释放。

DEMO下载地址:
免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

转载地址:http://ntkul.baihongyu.com/

你可能感兴趣的文章