module yu.asyncsocket.client.clientmanger;

import std.socket;

import yu.memory.allocator;
import yu.eventloop;
import yu.timer.eventlooptimer;
import yu.asyncsocket.tcpclient;
import yu.asyncsocket.tcpsocket;
import yu.asyncsocket.client.linkinfo;
import yu.asyncsocket.client.exception;

import yu.timer.timingwheeltimer;
import yu.task;
import yu.exception : yuCathException;

/**
 * Creates outgoing TCP connections on one EventLoop, retries failed attempts
 * and optionally drives a timing wheel that the established connections are
 * registered with.
 *
 * Operations that touch the loop are executed directly when called from the
 * loop thread and are posted to the loop otherwise.
 */
@trusted final class TCPClientManger {
    /// Called with every freshly created TCPClient before it starts connecting.
    alias ClientCreatorCallBack = void delegate(TCPClient) nothrow;
    /// Per-connect result callback; receives `null` when all attempts failed.
    alias ConCallBack = void delegate(ClientConnection) nothrow;
    alias LinkInfo = TLinkInfo!(ConCallBack, TCPClientManger);
    /// Factory that wraps a connected TCPClient into a ClientConnection.
    alias NewConnection = ClientConnection delegate(TCPClient) nothrow;
    alias STimerWheel = ITimingWheel!YuAlloctor;

    /// Params: loop = event loop all clients and the timer are bound to.
    this(EventLoop loop) {
        _loop = loop;
    }

    /// Releases the timer and the timing wheel if they were created.
    ~this() {
        if (_timer)
            yDel(_timer);
        if (_wheel)
            yDel(_wheel);
    }

    /// Sets the hook invoked on each new TCPClient right after construction.
    void setClientCreatorCallBack(ClientCreatorCallBack cback) {
        _oncreator = cback;
    }

    /// Sets the factory used to build a ClientConnection once a client is
    /// connected. Must be set before `connect(Address, ...)` is called.
    void setNewConnectionCallBack(NewConnection cback) {
        _cback = cback;
    }

    /// The event loop passed to the constructor.
    @property eventLoop() {
        return _loop;
    }

    /// Number of additional connect attempts made after a failure.
    @property tryCout() {
        return _tryCout;
    }

    /// ditto
    @property tryCout(uint count) {
        _tryCout = count;
    }

    /// The timing wheel, or `null` when no timer is running.
    @property timeWheel() {
        return _wheel;
    }

    /// The value last passed to `startTimer`.
    @property timeout() {
        return _timeout;
    }

