1 module yu.asyncsocket.acceptor;
2 
3 import core.memory;
4 import core.sys.posix.sys.socket;
5 
6 import std.socket;
7 import std.functional;
8 import std.exception;
9 
10 import yu.eventloop;
11 import yu.asyncsocket.transport;
12 import yu.asyncsocket.tcpsocket;
13 import yu.asyncsocket.exception;
14 import yu.exception;
15 
/// Callback invoked once for every accepted connection. It receives the
/// `Socket` wrapping the new connection and must not throw.
/// NOTE(review): the socket is allocated with `yNew`; presumably the callback
/// takes ownership of it - confirm against the callers.
alias AcceptCallBack = void delegate(Socket sock) nothrow;

/**
 * Non-blocking TCP listening socket registered with an `EventLoop`.
 *
 * Typical usage: construct, `setCallBack`, optionally `reusePort`, then
 * `bind`, `listen` and `start`. Every accepted connection is handed to the
 * callback as a `Socket`.
 *
 * Two code paths exist, selected at compile time by `IOMode`:
 * the IOCP path posts overlapped `AcceptEx` calls, every other mode calls
 * `accept` in a loop from `onRead`.
 */
@trusted final class Acceptor : AsyncTransport, EventCallInterface {
    /// Creates an IPv4 acceptor, or an IPv6 one when `isIpV6` is true.
    this(EventLoop loop, bool isIpV6 = false) {
        auto family = isIpV6 ? AddressFamily.INET6 : AddressFamily.INET;
        this(loop, family);
    }

    /**
     * Creates the listening socket for `family` and switches it to
     * non-blocking mode.
     *
     * Params:
     *   loop   = event loop this acceptor is attached to.
     *   family = must be `AddressFamily.INET` or `AddressFamily.INET6`
     *            (checked by the `in` contract).
     */
    this(EventLoop loop, AddressFamily family)
    in {
        assert(family == AddressFamily.INET6 || family == AddressFamily.INET,
            "the AddressFamily must be AddressFamily.INET or AddressFamily.INET6");
    }
    body {
        _socket = yNew!Socket(family, SocketType.STREAM, ProtocolType.TCP);
        _socket.blocking = false;
        _event = AsyncEvent(AsynType.ACCEPT, this, _socket.handle, true, false, false,
            false);
        super(loop, TransportType.ACCEPT);
        // The IOCP path needs a buffer that AcceptEx fills with the
        // local/remote addresses of the accepted connection.
        static if (IOMode == IO_MODE.iocp)
            _buffer = makeArray!ubyte(yuAlloctor, 2048);
    }

    /// Unregisters from the loop, closes the socket and releases what the
    /// constructor allocated.
    ~this() {
        onClose();
        yDel(_socket);
        static if (IOMode == IO_MODE.iocp)
            yDel(_buffer);
    }

