/// Windows I/O completion port (IOCP) selector for the yu event loop, plus the
/// Winsock declarations and extension-function lookups it depends on.
module yu.eventloop.selector.iocp;

import yu.eventloop.common;
import yu.memory.allocator;

version (Windows):
package(yu):
pragma(lib, "Ws2_32");

import core.time;
import core.memory;

public import core.sys.windows.windows;
public import core.sys.windows.winsock2;
public import core.sys.windows.mswsock;

import std.conv;
import std.exception;
import std.experimental.logger;

import yu.exception : showException, yuCathException;

/// Kind of overlapped operation an IOCP_DATA record describes; wait()
/// dispatches on this value.
enum IOCP_OP_TYPE {
    accept,
    connect,
    read,
    write,
    event
}

/// Event-loop selector backed by a single I/O completion port.
struct IOCPLoop {
    /// Creates the completion port on first use; later calls are no-ops.
    /// Throws (via errnoEnforce) if the port cannot be created.
    void initer() {
        if (_iocp)
            return;
        _iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 1);
        errnoEnforce(_iocp, "CreateIoCompletionPort failed");
        // _event is the packet weakUp() posts; wait() ignores it.
        _event.operationType = IOCP_OP_TYPE.event;
        _event.event = null;
    }

    // NOTE(review): the completion port handle is never closed here. Closing
    // it in the destructor would double-close on struct copies, so this is
    // left unchanged -- confirm the intended ownership of _iocp.
    ~this() {

    }

    /** Adds an event to the completion port.

        Only ACCEPT, TCP and UDP events are associated with the port; any
        other type is accepted without registration.

        @param event = the event to register; its fd is associated with the
                       port and the event pointer is used as the completion key.
        @return true on success, false if the association failed or an
                exception was raised (the exception is passed to showException).
    */
    bool addEvent(AsyncEvent* event) nothrow {
        if (event.type == AsynType.ACCEPT || event.type == AsynType.TCP
                || event.type == AsynType.UDP) {
            try {
                auto port = CreateIoCompletionPort(cast(HANDLE) event.fd, _iocp,
                        cast(ULONG_PTR) event, 1);
                // Mark the event active only once the association succeeded.
                if (!port)
                    return false;
                event.isActive(true);
            }
            catch (Exception e) {
                showException(e);
                return false;
            }
        }
        return true;
    }

    /// No-op for this selector; always returns true.
    bool modEvent(AsyncEvent* event) nothrow {
        return true;
    }

    /// Marks the event inactive. The fd is not detached from the port.
    bool delEvent(AsyncEvent* event) nothrow {
        event.isActive(false);
        return true;
    }

    /** Dequeues at most one completion packet and dispatches it.

        @param timeout = value passed to GetQueuedCompletionStatus as its
                         timeout argument.
    */
    void wait(int timeout) {
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;
        int va = GetQueuedCompletionStatus(_iocp, &bytes, &key, &overlapped, timeout);
        if (overlapped is null) // timeout
            return;

        // Every OVERLAPPED handled here is the first member of an IOCP_DATA.
        auto ev = cast(IOCP_DATA*) overlapped;
        if (va == 0) {
            auto erro = GetLastError();
            if (erro == WAIT_TIMEOUT)
                return;
            //error("GetQueuedCompletionStatus erro! : ", erro);
            // A packet was dequeued for a failed operation: tell the owner to close.
            if (ev && ev.event) {
                if (ev.event.obj)
                    ev.event.obj.onClose();
            }
            return;
        }

        final switch (ev.operationType) {
        case IOCP_OP_TYPE.accept:
            ev.event.obj.onRead();
            break;
        case IOCP_OP_TYPE.connect:
            ev.event.writeLen = 0;
            ev.event.obj.onWrite();
            break;
        case IOCP_OP_TYPE.read:
            // A read that completes with zero bytes is reported as a close.
            if (bytes > 0) {
                ev.event.readLen = bytes;
                ev.event.obj.onRead();
            } else {
                ev.event.obj.onClose();
            }
            break;
        case IOCP_OP_TYPE.write:
            // A write that completes with zero bytes is reported as a close.
            if (bytes > 0) {
                ev.event.writeLen = bytes;
                ev.event.obj.onWrite();
            } else {
                ev.event.obj.onClose();
            }
            break;
        case IOCP_OP_TYPE.event:
            // Packet posted by weakUp(): nothing to dispatch.
            break;
        }

        return;
    }

    /// Posts the internal `event` packet to the port so that a pending
    /// wait() call dequeues it and returns.
    void weakUp() nothrow {
        try {
            PostQueuedCompletionStatus(_iocp, 0, 0, cast(LPOVERLAPPED)(&_event));
        }
        catch (Exception e) {
            showException(yuCathException(error(e.toString)));
        }
    }

