1 module yu.eventloop.selector.iocp;
2 
3 import yu.eventloop.common;
4 import yu.memory.allocator;
5 
6 version (Windows)  : package(yu) : pragma(lib, "Ws2_32");
7 
8 import core.time;
9 import core.memory;
10 
11 public import core.sys.windows.windows;
12 public import core.sys.windows.winsock2;
13 public import core.sys.windows.mswsock;
14 
15 import std.conv;
16 import std.exception;
17 import std.experimental.logger;
18 
19 import yu.exception : showException,yuCathException;
20 
/// Tag stored in every IOCP_DATA record that identifies which kind of
/// overlapped operation the record belongs to; IOCPLoop.wait switches on it
/// to decide which handler to invoke.
enum IOCP_OP_TYPE {
    /// Accept operation; dispatched to the owner's onRead().
    accept,
    /// Connect operation; dispatched to the owner's onWrite().
    connect,
    /// Read operation; dispatched to onRead(), or onClose() when zero bytes were transferred.
    read,
    /// Write operation; dispatched to onWrite(), or onClose() when zero bytes were transferred.
    write,
    /// Internal wake-up packet posted by IOCPLoop.weakUp(); no handler is invoked.
    event
}
28 
/**
 * Event-loop selector built on a Windows I/O completion port.
 *
 * Sockets are associated with the port through addEvent(); wait() dequeues one
 * completion packet at a time and forwards it to the handler object attached
 * to the originating AsyncEvent.
 */
struct IOCPLoop {
    /**
     * Creates the completion port on first use; later calls are no-ops.
     *
     * Throws: ErrnoException (raised by errnoEnforce) when
     * CreateIoCompletionPort returns null.
     */
    void initer() {
        if (_iocp)
            return;
        _iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 1);
        errnoEnforce(_iocp, "CreateIoCompletionPort failed");
        // Prepare the record that weakUp() posts to interrupt a blocked wait().
        _event.operationType = IOCP_OP_TYPE.event;
        _event.event = null;
    }

    // NOTE(review): the completion-port handle is not closed here — confirm
    // whether ownership/copy semantics allow adding CloseHandle(_iocp).
    ~this() {

    }

    /**
     * Registers an event with the completion port.
     *
     * Only ACCEPT, TCP and UDP events are associated with the port; any other
     * event type is accepted without doing anything. The event is marked
     * active only after the association has succeeded.
     *
     * Params:
     *   event = the event to register; its `fd` is associated with the port
     *           and the event pointer itself is used as the completion key.
     * Returns: true on success (or when nothing had to be done), false when
     *          the association failed; the failure is written to the log.
     */
    bool addEvent(AsyncEvent * event) nothrow {
        if (event.type != AsynType.ACCEPT && event.type != AsynType.TCP
                && event.type != AsynType.UDP)
            return true;

        try {
            auto port = CreateIoCompletionPort(cast(HANDLE) event.fd, _iocp,
                cast(ULONG_PTR) event, 1);
            if (!port) {
                // Do not flag the event as active when it was never attached.
                error("CreateIoCompletionPort failed with error: ", GetLastError());
                return false;
            }
            event.isActive(true);
            return true;
        }
        catch (Exception e) {
            showException(e);
            return false;
        }
    }

    /// Nothing to modify for a completion port; always returns true.
    bool modEvent(AsyncEvent * event) nothrow {
        return true;
    }

    /**
     * Marks the event as inactive. The handle is not detached from the
     * completion port by this call.
     *
     * Returns: always true.
     */
    bool delEvent(AsyncEvent * event) nothrow {
        event.isActive(false);
        return true;
    }

    /**
     * Dequeues at most one completion packet and dispatches it.
     *
     * Params:
     *   timeout = value handed to GetQueuedCompletionStatus as the wait time
     *             in milliseconds.
     *
     * When no packet was dequeued (null OVERLAPPED pointer) the call simply
     * returns. A packet for a failed operation triggers onClose() on the
     * owning object, if there is one. Successful packets are dispatched by
     * their IOCP_OP_TYPE.
     */
    void wait(int timeout) {
        OVERLAPPED * overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;
        int va = GetQueuedCompletionStatus(_iocp, &bytes, &key, &overlapped,
            timeout);
        if (overlapped is null) // timeout or port-level failure: nothing dequeued
            return;

        // Every OVERLAPPED posted through this loop is the first field of an
        // IOCP_DATA record, so the pointer can be reinterpreted.
        auto ev = cast(IOCP_DATA * ) overlapped;

        if (va == 0) {
            auto erro = GetLastError();
            if (erro == WAIT_TIMEOUT)
                return;
            // The dequeued operation failed: tell the owner to close.
            if (ev.event && ev.event.obj)
                ev.event.obj.onClose();
            return;
        }

        final switch (ev.operationType) {
        case IOCP_OP_TYPE.accept:
            ev.event.obj.onRead();
            break;
        case IOCP_OP_TYPE.connect:
            ev.event.writeLen = 0;
            ev.event.obj.onWrite();
            break;
        case IOCP_OP_TYPE.read:
            if (bytes > 0) {
                ev.event.readLen = bytes;
                ev.event.obj.onRead();
            } else {
                // Zero bytes transferred is treated as a close.
                ev.event.obj.onClose();
            }
            break;
        case IOCP_OP_TYPE.write:
            if (bytes > 0) {
                ev.event.writeLen = bytes;
                ev.event.obj.onWrite();
            } else {
                ev.event.obj.onClose();
            }
            break;
        case IOCP_OP_TYPE.event:
            // Wake-up packet from weakUp(): nothing to dispatch.
            break;
        }
    }

    /**
     * Posts the internal wake-up record to the completion port so that a
     * blocked wait() call returns.
     */
    void weakUp() nothrow {
        try {
            PostQueuedCompletionStatus(_iocp, 0, 0, cast(LPOVERLAPPED)( & _event));
        }
        catch (Exception e) {
            showException(yuCathException(error(e.toString)));
        }
    }