    /**
     * Enables or disables address/port reuse on the listening socket.
     *
     * `SO_REUSEADDR` is always set to `use`. On Posix `SO_REUSEPORT` is set
     * to `use` as well. On Windows, when reuse is disabled,
     * `SO_EXCLUSIVEADDRUSE` is switched on.
     */
    @property reusePort(bool use) {
        _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, use);
        version (Posix)
            _socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT,
                use);
        // The predefined version identifier is `Windows`; identifiers are
        // case-sensitive, so a lowercase spelling never matches.
        version (Windows) {
            if (!use) {
                import core.sys.windows.winsock2;

                _socket.setOption(SocketOptionLevel.SOCKET,
                    cast(SocketOption) SO_EXCLUSIVEADDRUSE, true);
            }
        }
    }

    /// Binds the listening socket to `addr`. In IOCP mode the address length
    /// is remembered so `doAccept` can size the AcceptEx address areas.
    void bind(Address addr) @trusted {
        static if (IO_MODE.iocp == IOMode) {
            _addreslen = addr.nameLen();
        }
        _socket.bind(forward!addr);
    }

    /// Starts listening with the given `backlog`.
    void listen(int backlog) @trusted {
        _socket.listen(forward!backlog);
    }

    /// Native handle of the listening socket, cast to `int`.
    override @property int fd() {
        return cast(int) _socket.handle();
    }

    /// Local address the listening socket is bound to.
    pragma(inline, true) @property localAddress() {
        return _socket.localAddress();
    }

    /**
     * Registers the accept event with the loop.
     *
     * Returns: `false` (after logging a warning) when the event is already
     * active, the socket is not alive, or no callback has been set;
     * otherwise the result of `addEvent` (non-IOCP) or of the first
     * `doAccept` (IOCP).
     */
    override bool start() {
        if (_event.isActive || !_socket.isAlive() || !_callBack) {
            warning("accept start erro!");
            return false;
        }
        _event = AsyncEvent(AsynType.ACCEPT, this, _socket.handle, true, false, false,
            false);
        static if (IOMode == IO_MODE.iocp) {
            trace("start accept : , the fd is ", _socket.handle());
            _loop.addEvent(&_event);
            return doAccept();
        } else {
            return _loop.addEvent(&_event);
        }
    }

    /// Stops accepting: unregisters the event and closes the socket.
    override void close() {
        if (isAlive) {
            onClose();
        } else if (_socket.isAlive()) {
            _socket.close();
        }
    }

    /// True while the listening socket is alive; any exception raised by the
    /// check is swallowed and reported as `false`.
    override @property bool isAlive() @trusted nothrow {
        bool alive = false;
        yuCathException(_socket.isAlive(), alive);
        return alive;
    }

    mixin TransportSocketOption;

    /// Sets the delegate that receives accepted connections. `start` refuses
    /// to run until a callback has been set.
    void setCallBack(AcceptCallBack cback) {
        _callBack = cback;
    }

protected:
    /// Event-loop hook, called when the listening socket is readable
    /// (non-IOCP) or when a posted AcceptEx has completed (IOCP).
    override void onRead() nothrow {
        static if (IO_MODE.iocp == IOMode) {
            yuCathException({
                trace("new connect ,the fd is : ", _inSocket.handle());
                SOCKET slisten = cast(SOCKET) _socket.handle;
                SOCKET slink = cast(SOCKET) _inSocket.handle;
                // 0x700B is SO_UPDATE_ACCEPT_CONTEXT: makes the accepted
                // socket inherit the listening socket's properties.
                setsockopt(slink, SOL_SOCKET, 0x700B, cast(const char*)&slisten, slisten.sizeof);
                _callBack(_inSocket);
            }());
            // The accepted socket now belongs to the callback; post the next
            // AcceptEx with a fresh socket.
            _inSocket = null;
            doAccept();
        } else {
            // Drain every pending connection; accept() returning the invalid
            // handle (socket_t.init) ends the loop.
            while (true) {
                socket_t fd = cast(socket_t)(.accept(_socket.handle, null, null));
                if (fd == socket_t.init)
                    return;
                yuCathException({
                    Socket sock = yNew!Socket(fd, _socket.addressFamily);
                    _callBack(sock);
                }());
            }
        }
    }

    /// A listening socket never writes; intentionally empty.
    override void onWrite() nothrow {
    }

    /// Unregisters the event from the loop and closes the socket. Does
    /// nothing when the socket is no longer alive.
    override void onClose() nothrow {
        if (!isAlive)
            return;
        eventLoop.delEvent(&_event);
        _socket.close();
    }

    static if (IOMode == IO_MODE.iocp) {
        /**
         * Posts one overlapped `AcceptEx` on the listening socket.
         *
         * Returns: `false` when AcceptEx fails with anything other than
         * `ERROR_IO_PENDING` (the acceptor is closed in that case),
         * `true` otherwise.
         */
        bool doAccept() nothrow {
            try {
                _iocp.event = &_event;
                _iocp.operationType = IOCP_OP_TYPE.accept;
                if (_inSocket is null) {
                    _inSocket = yNew!Socket(_socket.addressFamily,
                        SocketType.STREAM, ProtocolType.TCP);
                }

                // AcceptEx requires each address area to be at least 16 bytes
                // larger than the transport's maximum address length. Use the
                // length recorded by bind() so an IPv6 listener gets enough
                // room; fall back to the IPv4 size when bind() has not
                // recorded one.
                const DWORD addrLen = (_addreslen != 0 ? _addreslen
                        : cast(uint) sockaddr_in.sizeof) + 16;

                DWORD dwBytesReceived = 0;
                trace("AcceptEx is :  ", AcceptEx);
                int nRet = AcceptEx(cast(SOCKET) _socket.handle,
                    cast(SOCKET) _inSocket.handle, _buffer.ptr, 0,
                    addrLen, addrLen,
                    &dwBytesReceived, &_iocp.ol);
                trace("do AcceptEx : the return is : ", nRet);
                if (nRet == 0) {
                    DWORD dwLastError = GetLastError();
                    // ERROR_IO_PENDING only means the accept completes later.
                    if (ERROR_IO_PENDING != dwLastError) {
                        yuCathException(error("AcceptEx failed with error: ", dwLastError));
                        onClose();
                        return false;
                    }
                }
            }
            catch (Exception e) {
                import yu.exception;

                // NOTE(review): an exception is only logged and the function
                // still reports success below - confirm this is intended.
                showException(e);
            }
            return true;
        }
    }

