1 module yu.asyncsocket.client.client;
2 
3 import std.socket;
4 
5 import yu.eventloop;
6 import yu.timer.eventlooptimer;
7 import yu.asyncsocket.tcpclient;
8 import yu.asyncsocket.tcpsocket;
9 import yu.asyncsocket.client.linkinfo;
10 import yu.asyncsocket.client.exception;
11 import yu.task;
12 import yu.memory.allocator;
13 import yu.exception;
14 
/**
 * Base class for a TCP client driven by an EventLoop.
 *
 * It owns one TCPClient at a time (stored in `_info.client`), optionally
 * re-attempts a failed connect up to `tryCount` times, and can arm an
 * EventLoopTimer that reports to `onTimeout`. Subclasses implement the
 * `on*` hooks declared in the protected section.
 */
@trusted abstract class BaseClient {
    /// Delegate called with every newly created TCPClient before it connects.
    alias ClientCreatorCallBack = void delegate(TCPClient);
    /// Per-connection bookkeeping: client, address, creator callback, try counter.
    alias LinkInfo = TLinkInfo!ClientCreatorCallBack;

    /// Binds the client to `loop`; nothing is connected until `connect` is called.
    this(EventLoop loop) {
        _loop = loop;
    }

    /// Releases the current TCPClient, the timer and the stored address, if any.
    ~this() {
        if (_info.client)
            yDel(_info.client);
        if (_timer)
            yDel(_timer);
        // NOTE(review): the address was handed in by the caller of connect();
        // deleting it here assumes ownership was transferred - confirm.
        if (_info.addr)
            yDel(_info.addr);
    }

    /// Returns: true when a TCPClient exists and reports itself alive.
    final bool isAlive() @trusted {
        return _info.client && _info.client.isAlive;
    }

    /**
     * Sets the timeout used by `startTimer`.
     *
     * The value is multiplied by 1000 before it is passed to the timer
     * (presumably seconds converted to milliseconds). 0 disables the timer.
     */
    final void setTimeout(uint s) @safe {
        _timeout = s;
    }

    /// Maximum number of reconnect attempts made after a connect attempt fails.
    @property tryCount() {
        return _tryCount;
    }

    /// ditto
    @property tryCount(uint count) {
        _tryCount = count;
    }

    /**
     * Schedules a connect to `addr` on the event loop.
     *
     * Params:
     *   addr  = remote address to connect to.
     *   cback = optional delegate invoked with each TCPClient created for
     *           this connect (including retries) before it starts connecting.
     *
     * Throws: SocketClientException when the client is already alive.
     */
    final void connect(Address addr, ClientCreatorCallBack cback = null) @trusted {
        if (isAlive)
            throw new SocketClientException(
                    "client is already connected; close it before connecting again");
        _info.tryCount = 0;
        _info.cback = cback;
        _info.addr = addr;
        _loop.post(&_postConnect);
    }