private:
    HANDLE _iocp;
    IOCP_DATA _event;
}

/// Per-operation record handed to overlapped calls. `ol` is the first member
/// because the code casts between IOCP_DATA* and OVERLAPPED* in both directions.
struct IOCP_DATA {
    OVERLAPPED ol;
    IOCP_OP_TYPE operationType;
    AsyncEvent* event = null;
}

// Winsock extension functions, filled in by the module constructor below.
__gshared static LPFN_ACCEPTEX AcceptEx;
__gshared static LPFN_CONNECTEX ConnectEx;
/*__gshared LPFN_DISCONNECTEX DisconnectEx;
__gshared LPFN_GETACCEPTEXSOCKADDRS GetAcceptexSockAddrs;
__gshared LPFN_TRANSMITFILE TransmitFile;
__gshared LPFN_TRANSMITPACKETS TransmitPackets;
__gshared LPFN_WSARECVMSG WSARecvMsg;
__gshared LPFN_WSASENDMSG WSASendMsg;*/

/// Starts Winsock 2.2 and looks up the AcceptEx / ConnectEx extension
/// functions through a temporary TCP socket. Throws if any step fails.
shared static this() {
    WSADATA wsaData;
    int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
    // WSAStartup returns its error code directly.
    enforce(iResult == 0, "WSAStartup failed with error: " ~ to!string(iResult));

    SOCKET ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    enforce(ListenSocket != INVALID_SOCKET,
            "socket failed with error: " ~ to!string(WSAGetLastError()));
    scope (exit)
        closesocket(ListenSocket);
    GUID guid;
    mixin(GET_FUNC_POINTER("WSAID_ACCEPTEX", "AcceptEx"));
    mixin(GET_FUNC_POINTER("WSAID_CONNECTEX", "ConnectEx"));
    /* mixin(GET_FUNC_POINTER("WSAID_DISCONNECTEX", "DisconnectEx"));
    mixin(GET_FUNC_POINTER("WSAID_GETACCEPTEXSOCKADDRS", "GetAcceptexSockAddrs"));
    mixin(GET_FUNC_POINTER("WSAID_TRANSMITFILE", "TransmitFile"));
    mixin(GET_FUNC_POINTER("WSAID_TRANSMITPACKETS", "TransmitPackets"));
    mixin(GET_FUNC_POINTER("WSAID_WSARECVMSG", "WSARecvMsg"));*/
}

/// Releases the Winsock reference taken in the module constructor.
shared static ~this() {
    WSACleanup();
}

private {
    /// Asks Winsock for the extension function identified by `guid` and stores
    /// it in `pfn`. Returns false (after logging the Winsock error) on failure.
    bool GetFunctionPointer(FuncPointer)(SOCKET sock, ref FuncPointer pfn, ref GUID guid) {
        DWORD dwBytesReturned = 0;
        if (WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, guid.sizeof,
                &pfn, pfn.sizeof, &dwBytesReturned, null, null) == SOCKET_ERROR) {
            // Winsock failures are reported through WSAGetLastError.
            error("Get function failed with error:", WSAGetLastError());
            return false;
        }

        return true;
    }

    /// Builds the source text mixed into the module constructor: it assigns
    /// `GuidValue` to the local `guid`, resolves `pft` via GetFunctionPointer
    /// on the local `ListenSocket`, and throws if the lookup fails.
    string GET_FUNC_POINTER(string GuidValue, string pft) {
        string str = " guid = " ~ GuidValue ~ ";";
        str ~= "if( !GetFunctionPointer( ListenSocket, " ~ pft ~ ", guid ) ) { errnoEnforce(false,\"iocp get function error!\"); } ";
        return str;
    }
}

