module yu.asyncsocket.acceptor;

import core.memory;
import core.sys.posix.sys.socket;

import std.socket;
import std.functional;
import std.exception;

import yu.eventloop;
import yu.asyncsocket.transport;
import yu.asyncsocket.tcpsocket;
import yu.asyncsocket.exception;
import yu.exception;

/// Callback invoked once per accepted connection; receives the new socket.
alias AcceptCallBack = void delegate(Socket sock) nothrow;

/**
 * Listening TCP socket driven by an `EventLoop`.
 *
 * The acceptor owns a non-blocking stream socket. Once `start` has registered
 * it with the loop, every accepted connection is handed to the delegate set
 * through `setCallBack`.
 */
@trusted final class Acceptor : AsyncTransport, EventCallInterface {
    /**
     * Creates an acceptor for IPv4 (default) or IPv6.
     *
     * Params:
     *   loop   = event loop the acceptor is attached to.
     *   isIpV6 = `true` selects `AddressFamily.INET6`, otherwise `AddressFamily.INET`.
     */
    this(EventLoop loop, bool isIpV6 = false) {
        auto family = isIpV6 ? AddressFamily.INET6 : AddressFamily.INET;
        this(loop, family);
    }

    /**
     * Creates an acceptor for the given address family.
     *
     * Params:
     *   loop   = event loop the acceptor is attached to.
     *   family = must be `AddressFamily.INET` or `AddressFamily.INET6`
     *            (checked by the `in` contract).
     */
    this(EventLoop loop, AddressFamily family)
    in {
        assert(family == AddressFamily.INET6 || family == AddressFamily.INET,
                "the AddressFamily must be AddressFamily.INET or AddressFamily.INET6");
    }
    body {
        _socket = yNew!Socket(family, SocketType.STREAM, ProtocolType.TCP);
        _socket.blocking = false;
        _event = AsyncEvent(AsynType.ACCEPT, this, _socket.handle, true, false, false,
                false);
        super(loop, TransportType.ACCEPT);
        // The IOCP back end needs an output buffer for AcceptEx.
        static if (IOMode == IO_MODE.iocp)
            _buffer = makeArray!ubyte(yuAlloctor, 2048);
    }

    /// Closes the acceptor and releases the socket (and the IOCP buffer).
    ~this() {
        onClose();
        yDel(_socket);
        static if (IOMode == IO_MODE.iocp)
            yDel(_buffer);
    }

    /**
     * Configures address/port reuse on the listening socket.
     *
     * `SO_REUSEADDR` is always set to `use`. On Posix `SO_REUSEPORT` is set to
     * `use` as well. On Windows, when `use` is `false`, `SO_EXCLUSIVEADDRUSE`
     * is enabled on the socket.
     */
    @property reusePort(bool use) {
        _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, use);
        version (Posix)
            _socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT,
                    use);
        // `Windows` (capitalised) is the predefined version identifier; the
        // lower-case spelling is never set, so this branch used to be dead
        // code and also hid a misspelled socket name.
        version (Windows) {
            if (!use) {
                import core.sys.windows.winsock2 : SO_EXCLUSIVEADDRUSE;

                _socket.setOption(SocketOptionLevel.SOCKET,
                        cast(SocketOption) SO_EXCLUSIVEADDRUSE, true);
            }
        }
    }

    /**
     * Binds the listening socket to `addr`.
     *
     * In IOCP mode the address length is remembered so that `doAccept` can
     * size the AcceptEx address areas for the bound address family.
     */
    void bind(Address addr) @trusted {
        static if (IO_MODE.iocp == IOMode) {
            _addreslen = addr.nameLen();
        }
        _socket.bind(forward!addr);
    }

    /// Puts the socket into listening state with the given backlog.
    void listen(int backlog) @trusted {
        _socket.listen(forward!backlog);
    }

    /// Native handle of the listening socket, cast to `int`.
    override @property int fd() {
        return cast(int) _socket.handle();
    }

    /// Local address the listening socket reports.
    pragma(inline, true) @property localAddress() {
        return _socket.localAddress();
    }

    /**
     * Registers the acceptor with the event loop.
     *
     * Returns: `false` when the event is already active, the socket is not
     * alive, or no callback has been set. Otherwise the result of registering
     * the event (non-IOCP) or of posting the first accept (IOCP).
     */
    override bool start() {
        if (_event.isActive || !_socket.isAlive() || !_callBack) {
            warning("accept start erro!");
            return false;
        }
        _event = AsyncEvent(AsynType.ACCEPT, this, _socket.handle, true, false, false,
                false);
        static if (IOMode == IO_MODE.iocp) {
            trace("start accept : , the fd is ", _socket.handle());
            _loop.addEvent(&_event);
            return doAccept();
        } else {
            return _loop.addEvent(&_event);
        }
    }

    /// Closes the acceptor: goes through `onClose` when alive, otherwise closes the raw socket.
    override void close() {
        if (isAlive) {
            onClose();
        } else if (_socket.isAlive()) {
            _socket.close();
        }
    }

    /// Whether the listening socket is alive; stays `false` unless the check stores `true`.
    override @property bool isAlive() @trusted nothrow {
        bool alive = false;
        // NOTE(review): yuCathException presumably stores the expression's
        // result in `alive` and swallows any exception - confirm in yu.exception.
        yuCathException(_socket.isAlive(), alive);
        return alive;
    }

    mixin TransportSocketOption;

    /// Sets the delegate called for every accepted connection; `start` refuses to run without one.
    void setCallBack(AcceptCallBack cback) {
        _callBack = cback;
    }