    /**
     * Writes `data` through the current TCPClient.
     *
     * Runs immediately when called on the loop thread; otherwise the write is
     * wrapped in a task and posted to the loop. When no client exists at the
     * time the write runs, `cback` (if set) is called with a size of 0.
     */
    final void write(const(ubyte)[] data, TCPWriteCallBack cback = null) @trusted {
        if (_loop.isInLoopThread()) {
            _postWrite(data, cback);
        } else {
            auto task = makeTask(yuAlloctor, &_postWrite, data, cback);
            // presumably frees the task once it has run - see EventLoop.
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /**
     * Writes a prepared buffer through the current TCPClient.
     *
     * Same thread handling as the slice overload. When no client exists at
     * the time the write runs, `buffer.doFinish()` is called instead.
     */
    final void write(TCPWriteBuffer buffer) @trusted
    {
        if (_loop.isInLoopThread()) {
            _postWriteBuffer(buffer);
        } else {
            auto task = makeTask(yuAlloctor, &_postWriteBuffer, buffer);
            task.finishedCall = &_loop.finishDoFreeYuTask;
            _loop.post(task);
        }
    }

    /// Posts a close request to the event loop; does nothing without a client.
    pragma(inline) final void close() @trusted {
        if (_info.client is null)
            return;
        _loop.post(&_postClose);
    }

    /// The current TCPClient, or null when there is none.
    final @property tcpClient() @trusted {
        return _info.client;
    }

    /// The timeout timer; null until `startTimer` created it.
    final @property timer() @trusted {
        return _timer;
    }

    /// The value last passed to `setTimeout`.
    final @property timeout() @safe {
        return _timeout;
    }

    /// The event loop this client was constructed with.
    final @property eventLoop() @trusted {
        return _loop;
    }

protected:
    /// Called after a connect attempt succeeded.
    void onActive() nothrow;
    /// Called when connecting failed and no further retry will be made.
    void onFailure() nothrow;
    /// Called from the TCPClient close callback, after the client reference was dropped.
    void onClose() nothrow;
    /// Installed as the TCPClient read callback.
    void onRead(in ubyte[] data) nothrow;
    /// Installed as the timer callback.
    void onTimeout() nothrow;

    /**
     * (Re)starts the timeout timer; a timeout of 0 leaves the timer untouched.
     *
     * The timer is created on first use with `onTimeout` as its callback.
     * The return type is intentionally left to inference, as in the original
     * declaration, so that function attributes keep being inferred.
     */
    final startTimer() {
        if (_timeout == 0)
            return;
        if (_timer)
            _timer.stop();
        else {
            _timer = yNew!EventLoopTimer(_loop);
            _timer.setCallBack(&onTimeout);
        }
        _timer.start(_timeout * 1000);
    }

private:
    /// Replaces the current TCPClient with a fresh one and starts connecting.
    final void connect() {
        if (_info.client)
            yDel(_info.client);
        _info.client = yNew!TCPClient(_loop);
        // The creator callback runs first; the callbacks below are installed
        // afterwards and therefore win over anything it set.
        if (_info.cback)
            _info.cback(_info.client);
        _info.client.setConnectCallBack(&connectCallBack);
        _info.client.setCloseCallBack(&doClose);
        _info.client.setReadCallBack(&onRead);
        _info.client.connect(_info.addr);
    }

    /// Connect-result handler: reports success, retries, or reports failure.
    final void connectCallBack(bool state) nothrow {
        if (state) {
            _info.cback = null;
            onActive();
            return;
        }

        // The failed client is discarded before deciding whether to retry.
        yuCathException(yDel(_info.client));
        _info.client = null;

        if (_info.tryCount < _tryCount) {
            _info.tryCount++;
            try {
                connect();
            } catch (Exception e) {
                yuCathException(error(e.msg));
                // The retry could not even be started: finish exactly like
                // the "retries exhausted" case so the timer cannot fire
                // after failure was already reported.
                connectFailed();
            }
        } else {
            connectFailed();
        }
    }

    /// Final failure path: drops the creator callback, stops the timer, notifies.
    final void connectFailed() nothrow {
        _info.cback = null;
        if (_timer)
            _timer.stop();
        onFailure();
    }

    /// TCPClient close callback: stops the timer, forgets the client, notifies.
    final void doClose() nothrow {
        if (_timer)
            _timer.stop();
        // NOTE(review): the client reference is dropped here without yDel.
        // A deferred free through the event loop was left commented out -
        // confirm who releases the closed client.
        //_loop.post!true(makeTask!freeTcpClient(yuAlloctor,client));
        _info.client = null;
        onClose();
    }

    /// Loop-side part of `close`.
    final void _postClose() {
        if (_info.client)
            _info.client.close();
    }

    /// Loop-side part of `write(TCPWriteBuffer)`.
    final void _postWriteBuffer(TCPWriteBuffer buffer)
    {
        if (_info.client) {
            _info.client.write(buffer);
        } else
            buffer.doFinish();
    }

    /// Loop-side part of `write(const(ubyte)[], TCPWriteCallBack)`.
    pragma(inline) final void _postWrite(const(ubyte)[] data, TCPWriteCallBack cback) {
        if (_info.client)
            _info.client.write(data, cback);
        else if (cback)
            cback(data, 0);
    }

    /// Loop-side part of the public `connect`: arms the timer, then connects.
    final void _postConnect() {
        startTimer();
        connect();
    }

    EventLoop _loop;
    LinkInfo _info;
    uint _tryCount = 1;
    EventLoopTimer _timer;
    uint _timeout;
}