    /**
     * Creates the timing wheel and starts the periodic timer that advances it.
     *
     * The number of wheel slots is chosen from the size of the timeout; the
     * timer interval is `s * 1000 / slots` (presumably milliseconds - confirm
     * against EventLoopTimer.start).
     *
     * Params: s = timeout value; `0` stores the value and does nothing else.
     * Throws: SocketClientException if a wheel already exists.
     */
    void startTimer(uint s) {
        if (_wheel !is null)
            throw new SocketClientException("TimeOut is runing!");
        _timeout = s;
        if (_timeout == 0)
            return;

        // Pick the wheel resolution from the timeout magnitude.
        uint wheelSize;
        if (_timeout <= 40)
            wheelSize = 50;
        else if (_timeout <= 120)
            wheelSize = 60;
        else if (_timeout <= 600)
            wheelSize = 100;
        else if (_timeout < 1000)
            wheelSize = 150;
        else
            wheelSize = 180;

        // NOTE(review): computed in uint, so this wraps for timeouts above
        // roughly 4.29 million; unchanged from the previous behaviour.
        uint time = _timeout * 1000 / wheelSize;

        _wheel = yNew!STimerWheel(wheelSize, yuAlloctor);
        if (_timer is null)
            _timer = yNew!EventLoopTimer(_loop);
        _timer.setCallBack(&onTimer);
        if (_loop.isInLoopThread()) {
            _timer.start(time);
        } else {
            auto task = makeTask(yuAlloctor, &_timer.start, time);
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /**
     * Starts connecting to `addr`.
     *
     * Params:
     *   addr  = remote address.
     *   cback = optional callback receiving the resulting ClientConnection,
     *           or `null` once every attempt has failed.
     * Throws: SocketClientException if no NewConnection factory has been set.
     */
    void connect(Address addr, ConCallBack cback = null) {
        if (_cback is null)
            throw new SocketClientException("must set NewConnection callback ");
        LinkInfo* info = yNew!LinkInfo();
        info.addr = addr;
        info.tryCount = 0;
        info.cback = cback;
        if (_loop.isInLoopThread()) {
            _postConmnect(info);
        } else {
            auto task = makeTask(yuAlloctor, &_postConmnect, info);
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /// Stops the timer and destroys the timing wheel, on the loop thread.
    /// Does nothing when no wheel exists.
    void stopTimer() {
        if (_wheel) {
            if (_loop.isInLoopThread()) {
                killTimer();
            } else {
                _loop.post(&killTimer);
            }
        }
    }

    /**
     * Handles the outcome of one connect attempt for `info`.
     *
     * On success the NewConnection factory is invoked; a `null` result causes
     * the client to be closed and freed via a posted task, otherwise the
     * per-connect callback is notified, the connection is added to the wheel
     * (if any) and `onActive` is called. The link info is always removed and
     * freed on this path.
     *
     * On failure the client is freed and the attempt is repeated while
     * `info.tryCount < tryCout`; afterwards the per-connect callback is
     * invoked with `null` and the link info is freed.
     *
     * Params:
     *   info  = bookkeeping record of the attempt; ignored when `null`.
     *   state = `true` if the connect succeeded.
     */
    void connectCallBack(LinkInfo* info, bool state) {
        if (info is null)
            return;
        if (state) {
            scope (exit) {
                _waitConnect.rmInfo(info);
                yDel(info);
            }
            ClientConnection con;
            con = _cback(info.client);
            if (con is null) {
                // Nobody took the client: close and free it from the loop.
                auto task = makeTask!freeTcpClient(yuAlloctor, info.client);
                task.finishedCall = &_loop.finishDoFreeYuTask;
                _loop.post(task);
                return;
            }
            if (info.cback)
                info.cback(con);
            if (_wheel)
                _wheel.addNewTimer(con);
            con.onActive();
        } else {
            yDel(info.client);
            info.client = null;
            if (info.tryCount < _tryCout) {
                info.tryCount++;
                connect(info);
            } else {
                // Copy the callback first: info is freed before it is called.
                auto cback = info.cback;
                _waitConnect.rmInfo(info);
                yDel(info);
                if (cback)
                    cback(null);
            }
        }
    }

protected:
    /// Creates a new TCPClient for `info`, installs the temporary callbacks
    /// and starts connecting to `info.addr`.
    void connect(LinkInfo* info) {
        info.client = yNew!TCPClient(_loop);
        if (_oncreator)
            _oncreator(info.client);
        info.manger = this;
        info.client.setCloseCallBack(&tmpCloseCallBack);
        info.client.setConnectCallBack(&info.connectCallBack);
        info.client.setReadCallBack(&tmpReadCallBack);
        info.client.connect(info.addr);
    }

    /// Placeholder read callback used while the client is still connecting.
    void tmpReadCallBack(in ubyte[]) nothrow {
    }

    /// Placeholder close callback used while the client is still connecting.
    void tmpCloseCallBack() nothrow {
    }

    /// Timer tick: advances the timing wheel by one step.
    void onTimer() nothrow {
        // killTimer() may already have destroyed the wheel when a pending
        // tick is delivered; do not dereference a null wheel.
        if (_wheel !is null)
            _wheel.prevWheel();
    }

private:
    /// Registers `info` as pending and starts the first connect attempt.
    final void _postConmnect(LinkInfo* info) {
        _waitConnect.addInfo(info);
        connect(info);
    }

    /// Stops the timer and destroys the wheel; safe to run more than once.
    void killTimer() {
        if (_timer !is null)
            _timer.stop();
        if (_wheel)
            yDel(_wheel);
        _wheel = null;
    }

private:
    uint _tryCout = 1;
    uint _timeout;

    EventLoop _loop;
    EventLoopTimer _timer;
    STimerWheel _wheel;
    TLinkManger!(ConCallBack, TCPClientManger) _waitConnect;

    NewConnection _cback;
    ClientCreatorCallBack _oncreator;
}

/// Closes `client` and releases it with `yDel`.
@trusted void freeTcpClient(TCPClient client) {
    client.close();
    yDel(client);
}

/**
 * Base class for an established client connection wrapping a TCPClient.
 *
 * Write, timeout-reset and close requests are executed directly on the loop
 * thread and posted to the loop otherwise. `rest()` and `stop()` are
 * inherited from IWheelTimer (not shown in this file).
 */
@trusted abstract class ClientConnection : IWheelTimer!YuAlloctor {
    /// Params: client = the connected client to adopt (see `restClient`).
    this(TCPClient client) {
        restClient(client);
    }

    /// Releases the currently held client, if any.
    ~this() {
        if (_client)
            yDel(_client);
    }

    /// `true` when a client is held and it reports itself alive.
    final bool isAlive() @trusted {
        return _client && _client.isAlive;
    }

    /// The currently held client, or `null`.
    final @property tcpClient() @safe {
        return _client;
    }

    /**
     * Replaces the held client.
     *
     * The current client (if any) has its close/read/connect callbacks
     * cleared and is detached. When `client` is non-null it becomes the held
     * client, its event loop is remembered and this connection's callbacks
     * are installed on it.
     *
     * Params: client = new client to adopt, or `null` to only detach.
     * Returns: the previously held client (possibly `null`). This object no
     *          longer references it, so the caller decides what happens to it.
     */
    final TCPClient restClient(TCPClient client) @trusted {
        TCPClient tmp = _client;
        if (_client !is null) {
            _client.setCloseCallBack(null);
            _client.setReadCallBack(null);
            _client.setConnectCallBack(null);
            _client = null;
        }
        if (client !is null) {
            _client = client;
            _loop = client.eventLoop;
            _client.setCloseCallBack(&doClose);
            _client.setReadCallBack(&onRead);
            _client.setConnectCallBack(&tmpConnectCallBack);
        }
        // Hand back the detached client; returning the new one would make
        // the old client unreachable.
        return tmp;
    }

    /**
     * Writes `data` through the held client.
     *
     * Params:
     *   data  = bytes to send.
     *   cback = optional completion callback; invoked with `(data, 0)` when
     *           no client is held.
     */
    final void write(const(ubyte)[] data, TCPWriteCallBack cback = null) @trusted {
        if (_loop.isInLoopThread()) {
            _postWrite(data, cback);
        } else {
            auto task = makeTask(yuAlloctor, &_postWrite, data, cback);
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /// Writes `buffer` through the held client; `buffer.doFinish()` is called
    /// instead when no client is held.
    final void write(TCPWriteBuffer buffer) @trusted
    {
        if (_loop.isInLoopThread()) {
            _postWriteBuffer(buffer);
        } else {
            auto task = makeTask(yuAlloctor, &_postWriteBuffer, buffer);
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /// Calls the inherited `rest()` on the loop thread.
    final void restTimeout() @trusted {
        if (_loop.isInLoopThread()) {
            rest();
        } else {
            auto task = makeTask(yuAlloctor, &rest, 0);
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /// Posts a request to the loop to close the held client.
    pragma(inline) final void close() @trusted {
        _loop.post(&_postClose);
    }

protected:
    /// Called by TCPClientManger once the connection has been set up.
    void onActive() nothrow;
    /// Called from the client's close callback, after `stop()`.
    void onClose() nothrow;
    /// Installed as the client's read callback.
    void onRead(in ubyte[] data) nothrow;
private:
    /// Connect notifications are ignored once the connection is established.
    final void tmpConnectCallBack(bool) nothrow {
    }

    /// Close callback of the held client: calls `stop()` and then `onClose()`.
    final void doClose() @trusted nothrow {
        stop();
        onClose();
    }

    final void _postClose() {
        if (_client)
            _client.close();
    }

    final void _postWriteBuffer(TCPWriteBuffer buffer)
    {
        if (_client) {
            rest();
            _client.write(buffer);
        } else
            buffer.doFinish();
    }

    final void _postWrite(const(ubyte)[] data, TCPWriteCallBack cback) {
        if (_client) {
            rest();
            _client.write(data, cback);
        } else if (cback)
            cback(data, 0);
    }

private:
    TCPClient _client;
    EventLoop _loop;
}