protected:
    /**
     * Event-loop read notification.
     *
     * IOCP: finalises the socket accepted by the completed AcceptEx, hands it
     * to the callback and posts the next accept.
     * Otherwise: accepts connections in a loop until `accept` yields
     * `socket_t.init`, wrapping each descriptor in a `Socket` for the callback.
     */
    override void onRead() nothrow {
        static if (IO_MODE.iocp == IOMode) {
            yuCathException({
                trace("new connect ,the fd is : ", _inSocket.handle());
                SOCKET slisten = cast(SOCKET) _socket.handle;
                SOCKET slink = cast(SOCKET) _inSocket.handle;
                // 0x700B is SO_UPDATE_ACCEPT_CONTEXT: associates the accepted
                // socket with the listening socket's properties.
                setsockopt(slink, SOL_SOCKET, 0x700B, cast(const char*)&slisten, slisten.sizeof);
                _callBack(_inSocket);
            }());
            // Ownership of the accepted socket has passed to the callback.
            _inSocket = null;
            doAccept();
        } else {
            while (true) {
                socket_t fd = cast(socket_t)(.accept(_socket.handle, null, null));
                if (fd == socket_t.init)
                    return;
                yuCathException({
                    Socket sock = yNew!Socket(fd, _socket.addressFamily);
                    _callBack(sock);
                }());
            }
        }
    }

    /// Write notifications are not used by an acceptor.
    override void onWrite() nothrow {
    }

    /// Removes the event from the loop and closes the socket; no-op when not alive.
    override void onClose() nothrow {
        if (!isAlive)
            return;
        eventLoop.delEvent(&_event);
        _socket.close();
    }

    static if (IOMode == IO_MODE.iocp) {
        /**
         * Posts an overlapped AcceptEx on the listening socket.
         *
         * Returns: `true` when the accept completed or is pending; `false`
         * when AcceptEx failed (the acceptor is then closed) or when an
         * exception prevented the accept from being posted.
         */
        bool doAccept() nothrow {
            try {
                _iocp.event = &_event;
                _iocp.operationType = IOCP_OP_TYPE.accept;
                if (_inSocket is null) {
                    _inSocket = yNew!Socket(_socket.addressFamily,
                            SocketType.STREAM, ProtocolType.TCP);
                }

                // AcceptEx requires each address area to be at least 16 bytes
                // larger than the transport's maximum address length. Use the
                // length recorded by bind() so IPv6 listeners get enough room;
                // fall back to the IPv4 size when bind() has not recorded one.
                immutable uint nameLen = _addreslen > 0 ? _addreslen
                    : cast(uint) sockaddr_in.sizeof;
                immutable DWORD addrAreaLen = cast(DWORD)(nameLen + 16);

                DWORD dwBytesReceived = 0;
                trace("AcceptEx is : ", AcceptEx);
                int nRet = AcceptEx(cast(SOCKET) _socket.handle,
                        cast(SOCKET) _inSocket.handle, _buffer.ptr, 0,
                        addrAreaLen, addrAreaLen,
                        &dwBytesReceived, &_iocp.ol);
                trace("do AcceptEx : the return is : ", nRet);
                if (nRet == 0) {
                    DWORD dwLastError = GetLastError();
                    if (ERROR_IO_PENDING != dwLastError) {
                        yuCathException(error("AcceptEx failed with error: ", dwLastError));
                        onClose();
                        return false;
                    }
                }
            }
            catch (Exception e) {
                import yu.exception;

                showException(e);
                // No accept was posted, so do not report success.
                return false;
            }
            return true;
        }
    }

private:
    Socket _socket; // listening socket
    AsyncEvent _event; // accept event registered with the loop

    AcceptCallBack _callBack; // receiver of accepted sockets

    static if (IO_MODE.iocp == IOMode) {
        IOCP_DATA _iocp; // overlapped state passed to AcceptEx
        WSABUF _iocpWBuf;

        Socket _inSocket; // socket pre-created for the pending AcceptEx

        ubyte[] _buffer; // AcceptEx output buffer (2048 bytes)

        uint _addreslen; // address length recorded by bind()
    }
}

unittest {
    // Manual echo-server example, kept disabled: it needs a running event
    // loop and a client connecting to the bound port.
    /*
    import std.datetime;
    import std.stdio;
    import std.functional;

    import yu.asyncsocket;

    EventLoop loop = new EventLoop();



    class TCP
    {
        static int[TCP] tcpList;
        this(EventLoop loop, Socket soc)
        {
            _socket = new TCPSocket(loop, soc);
            _socket.setReadCallBack(&readed);
            _socket.setCloseCallBack(&closed);
            _socket.start();
        }

        alias socket this;
        @property socket()
        {
            return _socket;
        }

    protected:
        void readed(ubyte[] buf)
        {
            writeln("read data : ", cast(string)(buf));
            socket.write(buf.dup, &writed);
        }

        void writed(ubyte[] data, uint size)
        {
            writeln("write data Size : ", size, "\t data size : ", data.length);
            ++_size;
            if (_size == 5)
                socket.write(data, &writeClose);
            else
            {
                socket.write(data, &writed);
            }

        }

        void writeClose(ubyte[] data, uint size)
        {
            writeln("write data Size : ", size, "\t data size : ", data.length);
            socket.close();
            loop.stop();
            // throw new Exception("hahahahhaah ");
        }

        void closed()
        {
            tcpList.remove(this);
            writeln("Socket Closed .");
        }

    private:
        TCPSocket _socket;
        int _size = 0;
    }

    void newConnect(Socket soc)
    {
        auto tcp = new TCP(loop, soc);
        TCP.tcpList[tcp] = 0;
    }



    Acceptor accept = new Acceptor(loop);

    accept.setCallBack(toDelegate(&newConnect));

    accept.reusePort(true);
    accept.bind(new InternetAddress("0.0.0.0", 6553));

    accept.listen(64);

    accept.start();

    loop.run(5000);
    */
}