1 module yu.asyncsocket.client.clientmanger;
2 
3 import std.socket;
4 
5 import yu.memory.allocator;
6 import yu.eventloop;
7 import yu.timer.eventlooptimer;
8 import yu.asyncsocket.tcpclient;
9 import yu.asyncsocket.tcpsocket;
10 import yu.asyncsocket.client.linkinfo;
11 import yu.asyncsocket.client.exception;
12 
13 import yu.timer.timingwheeltimer;
14 import yu.task;
15 import yu.exception : yuCathException;
16 
17 @trusted final class TCPClientManger {
18     alias ClientCreatorCallBack = void delegate(TCPClient) nothrow;
19     alias ConCallBack = void delegate(ClientConnection) nothrow;
20     alias LinkInfo = TLinkInfo!(ConCallBack, TCPClientManger);
21     alias NewConnection = ClientConnection delegate(TCPClient) nothrow;
22     alias STimerWheel = ITimingWheel!YuAlloctor;
23 
24     this(EventLoop loop) {
25         _loop = loop;
26     }
27 
28     ~this() {
29         if (_timer)
30             yDel(_timer);
31         if (_wheel)
32             yDel(_wheel);
33     }
34 
35     void setClientCreatorCallBack(ClientCreatorCallBack cback) {
36         _oncreator = cback;
37     }
38 
39     void setNewConnectionCallBack(NewConnection cback) {
40         _cback = cback;
41     }
42 
43     @property eventLoop() {
44         return _loop;
45     }
46 
47     @property tryCout() {
48         return _tryCout;
49     }
50 
51     @property tryCout(uint count) {
52         _tryCout = count;
53     }
54 
55     @property timeWheel() {
56         return _wheel;
57     }
58 
59     @property timeout() {
60         return _timeout;
61     }
62 
63     void startTimer(uint s) {
64         if (_wheel !is null)
65             throw new SocketClientException("TimeOut is runing!");
66         _timeout = s;
67         if (_timeout == 0)
68             return;
69 
70         uint whileSize;
71         uint time;
72         if (_timeout <= 40) {
73             whileSize = 50;
74             time = _timeout * 1000 / 50;
75         } else if (_timeout <= 120) {
76             whileSize = 60;
77             time = _timeout * 1000 / 60;
78         } else if (_timeout <= 600) {
79             whileSize = 100;
80             time = _timeout * 1000 / 100;
81         } else if (_timeout < 1000) {
82             whileSize = 150;
83             time = _timeout * 1000 / 150;
84         } else {
85             whileSize = 180;
86             time = _timeout * 1000 / 180;
87         }
88 
89         _wheel = yNew!STimerWheel(whileSize, yuAlloctor);
90         if (_timer is null)
91             _timer = yNew!EventLoopTimer(_loop);
92         _timer.setCallBack(&onTimer);
93         if (_loop.isInLoopThread()) {
94             _timer.start(time);
95         } else {
96             auto task = makeTask(yuAlloctor, &_timer.start, time);
97             task.finishedCall = &_loop.finishDoFreeYuTask;
98             _loop.post(task);
99         }
100     }
101 
102     void connect(Address addr, ConCallBack cback = null) {
103         if (_cback is null)
104             throw new SocketClientException("must set NewConnection callback ");
105         LinkInfo* info = yNew!LinkInfo();
106         info.addr = addr;
107         info.tryCount = 0;
108         info.cback = cback;
109         if (_loop.isInLoopThread()) {
110             _postConmnect(info);
111         } else {
112             auto task = makeTask(yuAlloctor, &_postConmnect, info);
113             task.finishedCall = &_loop.finishDoFreeYuTask;
114             _loop.post(task);
115         }
116     }
117 
118     void stopTimer() {
119         if (_wheel) {
120             if (_loop.isInLoopThread()) {
121                 killTimer();
122             } else {
123                 _loop.post(&killTimer);
124             }
125         }
126     }
127 
128     void connectCallBack(LinkInfo* info, bool state) {
129         import std.exception;
130 
131         if (info is null)
132             return;
133         if (state) {
134             scope (exit) {
135                 _waitConnect.rmInfo(info);
136                 yDel(info);
137             }
138             ClientConnection con;
139             con = _cback(info.client);
140             if (con is null) {
141                 auto task = makeTask!freeTcpClient(yuAlloctor, info.client);
142                 task.finishedCall = &_loop.finishDoFreeYuTask;
143                 _loop.post(task);
144                 return;
145             }
146             if (info.cback)
147                 info.cback(con);
148             if (_wheel)
149                 _wheel.addNewTimer(con);
150             con.onActive();
151         } else {
152             yDel(info.client);
153             info.client = null;
154             if (info.tryCount < _tryCout) {
155                 info.tryCount++;
156                 connect(info);
157             } else {
158                 auto cback = info.cback;
159                 _waitConnect.rmInfo(info);
160                 yDel(info);
161                 if (cback)
162                     cback(null);
163             }
164         }
165     }
166 
167 protected:
168     void connect(LinkInfo* info) {
169         info.client = yNew!TCPClient(_loop);
170         if (_oncreator)
171             _oncreator(info.client);
172         info.manger = this;
173         info.client.setCloseCallBack(&tmpCloseCallBack);
174         info.client.setConnectCallBack(&info.connectCallBack);
175         info.client.setReadCallBack(&tmpReadCallBack);
176         info.client.connect(info.addr);
177     }
178 
179     void tmpReadCallBack(in ubyte[]) nothrow{
180     }
181 
182     void tmpCloseCallBack() nothrow{
183     }
184 
185     void onTimer() nothrow{
186         _wheel.prevWheel();
187     }
188 
189 private:
190     final void _postConmnect(LinkInfo* info) {
191         _waitConnect.addInfo(info);
192         connect(info);
193     }
194 
195     void killTimer() {
196         _timer.stop();
197         if (_wheel)
198             yDel(_wheel);
199         _wheel = null;
200     }
201 
202 private:
203     uint _tryCout = 1;
204     uint _timeout;
205 
206     EventLoop _loop;
207     EventLoopTimer _timer;
208     STimerWheel _wheel;
209     TLinkManger!(ConCallBack, TCPClientManger) _waitConnect;
210 
211     NewConnection _cback;
212     ClientCreatorCallBack _oncreator;
213 }
214 
215 @trusted void freeTcpClient(TCPClient client) {
216     client.close();
217     yDel(client);
218 }
219 
220 @trusted abstract class ClientConnection : IWheelTimer!YuAlloctor {
221     this(TCPClient client) {
222         restClient(client);
223     }
224 
225     ~this() {
226         if (_client)
227             yDel(_client);
228     }
229 
230     final bool isAlive() @trusted {
231         return _client && _client.isAlive;
232     }
233 
234     final @property tcpClient() @safe {
235         return _client;
236     }
237 
238     final TCPClient restClient(TCPClient client) @trusted {
239         TCPClient tmp = _client;
240         if (_client !is null) {
241             _client.setCloseCallBack(null);
242             _client.setReadCallBack(null);
243             _client.setConnectCallBack(null);
244             _client = null;
245         }
246         if (client !is null) {
247             _client = client;
248             _loop = client.eventLoop;
249             _client.setCloseCallBack(&doClose);
250             _client.setReadCallBack(&onRead);
251             _client.setConnectCallBack(&tmpConnectCallBack);
252         }
253         return _client;
254     }
255 
256     final void write(const(ubyte)[] data, TCPWriteCallBack cback = null) @trusted {
257         if (_loop.isInLoopThread()) {
258             _postWrite(data, cback);
259         } else {
260             auto task = makeTask(yuAlloctor, &_postWrite, data, cback);
261             task.finishedCall = &_loop.finishDoFreeYuTask;
262             _loop.post(task);
263         }
264     }
265 
266     final void write(TCPWriteBuffer buffer) @trusted
267     {
268         if (_loop.isInLoopThread()) {
269             _postWriteBuffer(buffer);
270         } else {
271             auto task = makeTask(yuAlloctor, &_postWriteBuffer, buffer);
272             task.finishedCall = &_loop.finishDoFreeYuTask;
273             _loop.post(task);
274         }
275     }
276 
277     final void restTimeout() @trusted {
278         if (_loop.isInLoopThread()) {
279             rest();
280         } else {
281             auto task = makeTask(yuAlloctor, &rest, 0);
282             task.finishedCall = &_loop.finishDoFreeYuTask;
283             _loop.post(task);
284         }
285     }
286 
287     pragma(inline) final void close() @trusted {
288         _loop.post(&_postClose);
289     }
290 
291 protected:
292     void onActive() nothrow;
293     void onClose() nothrow;
294     void onRead(in ubyte[] data) nothrow;
295 private:
296     final void tmpConnectCallBack(bool) nothrow {
297     }
298 
299     final void doClose() @trusted nothrow {
300         stop();
301         onClose();
302     }
303 
304     final void _postClose() {
305         if (_client)
306             _client.close();
307     }
308 
309     final void _postWriteBuffer(TCPWriteBuffer buffer)
310     {
311         if (_client) {
312             rest();
313             _client.write(buffer);
314         } else
315             buffer.doFinish();
316     }
317 
318     final void _postWrite(const(ubyte)[] data, TCPWriteCallBack cback) {
319         if (_client) {
320             rest();
321             _client.write(data, cback);
322         } else if (cback)
323             cback(data, 0);
324     }
325 
326 private:
327     TCPClient _client;
328     EventLoop _loop;
329 }