private:
    Socket _socket; // listening socket, allocated with yNew in the constructor
    AsyncEvent _event; // event registered with the loop

    AcceptCallBack _callBack; // receives every accepted connection

    static if (IO_MODE.iocp == IOMode) {
        IOCP_DATA _iocp; // overlapped data handed to AcceptEx
        WSABUF _iocpWBuf;

        Socket _inSocket; // pre-created socket for the pending AcceptEx

        ubyte[] _buffer; // AcceptEx output buffer for the address areas

        uint _addreslen; // address length recorded by bind()
    }
}
203 
// Disabled example: a small echo server built on Acceptor and TCPSocket.
// The whole body is commented out, so this unittest compiles to nothing;
// it is kept as a usage reference for the accept / callback flow.
unittest {
    /*
    import std.datetime;
    import std.stdio;
    import std.functional;

    import yu.asyncsocket;

    EventLoop loop = new EventLoop();

    // Wraps one accepted connection and echoes back what it reads.
    class TCP
    {
        static int[TCP] tcpList;
        this(EventLoop loop, Socket soc)
        {
            _socket = new TCPSocket(loop, soc);
            _socket.setReadCallBack(&readed);
            _socket.setCloseCallBack(&closed);
            _socket.start();
        }

        alias socket this;
        @property socket()
        {
            return _socket;
        }

    protected:
        void readed(ubyte[] buf)
        {
            writeln("read data :  ", cast(string)(buf));
            socket.write(buf.dup, &writed);
        }

        void writed(ubyte[] data, uint size)
        {
            writeln("write data Size :  ", size, "\t data size : ", data.length);
            ++_size;
            if (_size == 5)
                socket.write(data, &writeClose);
            else
            {
                socket.write(data, &writed);
            }

        }

        void writeClose(ubyte[] data, uint size)
        {
            writeln("write data Size :  ", size, "\t data size : ", data.length);
            socket.close();
            loop.stop();
            //	throw new Exception("hahahahhaah ");
        }

        void closed()
        {
            tcpList.remove(this);
            writeln("Socket Closed .");
        }

    private:
        TCPSocket _socket;
        int _size = 0;
    }

    // Accept callback: wrap the new connection and keep it referenced.
    void newConnect(Socket soc)
    {
        auto tcp = new TCP(loop, soc);
        TCP.tcpList[tcp] = 0;
    }

    Acceptor accept = new Acceptor(loop);

    accept.setCallBack(toDelegate(&newConnect));

    accept.reusePort(true);
    accept.bind(new InternetAddress("0.0.0.0", 6553));

    accept.listen(64);

    accept.start();

    loop.run(5000);
*/
}