1 module yu.eventloop.selector.epoll;
2 
3 import yu.eventloop.common;
4 import yu.memory.allocator;
5 
6 version (linux)  : package(yu) : import core.time;
7 import core.stdc.errno;
8 import core.memory;
9 
10 import core.sys.posix.sys.types; // for ssize_t, size_t
11 import core.sys.posix.netinet.tcp;
12 import core.sys.posix.netinet.in_;
13 import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC;
14 import core.sys.posix.unistd;
15 
16 import std.algorithm;
17 import std.array;
18 import std.conv;
19 import std.exception;
20 import std.format;
21 import std.socket;
22 import std.experimental.logger;
23 
24 import yu.exception : yuCathException;
25 
26 /** 系统I/O事件处理类,epoll操作的封装
27  */
28 struct EpollLoop {
29     void initer() {
30         if (_event)
31             return;
32         _efd = epoll_create1(0);
33         errnoEnforce((_efd >= 0), "epoll_create1 failed");
34         _event = yNew!EventChannel();
35         addEvent(_event.event);
36     }
37 
38     /** 析构函数,释放epoll。
39 	 */
40     ~this() {
41         if (_event) {
42             .close(_efd);
43             yDel(_event);
44         }
45     }
46 
47     /** 添加一个Channel对象到事件队列中。
48 	 @param   socket = 添加到时间队列中的Channel对象,根据其type自动选择需要注册的事件。
49 	 @return true 添加成功, false 添加失败,并把错误记录到日志中.
50 	 */
51     bool addEvent(AsyncEvent * event) nothrow {
52         if (event.fd == socket_t.init) {
53             event.isActive = false;
54             return false;
55         }
56 
57         mixin(mixinModEvent());
58         if ((epoll_ctl(_efd, EPOLL_CTL_ADD, event.fd,  & ev)) != 0) {
59             if (errno != EEXIST)
60                 return false;
61         }
62         event.isActive = true;
63         return true;
64     }
65 
66     bool modEvent(AsyncEvent * event) nothrow {
67         if (event.fd == socket_t.init) {
68             event.isActive = false;
69             return false;
70         }
71         mixin(mixinModEvent());
72 
73         if ((epoll_ctl(_efd, EPOLL_CTL_MOD, event.fd,  & ev)) != 0) {
74             return false;
75         }
76         event.isActive = true;
77         return true;
78     }
79 
80     /** 从epoll队列中移除Channel对象。
81 	 @param socket = 需要移除的Channel对象
82 	 @return (true) 移除成功, (false) 移除失败,并把错误输出到控制台.
83 	 */
84     bool delEvent(AsyncEvent * event) nothrow {
85         if (event.fd == socket_t.init) {
86             event.isActive = false;
87             return false;
88         }
89         epoll_event ev;
90         if ((epoll_ctl(_efd, EPOLL_CTL_DEL, event.fd,  & ev)) != 0) {
91             yuCathException(error("EPOLL_CTL_DEL erro! ", event.fd));
92             return false;
93         }
94         event.isActive = false;
95         return true;
96     }
97 
98     /** 调用epoll_wait。
99 	 *    @param    timeout = epoll_wait的等待时间
100 	 *    @param    eptr   = epoll返回时间的存储的数组指针
101 	 *    @param    size   = 数组的大小
102 	 *    @return 返回当前获取的事件的数量。
103 	 */
104 
105     void wait(int timeout)  nothrow  {
106         epoll_event[64] events;
107         auto len = epoll_wait(_efd, events.ptr, 64, timeout);
108         if(len < 1) return;
109         foreach(i;0..len){
110             AsyncEvent * ev = cast(AsyncEvent * )(events[i].data.ptr);
111 
112             if (isErro(events[i].events)) {
113                 ev.obj.onClose();
114                 continue;
115             }
116             if (isWrite(events[i].events)) ev.obj.onWrite();
117 
118             if (isRead(events[i].events))  ev.obj.onRead();
119         }
120     }
121 
122     void weakUp() nothrow {
123         _event.doWrite();
124     }
125 
126     protected : pragma(inline, true) bool isErro(uint events)  nothrow {
127         return (events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0;
128     }
129     pragma(inline, true) bool isRead(uint events)  nothrow {
130         return (events & EPOLLIN) != 0;
131     }
132     pragma(inline, true) bool isWrite(uint events)  nothrow  {
133         return (events & EPOLLOUT) != 0;
134     }
135 
136     private :  /** 存储 epoll的fd */
137     int _efd;
138     EventChannel _event;
139 }
140 
141 static this() {
142     import core.sys.posix.signal;
143 
144     signal(SIGPIPE, SIG_IGN);
145 }
146 
147 enum EPOLL_EVENT : short {
148     init =  - 5
149 }
150 
151 final class EventChannel : EventCallInterface {
152     this() {
153         _fd = cast(socket_t) eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
154         _event = AsyncEvent(AsynType.EVENT, this, _fd, true, false, false);
155     }
156     ~this() {
157         .close(_fd);
158     }
159 
160     void doWrite() nothrow {
161         ulong ul = 1;
162         core.sys.posix.unistd.write(_fd,  & ul, ul.sizeof);
163     }
164     override void onRead() nothrow {
165         ulong ul = 1;
166         size_t len = read(_fd,  & ul, ul.sizeof);
167     }
168 
169     override void onWrite() nothrow {
170     }
171 
172     override void onClose() nothrow {
173     }
174 
175     @property AsyncEvent * event() {
176         return  & _event;
177     }
178 
179     socket_t _fd;
180     AsyncEvent _event;
181 }
182 
183 string mixinModEvent() {
184     return q{
185         epoll_event ev;
186         ev.data.ptr = event;
187         ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
188         if (event.enRead)
189             ev.events |= EPOLLIN;
190         if (event.enWrite)
191             ev.events |= EPOLLOUT;
192         if (event.oneShot)
193             ev.events |= EPOLLONESHOT;
194         if (event.etMode)
195             ev.events |= EPOLLET;
196     };
197 }
198 
199 extern (C) : @system : nothrow : enum {
200     EFD_SEMAPHORE = 0x1,
201     EFD_CLOEXEC = 0x80000,
202     EFD_NONBLOCK = 0x800
203 };
204 
205 enum {
206     EPOLL_CLOEXEC = 0x80000,
207     EPOLL_NONBLOCK = 0x800
208 }
209 
210 enum {
211     EPOLLIN = 0x001,
212     EPOLLPRI = 0x002,
213     EPOLLOUT = 0x004,
214     EPOLLRDNORM = 0x040,
215     EPOLLRDBAND = 0x080,
216     EPOLLWRNORM = 0x100,
217     EPOLLWRBAND = 0x200,
218     EPOLLMSG = 0x400,
219     EPOLLERR = 0x008,
220     EPOLLHUP = 0x010,
221     EPOLLRDHUP = 0x2000, // since Linux 2.6.17
222     EPOLLONESHOT = 1u << 30,
223     EPOLLET = 1u << 31
224 }
225 
226 /* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
227 enum {
228     EPOLL_CTL_ADD = 1, // Add a file descriptor to the interface.
229     EPOLL_CTL_DEL = 2, // Remove a file descriptor from the interface.
230     EPOLL_CTL_MOD = 3, // Change file descriptor epoll_event structure.
231 }
232 
233 align(1) struct epoll_event {
234     align(1) : uint events;
235     epoll_data_t data;
236 }
237 
238 union epoll_data_t {
239     void * ptr;
240     int fd;
241     uint u32;
242     ulong u64;
243 }
244 
245 int epoll_create(int size);
246 int epoll_create1(int flags);
247 int epoll_ctl(int epfd, int op, int fd, epoll_event * event);
248 int epoll_wait(int epfd, epoll_event * events, int maxevents, int timeout);
249 
250 int eventfd(uint initval, int flags);
251 
252 //timerfd
253 
254 int timerfd_create(int clockid, int flags);
255 int timerfd_settime(int fd, int flags, const itimerspec * new_value, itimerspec * old_value);
256 int timerfd_gettime(int fd, itimerspec * curr_value);
257 
258 enum TFD_TIMER_ABSTIME = 1 << 0;
259 enum TFD_CLOEXEC = 0x80000;
260 enum TFD_NONBLOCK = 0x800;