1 module yu.asyncsocket.tcpsocket;
2 
3 import core.stdc.errno;
4 
5 import std.socket;
6 import std.functional;
7 import std.exception;
8 
9 import yu.eventloop;
10 import yu.asyncsocket.transport;
11 import yu.exception;
12 import std.string;
13 
/**
 * Completion callback for a write queued with `TCPSocket.write(data, cback)`.
 *
 * Params:
 *   data      = the buffer that was originally handed to `write`.
 *   writeSize = number of bytes of `data` that were consumed; `0` is passed
 *               when the socket was not alive at the time of the call.
 */
alias TCPWriteCallBack = void delegate(const(ubyte)[] data, size_t writeSize) nothrow;

/**
 * Callback receiving freshly read bytes.
 *
 * `buffer` is a slice of the socket's internal read buffer, which is reused
 * for the next read; copy the bytes if they must outlive the call.
 */
alias TCPReadCallBack = void delegate(in ubyte[] buffer) nothrow;
16 
/**
 * A pending outgoing chunk that can be queued on a `TCPSocket`.
 *
 * Instances are chained through the private `_next` link by
 * `WriteBufferQueue`, so a buffer can sit in at most one queue at a time.
 */
abstract class TCPWriteBuffer
{
nothrow:
    /// The bytes that still have to be sent.
    const(ubyte)[] data();

    /**
     * Advances the send offset by `size` bytes.
     *
     * Returns: `true` once nothing is left to send.
     */
    bool popSize(size_t size);

    /// Called when the socket is done with this buffer (sent, or dropped on close).
    void doFinish();

private:
    // Intrusive singly-linked-list pointer used by WriteBufferQueue.
    TCPWriteBuffer _next;
}
28 
/**
 * Non-blocking TCP transport registered with an `EventLoop`.
 *
 * Incoming bytes are delivered through the read callback; outgoing data is
 * kept in an intrusive queue of `TCPWriteBuffer` objects and flushed from
 * `onWrite`. Two code paths exist, selected at compile time by `IOMode`:
 * an IOCP path that posts overlapped `WSARecv`/`WSASend` requests, and a
 * readiness path that calls `send`/`receive` until they would block.
 */
@trusted class TCPSocket : AsyncTransport, EventCallInterface {
    /**
     * Creates a new TCP stream socket (IPv4 by default) bound to `loop`.
     *
     * Params:
     *   loop    = event loop that will drive this transport.
     *   isIpV6  = `true` for `AddressFamily.INET6`, otherwise `AddressFamily.INET`.
     */
    this(EventLoop loop, bool isIpV6 = false) {
        // The socket is passed straight to the main constructor: no member may
        // be touched before a delegating constructor call.
        this(loop, yNew!Socket(isIpV6 ? AddressFamily.INET6 : AddressFamily.INET,
                SocketType.STREAM, ProtocolType.TCP));
    }

    /**
     * Creates a new TCP stream socket of the given address family bound to `loop`.
     */
    this(EventLoop loop, AddressFamily family) {
        this(loop, yNew!Socket(family, SocketType.STREAM, ProtocolType.TCP));
    }

    /**
     * Wraps an existing socket.
     *
     * The socket is switched to non-blocking mode and the read buffer is
     * allocated. Note that the destructor releases `sock` with `yDel`.
     *
     * Params:
     *   loop = event loop that will drive this transport.
     *   sock = socket to wrap; its family must be INET or INET6.
     */
    this(EventLoop loop, Socket sock)
    in {
        assert(sock.addressFamily == AddressFamily.INET
            || sock.addressFamily == AddressFamily.INET6,
            "the AddressFamily must be AddressFamily.INET or AddressFamily.INET6");
    }
    body {
        super(loop, TransportType.TCP);
        _socket = sock;
        _socket.blocking = false;
        _readBuffer = makeArray!ubyte(yuAlloctor, TCP_READ_BUFFER_SIZE);
        _event = AsyncEvent(AsynType.TCP, this, _socket.handle, true, true, true);
        static if (IO_MODE.iocp == IOMode) {
            _iocpBuffer.len = cast(uint)TCP_READ_BUFFER_SIZE;
            _iocpBuffer.buf = cast(char*) _readBuffer.ptr;
            _iocpread.event = &_event;
            _iocpwrite.event = &_event;
            _iocpwrite.operationType = IOCP_OP_TYPE.write;
            _iocpread.operationType = IOCP_OP_TYPE.read;
        }
    }

    /// Finishes every queued write buffer and releases the socket and read buffer.
    ~this() {
        clearWriteQueue();
        yDel(_socket);
        yDel(_readBuffer);
        _readBuffer = null;
    }

