1 module yu.asyncsocket.tcpsocket; 2 3 import core.stdc.errno; 4 5 import std.socket; 6 import std.functional; 7 import std.exception; 8 9 import yu.eventloop; 10 import yu.asyncsocket.transport; 11 import yu.exception; 12 import std.string; 13 14 alias TCPWriteCallBack = void delegate(const(ubyte)[] data, size_t writeSzie) nothrow; 15 alias TCPReadCallBack = void delegate(in ubyte[] buffer) nothrow; 16 17 abstract class TCPWriteBuffer 18 { 19 // todo Send Data; 20 const(ubyte)[] data() nothrow; 21 // add send offiset and return is empty 22 bool popSize(size_t size) nothrow; 23 // do send finish 24 void doFinish() nothrow; 25 private: 26 TCPWriteBuffer _next; 27 } 28 29 @trusted class TCPSocket : AsyncTransport, EventCallInterface { 30 this(EventLoop loop, bool isIpV6 = false) { 31 auto family = isIpV6 ? AddressFamily.INET6 : AddressFamily.INET; 32 _socket = yNew!Socket(family, SocketType.STREAM, ProtocolType.TCP); 33 this(loop, _socket); 34 } 35 36 this(EventLoop loop, AddressFamily family) { 37 _socket = yNew!Socket(family, SocketType.STREAM, ProtocolType.TCP); 38 this(loop, _socket); 39 } 40 41 this(EventLoop loop, Socket sock) 42 in { 43 assert(sock.addressFamily == AddressFamily.INET 44 || sock.addressFamily == AddressFamily.INET6, 45 "the AddressFamily must be AddressFamily.INET or AddressFamily.INET6"); 46 } 47 body { 48 super(loop, TransportType.TCP); 49 _socket = sock; 50 _socket.blocking = false; 51 _readBuffer = makeArray!ubyte(yuAlloctor, TCP_READ_BUFFER_SIZE); 52 _event = AsyncEvent(AsynType.TCP, this, _socket.handle, true, true, true); 53 static if (IO_MODE.iocp == IOMode) { 54 _iocpBuffer.len = cast(uint)TCP_READ_BUFFER_SIZE; 55 _iocpBuffer.buf = cast(char*) _readBuffer.ptr; 56 _iocpread.event = &_event; 57 _iocpwrite.event = &_event; 58 _iocpwrite.operationType = IOCP_OP_TYPE.write; 59 _iocpread.operationType = IOCP_OP_TYPE.read; 60 } 61 } 62 63 ~this() { 64 clearWriteQueue(); 65 yDel(_socket); 66 yDel(_readBuffer); 67 _readBuffer = null; 68 } 69 70 final override @property int fd() { 71 return cast(int) _socket.handle(); 72 } 73 74 override bool start() { 75 if (_event.isActive || !_socket.isAlive() || !_readCallBack) 76 return false; 77 _event = AsyncEvent(AsynType.TCP, this, _socket.handle, true, true, true); 78 static if (IOMode == IO_MODE.iocp) { 79 _loop.addEvent(&_event); 80 return doRead(); 81 } else { 82 return _loop.addEvent(&_event); 83 } 84 } 85 86 final override void close() { 87 if (alive) { 88 onClose(); 89 } else if (_socket.isAlive()) { 90 Linger optLinger; 91 optLinger.on = 1; 92 optLinger.time = 0; 93 _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, optLinger); 94 _socket.close(); 95 } 96 } 97 98 override @property bool isAlive() @trusted nothrow { 99 return alive(); 100 } 101 102 void write(const(ubyte)[] data, TCPWriteCallBack cback) { 103 if (!alive) { 104 warning("tcp socket write on close!"); 105 if (cback) 106 cback(data, 0); 107 return; 108 } 109 auto buffer = yNew!WriteSite(data, cback); 110 write(buffer); 111 } 112 113 void write(TCPWriteBuffer buffer) 114 { 115 if (!alive) { 116 warning("tcp socket write on close!"); 117 buffer.doFinish(); 118 return; 119 } 120 static if (IOMode == IO_MODE.iocp) { 121 bool dowrite = _writeQueue.empty; 122 } 123 _writeQueue.enQueue(buffer); 124 static if (IOMode == IO_MODE.iocp) { 125 trace("do write: ", dowrite); 126 if (dowrite) { 127 _event.writeLen = 0; 128 onWrite(); 129 } 130 } else { 131 onWrite(); 132 } 133 } 134 135 mixin TransportSocketOption; 136 137 pragma(inline) void setKeepAlive(int time, int interval) @trusted { 138 _socket.setKeepAlive(forward!(time, interval)); 139 } 140 141 pragma(inline) final void setReadCallBack(TCPReadCallBack cback) { 142 _readCallBack = cback; 143 } 144 145 pragma(inline) final void setCloseCallBack(CallBack cback) { 146 _unActive = cback; 147 } 148 149 protected: 150 pragma(inline, true) final @property bool alive() @trusted nothrow { 151 return _event.isActive && _socket.handle() != socket_t.init; 152 } 153 154 override void onWrite() nothrow { 155 static if (IOMode == IO_MODE.iocp) { 156 if (!alive || _writeQueue.empty) 157 return; 158 TCPWriteBuffer buffer = _writeQueue.front; 159 if (_event.writeLen > 0) { 160 if (buffer.popSize(_event.writeLen)) { 161 _writeQueue.deQueue(); 162 buffer.doFinish(); 163 } 164 } 165 while (!_writeQueue.empty){ 166 buffer = _writeQueue.front; 167 _event.writeLen = 0; 168 auto data = buffer.data; 169 if(data.length == 0){ 170 _writeQueue.deQueue(); 171 buffer.doFinish(); 172 continue; 173 } 174 _iocpWBuf.len = cast(uint)data.length; 175 _iocpWBuf.buf = cast(char*) data.ptr; 176 doWrite(); 177 return; 178 } 179 } else { 180 try { 181 import core.stdc.string; 182 while (alive && !_writeQueue.empty) { 183 TCPWriteBuffer buffer = _writeQueue.front; 184 auto data = buffer.data; 185 if(data.length == 0){ 186 _writeQueue.deQueue(); 187 buffer.doFinish(); 188 continue; 189 } 190 auto len = _socket.send(data); 191 if (len > 0) { 192 if (buffer.popSize(len)) { 193 _writeQueue.deQueue(); 194 buffer.doFinish(); 195 } 196 continue; 197 } else { 198 if (errno == EAGAIN || errno == EWOULDBLOCK) { 199 return; 200 } else if (errno == 4) { 201 continue; 202 } 203 } 204 error("write size: ", len, 205 " \n\tDo Close the erro code : ", errno, 206 " erro is : ", fromStringz(strerror(errno)), " \n\tthe socket fd : ", 207 fd); 208 onClose(); 209 return; 210 } 211 } 212 catch (Exception e) { 213 showException(e); 214 onClose(); 215 } 216 } 217 } 218 219 override void onClose() nothrow { 220 if (!alive) 221 return; 222 eventLoop.delEvent(&_event); 223 clearWriteQueue(); 224 try { 225 _socket.shutdown(SocketShutdown.BOTH); 226 _socket.close(); 227 } 228 catch (Exception e) { 229 showException(e); 230 } 231 auto unActive = _unActive; 232 _readCallBack = null; 233 _unActive = null; 234 if (unActive) 235 unActive(); 236 } 237 238 override void onRead() nothrow { 239 static if (IOMode == IO_MODE.iocp) { 240 if (_event.readLen > 0) { 241 _readCallBack(_readBuffer[0 .. _event.readLen]); 242 } else { 243 onClose(); 244 return; 245 } 246 _event.readLen = 0; 247 if (alive) 248 doRead(); 249 250 } else { 251 try { 252 import core.stdc.string; 253 254 while (alive) { 255 auto len = _socket.receive(_readBuffer); 256 if (len > 0) { 257 _readCallBack(_readBuffer[0 .. len]); 258 continue; 259 } else if (len < 0) { 260 if (errno == EAGAIN || errno == EWOULDBLOCK) { 261 return; 262 } else if (errno == 4) { 263 continue; 264 } 265 error("Do Close the erro code : ", errno, 266 " erro is : ", fromStringz(strerror(errno)), 267 " \n\tthe socket fd : ", fd); 268 } 269 onClose(); 270 return; 271 } 272 } 273 catch (Exception e) { 274 showException(e); 275 onClose(); 276 } 277 } 278 } 279 280 static if (IOMode == IO_MODE.iocp) { 281 bool doRead() nothrow { 282 _iocpBuffer.len = cast(uint)TCP_READ_BUFFER_SIZE; 283 _iocpBuffer.buf = cast(char*) _readBuffer.ptr; 284 _iocpread.event = &_event; 285 _iocpread.operationType = IOCP_OP_TYPE.read; 286 287 DWORD dwReceived = 0; 288 DWORD dwFlags = 0; 289 290 int nRet = WSARecv(cast(SOCKET) _socket.handle, &_iocpBuffer, 291 cast(uint) 1, &dwReceived, &dwFlags, &_iocpread.ol, 292 cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null); 293 if (nRet == SOCKET_ERROR) { 294 DWORD dwLastError = GetLastError(); 295 if (ERROR_IO_PENDING != dwLastError) { 296 yuCathException(error("WSARecv failed with error: ", dwLastError)); 297 onClose(); 298 return false; 299 } 300 } 301 return true; 302 } 303 304 bool doWrite() nothrow { 305 DWORD dwFlags = 0; 306 DWORD dwSent = 0; 307 _iocpwrite.event = &_event; 308 _iocpwrite.operationType = IOCP_OP_TYPE.write; 309 int nRet = WSASend(cast(SOCKET) _socket.handle(), &_iocpWBuf, 1, 310 &dwSent, dwFlags, &_iocpwrite.ol, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null); 311 if (nRet == SOCKET_ERROR) { 312 DWORD dwLastError = GetLastError(); 313 if (dwLastError != ERROR_IO_PENDING) { 314 yuCathException(error("WSASend failed with error: ", dwLastError)); 315 onClose(); 316 return false; 317 } 318 } 319 return true; 320 } 321 } 322 323 final void clearWriteQueue() nothrow { 324 while (!_writeQueue.empty) { 325 TCPWriteBuffer buf = _writeQueue.deQueue(); 326 buf.doFinish(); 327 } 328 } 329 330 protected: 331 import std.experimental.allocator.gc_allocator; 332 333 Socket _socket; 334 WriteBufferQueue _writeQueue; 335 AsyncEvent _event; 336 ubyte[] _readBuffer; 337 338 CallBack _unActive; 339 TCPReadCallBack _readCallBack; 340 341 static if (IO_MODE.iocp == IOMode) { 342 IOCP_DATA _iocpread; 343 IOCP_DATA _iocpwrite; 344 WSABUF _iocpBuffer; 345 WSABUF _iocpWBuf; 346 347 } 348 } 349 350 package: 351 352 final class WriteSite : TCPWriteBuffer 353 { 354 this(const(ubyte)[] data, TCPWriteCallBack cback = null) 355 { 356 _data = data; 357 _site = 0; 358 _cback = cback; 359 } 360 361 override const(ubyte)[] data() nothrow 362 { 363 return _data[_site .. $]; 364 } 365 // add send offiset and return is empty 366 override bool popSize(size_t size) nothrow 367 { 368 _site += size; 369 if (_site >= _data.length) 370 return true; 371 else 372 return false; 373 } 374 // do send finish 375 override void doFinish() nothrow 376 { 377 if (_cback) 378 { 379 _cback(_data, _site); 380 } 381 _cback = null; 382 _data = null; 383 yuCathException({ yDel(this); }()); 384 } 385 386 private: 387 size_t _site = 0; 388 const(ubyte)[] _data; 389 TCPWriteCallBack _cback; 390 } 391 392 struct WriteBufferQueue 393 { 394 TCPWriteBuffer front() nothrow{ 395 return _frist; 396 } 397 398 bool empty() nothrow{ 399 return _frist is null; 400 } 401 402 void enQueue(TCPWriteBuffer wsite) nothrow 403 in{ 404 assert(wsite); 405 }body{ 406 if(_last){ 407 _last._next = wsite; 408 } else { 409 _frist = wsite; 410 } 411 wsite._next = null; 412 _last = wsite; 413 } 414 415 TCPWriteBuffer deQueue() nothrow 416 in{ 417 assert(_frist && _last); 418 }body{ 419 TCPWriteBuffer wsite = _frist; 420 _frist = _frist._next; 421 if(_frist is null) 422 _last = null; 423 return wsite; 424 } 425 426 private: 427 TCPWriteBuffer _last = null; 428 TCPWriteBuffer _frist = null; 429 }