private:
    HANDLE _iocp; // completion port handle; null until initer() has run
    IOCP_DATA _event; // record posted by weakUp()
}
132 
/**
 * Per-operation record handed to overlapped calls.
 *
 * The OVERLAPPED member is deliberately the first field: IOCPLoop casts the
 * OVERLAPPED pointer returned by the completion port back to IOCP_DATA*, and
 * casts an IOCP_DATA* to LPOVERLAPPED when posting its wake-up packet.
 */
struct IOCP_DATA {
    OVERLAPPED ol; // must stay first, see above
    IOCP_OP_TYPE operationType; // which kind of operation this record describes
    AsyncEvent * event = null; // owning event; null for the loop's wake-up record
}
138 
/// AcceptEx extension function pointer; resolved once by the module's shared static constructor.
__gshared static LPFN_ACCEPTEX AcceptEx;
/// ConnectEx extension function pointer; resolved once by the module's shared static constructor.
__gshared static LPFN_CONNECTEX ConnectEx;
141 /*__gshared LPFN_DISCONNECTEX DisconnectEx;
142 __gshared LPFN_GETACCEPTEXSOCKADDRS GetAcceptexSockAddrs;
143 __gshared LPFN_TRANSMITFILE TransmitFile;
144 __gshared LPFN_TRANSMITPACKETS TransmitPackets;
145 __gshared LPFN_WSARECVMSG WSARecvMsg;
146 __gshared LPFN_WSASENDMSG WSASendMsg;*/
147 
/**
 * Module initialiser: starts Winsock 2.2 and resolves the AcceptEx and
 * ConnectEx extension function pointers through a temporary TCP socket.
 *
 * Throws: Exception when WSAStartup or socket() fails, ErrnoException (from
 * the generated lookup code) when an extension function cannot be resolved.
 */