alias OVERLAPPED WSAOVERLAPPED;
alias OVERLAPPED* LPWSAOVERLAPPED;

/// Buffer descriptor passed to the WSASend* / WSARecv* functions declared below.
struct WSABUF {
    uint len;
    char* buf;
}

alias WSABUF* LPWSABUF;

// Bit fields combined by the _WSAIO* templates to form ioctl codes.
enum : DWORD {
    IOCPARAM_MASK = 0x7f,
    IOC_VOID = 0x20000000,
    IOC_OUT = 0x40000000,
    IOC_IN = 0x80000000,
    IOC_INOUT = IOC_IN | IOC_OUT
}

enum IOC_UNIX = 0x00000000;
enum IOC_WS2 = 0x08000000;
enum IOC_PROTOCOL = 0x10000000;
enum IOC_VENDOR = 0x18000000;

/// ioctl code built from IOC_VOID.
template _WSAIO(int x, int y) {
    enum _WSAIO = IOC_VOID | x | y;
}

/// ioctl code built from IOC_OUT.
template _WSAIOR(int x, int y) {
    enum _WSAIOR = IOC_OUT | x | y;
}

/// ioctl code built from IOC_IN.
template _WSAIOW(int x, int y) {
    enum _WSAIOW = IOC_IN | x | y;
}

/// ioctl code built from IOC_INOUT.
template _WSAIORW(int x, int y) {
    enum _WSAIORW = IOC_INOUT | x | y;
}

enum SIO_ASSOCIATE_HANDLE = _WSAIOW!(IOC_WS2, 1);
enum SIO_ENABLE_CIRCULAR_QUEUEING = _WSAIO!(IOC_WS2, 2);
enum SIO_FIND_ROUTE = _WSAIOR!(IOC_WS2, 3);
enum SIO_FLUSH = _WSAIO!(IOC_WS2, 4);
enum SIO_GET_BROADCAST_ADDRESS = _WSAIOR!(IOC_WS2, 5);
enum SIO_GET_EXTENSION_FUNCTION_POINTER = _WSAIORW!(IOC_WS2, 6);
enum SIO_GET_QOS = _WSAIORW!(IOC_WS2, 7);
enum SIO_GET_GROUP_QOS = _WSAIORW!(IOC_WS2, 8);
enum SIO_MULTIPOINT_LOOPBACK = _WSAIOW!(IOC_WS2, 9);
enum SIO_MULTICAST_SCOPE = _WSAIOW!(IOC_WS2, 10);
enum SIO_SET_QOS = _WSAIOW!(IOC_WS2, 11);
enum SIO_SET_GROUP_QOS = _WSAIOW!(IOC_WS2, 12);
enum SIO_TRANSLATE_HANDLE = _WSAIORW!(IOC_WS2, 13);
enum SIO_ROUTING_INTERFACE_QUERY = _WSAIORW!(IOC_WS2, 20);
enum SIO_ROUTING_INTERFACE_CHANGE = _WSAIOW!(IOC_WS2, 21);
enum SIO_ADDRESS_LIST_QUERY = _WSAIOR!(IOC_WS2, 22);
enum SIO_ADDRESS_LIST_CHANGE = _WSAIO!(IOC_WS2, 23);
enum SIO_QUERY_TARGET_PNP_HANDLE = _WSAIOR!(IOC_WS2, 24);
enum SIO_NSP_NOTIFY_CHANGE = _WSAIOW!(IOC_WS2, 25);

// Winsock 2 send/receive entry points declared with Windows linkage.
extern (Windows):
nothrow:
int WSARecv(SOCKET, LPWSABUF, DWORD, LPDWORD, LPDWORD, LPWSAOVERLAPPED,
        LPWSAOVERLAPPED_COMPLETION_ROUTINE);
int WSARecvDisconnect(SOCKET, LPWSABUF);
int WSARecvFrom(SOCKET, LPWSABUF, DWORD, LPDWORD, LPDWORD, SOCKADDR*, LPINT,
        LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);

int WSASend(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, LPWSAOVERLAPPED,
        LPWSAOVERLAPPED_COMPLETION_ROUTINE);
int WSASendDisconnect(SOCKET, LPWSABUF);
int WSASendTo(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, const(SOCKADDR)*, int,
        LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);