    /// The native socket handle, cast to `int`.
    final override @property int fd() {
        return cast(int) _socket.handle();
    }

    /**
     * Registers the socket with the event loop.
     *
     * Returns: `false` when the event is already active, the socket is not
     * alive, no read callback has been set, or registration fails. On IOCP
     * the first overlapped read is posted as well and its result is returned.
     */
    override bool start() {
        if (_event.isActive || !_socket.isAlive() || !_readCallBack)
            return false;
        _event = AsyncEvent(AsynType.TCP, this, _socket.handle, true, true, true);
        static if (IOMode == IO_MODE.iocp) {
            // Do not post a read on a socket the loop refused to register.
            if (!_loop.addEvent(&_event))
                return false;
            return doRead();
        } else {
            return _loop.addEvent(&_event);
        }
    }

    /**
     * Closes the connection.
     *
     * A registered socket goes through `onClose` (which also fires the close
     * callback). A socket that was never started, but is still open, is
     * closed directly with linger enabled and a zero timeout.
     */
    final override void close() {
        if (alive) {
            onClose();
        } else if (_socket.isAlive()) {
            Linger optLinger;
            optLinger.on = 1;
            optLinger.time = 0;
            _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, optLinger);
            _socket.close();
        }
    }

    /// `true` while the event is active and the socket handle is valid.
    override @property bool isAlive() @trusted nothrow {
        return alive();
    }

    /**
     * Queues `data` for sending.
     *
     * Params:
     *   data  = bytes to send; the slice is referenced, not copied, so it
     *           must stay valid until `cback` has been invoked.
     *   cback = optional completion callback. When the socket is not alive
     *           it is invoked immediately with a written size of `0`.
     */
    void write(const(ubyte)[] data, TCPWriteCallBack cback) {
        if (!alive) {
            warning("tcp socket write on close!");
            if (cback)
                cback(data, 0);
            return;
        }
        auto buffer = yNew!WriteSite(data, cback);
        write(buffer);
    }

    /**
     * Queues a caller-supplied write buffer.
     *
     * When the socket is not alive the buffer is finished right away.
     * On IOCP a send is only started when the queue was empty beforehand
     * (otherwise a send is presumably already in flight); in the other mode
     * a flush is attempted immediately.
     */
    void write(TCPWriteBuffer buffer)
    {
        if (!alive) {
            warning("tcp socket write on close!");
            buffer.doFinish();
            return;
        }
        static if (IOMode == IO_MODE.iocp) {
            bool dowrite = _writeQueue.empty;
        }
        _writeQueue.enQueue(buffer);
        static if (IOMode == IO_MODE.iocp) {
            trace("do write: ", dowrite);
            if (dowrite) {
                _event.writeLen = 0;
                onWrite();
            }
        } else {
            onWrite();
        }
    }

    mixin TransportSocketOption;

    /// Forwards to `Socket.setKeepAlive(time, interval)`.
    pragma(inline) void setKeepAlive(int time, int interval) @trusted {
        _socket.setKeepAlive(forward!(time, interval));
    }

    /// Sets the callback receiving incoming data; required before `start`.
    pragma(inline) final void setReadCallBack(TCPReadCallBack cback) {
        _readCallBack = cback;
    }

    /// Sets the callback fired once from `onClose`.
    pragma(inline) final void setCloseCallBack(CallBack cback) {
        _unActive = cback;
    }