shared static this() {
    WSADATA wsaData;
    int iResult = WSAStartup(MAKEWORD(2, 2),  & wsaData);
    // WSAStartup returns the error code directly; fail early with a precise
    // message instead of letting the later lookups fail generically.
    enforce(iResult == 0, "WSAStartup failed with error: " ~ to!string(iResult));

    // NOTE: the names `ListenSocket` and `guid` are referenced literally by
    // the code that GET_FUNC_POINTER generates and must not be renamed.
    SOCKET ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    enforce(ListenSocket != INVALID_SOCKET,
        "socket() failed with error: " ~ to!string(WSAGetLastError()));
    scope (exit)
        closesocket(ListenSocket);
    GUID guid;
    mixin(GET_FUNC_POINTER("WSAID_ACCEPTEX", "AcceptEx"));
    mixin(GET_FUNC_POINTER("WSAID_CONNECTEX", "ConnectEx"));
    /* mixin(GET_FUNC_POINTER("WSAID_DISCONNECTEX", "DisconnectEx"));
     mixin(GET_FUNC_POINTER("WSAID_GETACCEPTEXSOCKADDRS", "GetAcceptexSockAddrs"));
     mixin(GET_FUNC_POINTER("WSAID_TRANSMITFILE", "TransmitFile"));
     mixin(GET_FUNC_POINTER("WSAID_TRANSMITPACKETS", "TransmitPackets"));
     mixin(GET_FUNC_POINTER("WSAID_WSARECVMSG", "WSARecvMsg"));*/
}
164 
/// Module finaliser: calls WSACleanup() to balance the WSAStartup() call made
/// in the shared static constructor.
shared static ~this() {
    WSACleanup();
}
168 
169 private {
170     bool GetFunctionPointer(FuncPointer)(SOCKET sock, ref FuncPointer pfn, ref GUID guid) {
171         DWORD dwBytesReturned = 0;
172         if (WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER,  & guid,
173                 guid.sizeof,  & pfn, pfn.sizeof,  & dwBytesReturned, null, null) == SOCKET_ERROR) {
174             error("Get function failed with error:", GetLastError());
175             return false;
176         }
177 
178         return true;
179     }
180 
181     string GET_FUNC_POINTER(string GuidValue, string pft) {
182         string str = " guid = " ~ GuidValue ~ ";";
183         str ~= "if( !GetFunctionPointer( ListenSocket, " ~ pft ~ ", guid ) ) { errnoEnforce(false,\"iocp get function error!\"); } ";
184         return str;
185     }
186 }
187 
/// Winsock-style name for OVERLAPPED.
alias OVERLAPPED WSAOVERLAPPED;
/// Pointer to a WSAOVERLAPPED (i.e. OVERLAPPED) structure.
alias OVERLAPPED * LPWSAOVERLAPPED;

/// Buffer descriptor (length plus data pointer) taken by the WSARecv*/WSASend*
/// prototypes declared in this module.
struct WSABUF {
    uint len; // length of the buffer
    char * buf; // pointer to the buffer
}

