1 module yu.asyncsocket.server.tcpserver; 2 3 import std.socket; 4 5 import yu.memory.allocator; 6 import yu.eventloop; 7 import yu.asyncsocket.acceptor; 8 import yu.asyncsocket.tcpsocket; 9 import yu.asyncsocket.server.connection; 10 import yu.asyncsocket.server.exception; 11 import yu.timer.timingwheeltimer; 12 import yu.timer.eventlooptimer; 13 import yu.task; 14 import yu.exception : yuCathException; 15 16 @trusted final class TCPServer { 17 alias NewConnection = ServerConnection delegate(EventLoop, Socket) nothrow; 18 alias OnAceptorCreator = void delegate(Acceptor); 19 alias STimerWheel = ITimingWheel!YuAlloctor; 20 21 this(EventLoop loop) { 22 _loop = loop; 23 } 24 25 ~this() { 26 if (_acceptor) 27 yDel(_acceptor); 28 if (_timer) 29 yDel(_timer); 30 if (_wheel) 31 yDel(_wheel); 32 if (_bind) 33 yDel(_bind); 34 } 35 36 @property acceptor() { 37 return _acceptor; 38 } 39 40 @property eventLoop() { 41 return _loop; 42 } 43 44 @property bindAddress() { 45 return _bind; 46 } 47 48 @property timeWheel() { 49 return _wheel; 50 } 51 52 @property timeout() { 53 return _timeout; 54 } 55 56 void bind(Address addr, OnAceptorCreator ona = null) { 57 if (_acceptor !is null) 58 throw new SocketBindException("the server is areadly binded!"); 59 _bind = addr; 60 _acceptor = yNew!Acceptor(_loop, addr.addressFamily); 61 if (ona) 62 ona(_acceptor); 63 _acceptor.bind(_bind); 64 } 65 66 void listen(int listenBlock = 1024) { 67 if (_acceptor is null) 68 throw new SocketBindException("the server is not bind!"); 69 if (_cback is null) 70 throw new SocketServerException("Please set CallBack frist!"); 71 _acceptor.setCallBack(&newConnect); 72 if (_loop.isInLoopThread()) { 73 startListen(listenBlock); 74 } else { 75 auto task = makeTask(yuAlloctor, &startListen, listenBlock); 76 task.finishedCall = &_loop.finishDoFreeYuTask; 77 _loop.post(task); 78 } 79 } 80 81 void startTimer(uint s) { 82 if (_wheel !is null) 83 throw new SocketServerException("TimeOut is runing!"); 84 _timeout = s; 85 if (_timeout == 0) 86 return; 87 88 uint whileSize; 89 uint time; 90 if (_timeout <= 40) { 91 whileSize = 50; 92 time = _timeout * 1000 / 50; 93 } else if (_timeout <= 120) { 94 whileSize = 60; 95 time = _timeout * 1000 / 60; 96 } else if (_timeout <= 600) { 97 whileSize = 100; 98 time = _timeout * 1000 / 100; 99 } else if (_timeout < 1000) { 100 whileSize = 150; 101 time = _timeout * 1000 / 150; 102 } else { 103 whileSize = 180; 104 time = _timeout * 1000 / 180; 105 } 106 107 _wheel = yNew!STimerWheel(whileSize, yuAlloctor); 108 if (_timer is null) 109 _timer = yNew!EventLoopTimer(_loop); 110 if (_loop.isInLoopThread()) { 111 _timer.start(time); 112 } else { 113 auto task = makeTask(yuAlloctor, &_timer.start, time); 114 task.finishedCall = &_loop.finishDoFreeYuTask; 115 _loop.post(task); 116 } 117 } 118 119 void stopTimer() { 120 if (_wheel) { 121 if (_loop.isInLoopThread()) { 122 killTimer(); 123 } else { 124 _loop.post(&killTimer); 125 } 126 } 127 } 128 129 void setNewConntionCallBack(NewConnection cback) { 130 _cback = cback; 131 } 132 133 void close() { 134 if (_acceptor) 135 _loop.post(&_acceptor.close); 136 } 137 138 protected: 139 void newConnect(Socket socket) nothrow { 140 import std.exception; 141 142 ServerConnection connection; 143 connection = _cback(_loop, socket); 144 if (connection is null) 145 return; 146 if (connection.active() && _wheel) 147 _wheel.addNewTimer(connection); 148 } 149 150 void prevWheel() { 151 _wheel.prevWheel(); 152 } 153 154 void startListen(int block) { 155 _acceptor.listen(block); 156 _acceptor.start(); 157 } 158 159 void killTimer() { 160 _timer.stop(); 161 if (_wheel) 162 yDel(_wheel); 163 _wheel = null; 164 } 165 166 private: 167 Acceptor _acceptor; 168 EventLoop _loop; 169 Address _bind; 170 private: 171 NewConnection _cback; 172 private: 173 STimerWheel _wheel; 174 EventLoopTimer _timer; 175 uint _timeout; 176 }