protected:
    /// `true` while the event is active and the socket handle is valid.
    pragma(inline, true) final @property bool alive() @trusted nothrow {
        return _event.isActive && _socket.handle() != socket_t.init;
    }

    /**
     * Flushes the write queue.
     *
     * IOCP: accounts for the bytes reported in `_event.writeLen`, skips empty
     * buffers and posts one overlapped send for the next non-empty buffer.
     * Otherwise: sends until the queue is empty or the socket would block;
     * any other error closes the connection.
     */
    override void onWrite() nothrow {
        static if (IOMode == IO_MODE.iocp) {
            if (!alive || _writeQueue.empty)
                return;
            TCPWriteBuffer buffer = _writeQueue.front;
            if (_event.writeLen > 0) {
                if (buffer.popSize(_event.writeLen)) {
                    _writeQueue.deQueue();
                    buffer.doFinish();
                }
            }
            while (!_writeQueue.empty){
                buffer = _writeQueue.front;
                _event.writeLen = 0;
                auto data = buffer.data;
                if(data.length == 0){
                    _writeQueue.deQueue();
                    buffer.doFinish();
                    continue;
                }
                _iocpWBuf.len = cast(uint)data.length;
                _iocpWBuf.buf = cast(char*) data.ptr;
                doWrite();
                return;
            }
        } else {
            try {
                import core.stdc.string;
                while (alive && !_writeQueue.empty) {
                    TCPWriteBuffer buffer = _writeQueue.front;
                    auto data =  buffer.data;
                    if(data.length == 0){
                        _writeQueue.deQueue();
                        buffer.doFinish();
                        continue;
                    }
                    auto len = _socket.send(data);
                    if (len > 0) {
                        if (buffer.popSize(len)) {
                            _writeQueue.deQueue();
                            buffer.doFinish();
                        }
                        continue;
                    } else {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // Kernel buffer is full; wait for the next writable event.
                            return;
                        } else if (errno == EINTR) {
                            // Interrupted by a signal: simply retry.
                            continue;
                        }
                    }
                    error("write size: ", len,
                        " \n\tDo Close the erro code : ", errno,
                        "  erro is : ", fromStringz(strerror(errno)), " \n\tthe socket fd : ",
                        fd);
                    onClose();
                    return;
                }
            }
            catch (Exception e) {
                showException(e);
                onClose();
            }
        }
    }

    /**
     * Tears the connection down: unregisters the event, finishes all queued
     * buffers, shuts down and closes the socket, then fires the close
     * callback once. Does nothing when the socket is not alive.
     */
    override void onClose() nothrow {
        if (!alive)
            return;
        eventLoop.delEvent(&_event);
        clearWriteQueue();
        try {
            _socket.shutdown(SocketShutdown.BOTH);
            _socket.close();
        }
        catch (Exception e) {
            showException(e);
        }
        // Clear both callbacks before invoking the close callback so it
        // cannot be fired a second time.
        auto unActive = _unActive;
        _readCallBack = null;
        _unActive = null;
        if (unActive)
            unActive();
    }

    /**
     * Delivers incoming data to the read callback.
     *
     * IOCP: hands over `_event.readLen` bytes and posts the next read; a
     * length of zero closes the connection.
     * Otherwise: receives until the socket would block; a return of `0`
     * or an unrecoverable error closes the connection.
     */
    override void onRead() nothrow {
        static if (IOMode == IO_MODE.iocp) {
            if (_event.readLen > 0) {
                _readCallBack(_readBuffer[0 .. _event.readLen]);
            } else {
                onClose();
                return;
            }
            _event.readLen = 0;
            if (alive)
                doRead();

        } else {
            try {
                import core.stdc.string;

                while (alive) {
                    auto len = _socket.receive(_readBuffer);
                    if (len > 0) {
                        _readCallBack(_readBuffer[0 .. len]);
                        continue;
                    } else if (len < 0) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // Nothing more to read for now.
                            return;
                        } else if (errno == EINTR) {
                            // Interrupted by a signal: simply retry.
                            continue;
                        }
                        error("Do Close the erro code : ", errno,
                            "  erro is : ", fromStringz(strerror(errno)),
                            " \n\tthe socket fd : ", fd);
                    }
                    // len == 0 or a hard error: close the connection.
                    onClose();
                    return;
                }
            }
            catch (Exception e) {
                showException(e);
                onClose();
            }
        }
    }

    static if (IOMode == IO_MODE.iocp) {
        /**
         * Posts an overlapped `WSARecv` into the read buffer.
         *
         * Returns: `false` (after closing) on any error other than
         * `ERROR_IO_PENDING`.
         */
        bool doRead() nothrow {
            _iocpBuffer.len = cast(uint)TCP_READ_BUFFER_SIZE;
            _iocpBuffer.buf = cast(char*) _readBuffer.ptr;
            _iocpread.event = &_event;
            _iocpread.operationType = IOCP_OP_TYPE.read;

            DWORD dwReceived = 0;
            DWORD dwFlags = 0;

            int nRet = WSARecv(cast(SOCKET) _socket.handle, &_iocpBuffer,
                cast(uint) 1, &dwReceived, &dwFlags, &_iocpread.ol,
                cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
            if (nRet == SOCKET_ERROR) {
                DWORD dwLastError = GetLastError();
                if (ERROR_IO_PENDING != dwLastError) {
                    yuCathException(error("WSARecv failed with error: ", dwLastError));
                    onClose();
                    return false;
                }
            }
            return true;
        }

        /**
         * Posts an overlapped `WSASend` for the range prepared in `_iocpWBuf`.
         *
         * Returns: `false` (after closing) on any error other than
         * `ERROR_IO_PENDING`.
         */
        bool doWrite() nothrow {
            DWORD dwFlags = 0;
            DWORD dwSent = 0;
            _iocpwrite.event = &_event;
            _iocpwrite.operationType = IOCP_OP_TYPE.write;
            int nRet = WSASend(cast(SOCKET) _socket.handle(), &_iocpWBuf, 1,
                &dwSent, dwFlags, &_iocpwrite.ol, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
            if (nRet == SOCKET_ERROR) {
                DWORD dwLastError = GetLastError();
                if (dwLastError != ERROR_IO_PENDING) {
                    yuCathException(error("WSASend failed with error: ", dwLastError));
                    onClose();
                    return false;
                }
            }
            return true;
        }
    }

    /// Dequeues every pending buffer and calls its `doFinish`.
    final void clearWriteQueue() nothrow {
        while (!_writeQueue.empty) {
            TCPWriteBuffer buf = _writeQueue.deQueue();
            buf.doFinish();
        }
    }

protected:
    import std.experimental.allocator.gc_allocator;

    Socket _socket;               // underlying non-blocking socket
    WriteBufferQueue _writeQueue; // pending outgoing buffers, FIFO
    AsyncEvent _event;            // registration record handed to the event loop
    ubyte[] _readBuffer;          // reused for every read

    CallBack _unActive;           // close callback, fired once from onClose
    TCPReadCallBack _readCallBack;

    static if (IO_MODE.iocp == IOMode) {
        IOCP_DATA _iocpread;
        IOCP_DATA _iocpwrite;
        WSABUF _iocpBuffer;
        WSABUF _iocpWBuf;

    }
}
349 
350 package:
351 
/**
 * Default `TCPWriteBuffer`: a borrowed byte slice plus a send offset and an
 * optional completion callback. The object frees itself in `doFinish`.
 */
final class WriteSite : TCPWriteBuffer
{
    /**
     * Params:
     *   data  = bytes to send; referenced, not copied.
     *   cback = optional callback invoked from `doFinish`.
     */
    this(const(ubyte)[] data, TCPWriteCallBack cback = null)
    {
        _data = data;
        _site = 0;
        _cback = cback;
    }

    /// The part of the slice that has not been sent yet.
    override const(ubyte)[] data() nothrow
    {
        return _data[_site .. $];
    }

    /**
     * Advances the send offset by `size` bytes.
     *
     * The offset is clamped to the slice length so that `data()` can never
     * slice out of range and `doFinish` never reports more bytes than exist.
     *
     * Returns: `true` once the whole slice has been consumed.
     */
    override bool popSize(size_t size) nothrow
    {
        // _site <= _data.length always holds, so this cannot underflow.
        immutable remaining = _data.length - _site;
        _site += (size < remaining) ? size : remaining;
        return _site >= _data.length;
    }

    /**
     * Reports the result to the callback (original slice and bytes consumed),
     * drops all references and destroys this object. The instance must not
     * be used after this call.
     */
    override void doFinish() nothrow
    {
        if (_cback)
        {
            _cback(_data, _site);
        }
        _cback = null;
        _data = null;
        yuCathException({ yDel(this); }());
    }

private:
    size_t _site = 0;        // number of bytes already sent
    const(ubyte)[] _data;    // full slice handed in by the caller
    TCPWriteCallBack _cback; // completion callback, may be null
}
391 
/**
 * Intrusive FIFO of `TCPWriteBuffer` objects, linked through each buffer's
 * private `_next` field. The queue stores references only and does not
 * allocate.
 */
struct WriteBufferQueue
{
    /// The oldest queued buffer, or `null` when the queue is empty.
    TCPWriteBuffer front() nothrow
    {
        return _head;
    }

    /// `true` when no buffer is queued.
    bool empty() nothrow
    {
        return _head is null;
    }

    /// Appends `wsite` at the tail; `wsite` must not be `null`.
    void enQueue(TCPWriteBuffer wsite) nothrow
    in
    {
        assert(wsite);
    }
    body
    {
        if (_tail is null)
        {
            // First element: it becomes the head as well.
            _head = wsite;
        }
        else
        {
            _tail._next = wsite;
        }
        wsite._next = null;
        _tail = wsite;
    }

    /// Removes and returns the oldest buffer; the queue must not be empty.
    TCPWriteBuffer deQueue() nothrow
    in
    {
        assert(_head && _tail);
    }
    body
    {
        auto removed = _head;
        _head = removed._next;
        // Removing the last element empties the queue completely.
        if (_head is null)
            _tail = null;
        return removed;
    }

private:
    TCPWriteBuffer _tail = null;
    TCPWriteBuffer _head = null;
}