/// Pointer to a WSABUF.
alias WSABUF * LPWSABUF;
197 
/// DWORD flag bits combined by the _WSAIO* templates to form socket ioctl codes.
enum : DWORD {
    IOCPARAM_MASK = 0x7f, // not referenced in the visible part of this module
    IOC_VOID = 0x20000000, // used by _WSAIO
    IOC_OUT = 0x40000000, // used by _WSAIOR
    IOC_IN = 0x80000000, // used by _WSAIOW
    IOC_INOUT = IOC_IN | IOC_OUT // used by _WSAIORW
}
205 
// Family selector bits that are OR-ed into a socket ioctl code; only IOC_WS2
// is used by the SIO_* constants defined in this module.
enum IOC_UNIX = 0x00000000;
enum IOC_WS2 = 0x08000000;
enum IOC_PROTOCOL = 0x10000000;
enum IOC_VENDOR = 0x18000000;
210 
/// Compile-time ioctl code: IOC_VOID combined with family bits `x` and code `y`.
enum _WSAIO(int x, int y) = IOC_VOID | x | y;
/// Compile-time ioctl code: IOC_OUT combined with family bits `x` and code `y`.
enum _WSAIOR(int x, int y) = IOC_OUT | x | y;
/// Compile-time ioctl code: IOC_IN combined with family bits `x` and code `y`.
enum _WSAIOW(int x, int y) = IOC_IN | x | y;
/// Compile-time ioctl code: IOC_INOUT combined with family bits `x` and code `y`.
enum _WSAIORW(int x, int y) = IOC_INOUT | x | y;
223 
// Socket ioctl codes in the IOC_WS2 family, each built from one of the
// _WSAIO* templates and a function number. Within the visible part of this
// module only SIO_GET_EXTENSION_FUNCTION_POINTER is used (by GetFunctionPointer).
enum SIO_ASSOCIATE_HANDLE = _WSAIOW!(IOC_WS2, 1);
enum SIO_ENABLE_CIRCULAR_QUEUEING = _WSAIO!(IOC_WS2, 2);
enum SIO_FIND_ROUTE = _WSAIOR!(IOC_WS2, 3);
enum SIO_FLUSH = _WSAIO!(IOC_WS2, 4);
enum SIO_GET_BROADCAST_ADDRESS = _WSAIOR!(IOC_WS2, 5);
enum SIO_GET_EXTENSION_FUNCTION_POINTER = _WSAIORW!(IOC_WS2, 6);
enum SIO_GET_QOS = _WSAIORW!(IOC_WS2, 7);
enum SIO_GET_GROUP_QOS = _WSAIORW!(IOC_WS2, 8);
enum SIO_MULTIPOINT_LOOPBACK = _WSAIOW!(IOC_WS2, 9);
enum SIO_MULTICAST_SCOPE = _WSAIOW!(IOC_WS2, 10);
enum SIO_SET_QOS = _WSAIOW!(IOC_WS2, 11);
enum SIO_SET_GROUP_QOS = _WSAIOW!(IOC_WS2, 12);
enum SIO_TRANSLATE_HANDLE = _WSAIORW!(IOC_WS2, 13);
enum SIO_ROUTING_INTERFACE_QUERY = _WSAIORW!(IOC_WS2, 20);
enum SIO_ROUTING_INTERFACE_CHANGE = _WSAIOW!(IOC_WS2, 21);
enum SIO_ADDRESS_LIST_QUERY = _WSAIOR!(IOC_WS2, 22);
enum SIO_ADDRESS_LIST_CHANGE = _WSAIO!(IOC_WS2, 23);
enum SIO_QUERY_TARGET_PNP_HANDLE = _WSAIOR!(IOC_WS2, 24);
enum SIO_NSP_NOTIFY_CHANGE = _WSAIOW!(IOC_WS2, 25);
243 
// Prototypes for the Winsock overlapped send/receive functions. The
// `extern (Windows) :` and `nothrow :` labels apply to every declaration that
// follows them in this scope, giving them Windows linkage and marking them
// as non-throwing.
extern (Windows) : nothrow : int WSARecv(SOCKET, LPWSABUF, DWORD, LPDWORD,
    LPDWORD, LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);
int WSARecvDisconnect(SOCKET, LPWSABUF);
int WSARecvFrom(SOCKET, LPWSABUF, DWORD, LPDWORD, LPDWORD, SOCKADDR * , LPINT,
    LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);

int WSASend(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, LPWSAOVERLAPPED,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE);
int WSASendDisconnect(SOCKET, LPWSABUF);
int WSASendTo(SOCKET, LPWSABUF, DWORD, LPDWORD, DWORD, const(SOCKADDR) * , int,
    LPWSAOVERLAPPED, LPWSAOVERLAPPED_COMPLETION_ROUTINE);