1 module yu.eventloop.selector.kqueue;
2 
3 import yu.eventloop.common;
4 import yu.memory.allocator;
5 
6 package(yu):
7 
8 static if (IOMode == IO_MODE.kqueue) {
9 
10     import core.stdc.errno;
11     import core.sys.posix.sys.types; // for ssize_t, size_t
12     import core.sys.posix.netinet.tcp;
13     import core.sys.posix.netinet.in_;
14     import core.sys.posix.time;
15     import core.sys.posix.unistd;
16 
17     import std.exception;
18     import std.socket;
19     import yu.exception : yuCathException, showException;
20 
21     struct KqueueLoop {
22         void initer() {
23             if (_event)
24                 return;
25             _efd = kqueue();
26             errnoEnforce((_efd >= 0), "kqueue failed");
27             _event = yNew!EventChannel();
28             addEvent(_event.event);
29         }
30 
31         ~this() {
32             if (_event) {
33                 .close(_efd);
34                 yDel(_event);
35             }
36         }
37 
38         /** 添加一个Channel对象到事件队列中。
39             @param   socket = 添加到时间队列中的Channel对象,根据其type自动选择需要注册的事件。
40             @return true 添加成功, false 添加失败,并把错误记录到日志中.
41             */
42         bool addEvent(AsyncEvent* event) nothrow {
43             int err = 0;
44             if (event.type() == AsynType.TIMER) {
45                 kevent_t ev;
46                 event.timeOut = event.timeOut < 20 ? 20 : event.timeOut;
47                 event._fd = getTimerfd();
48                 EV_SET(&ev, event.fd, EVFILT_TIMER,
49                     EV_ADD | EV_ENABLE | EV_CLEAR, 0, event.timeOut, event); //单位毫秒
50                 err = kevent(_efd, &ev, 1, null, 0, null);
51             } else if (event.enRead && event.enWrite) {
52                 kevent_t[2] ev = void;
53                 short read = EV_ADD | EV_ENABLE;
54                 short write = EV_ADD | EV_ENABLE;
55                 if (event.etMode) {
56                     read |= EV_CLEAR;
57                     write |= EV_CLEAR;
58                 }
59                 EV_SET(&(ev[0]), event.fd, EVFILT_READ, read, 0, 0, event);
60                 EV_SET(&(ev[1]), event.fd, EVFILT_WRITE, write, 0, 0, event);
61                 err = kevent(_efd, &(ev[0]), 2, null, 0, null);
62             } else if (event.enRead) {
63                 kevent_t ev;
64                 short read = EV_ADD | EV_ENABLE;
65                 if (event.etMode)
66                     read |= EV_CLEAR;
67                 EV_SET(&ev, event.fd, EVFILT_READ, read, 0, 0, event);
68                 err = kevent(_efd, &ev, 1, null, 0, null);
69             } else if (event.enWrite) {
70                 kevent_t ev;
71                 short write = EV_ADD | EV_ENABLE;
72                 if (event.etMode)
73                     write |= EV_CLEAR;
74                 EV_SET(&ev, event.fd, EVFILT_WRITE, write, 0, 0, event);
75                 err = kevent(_efd, &ev, 1, null, 0, null);
76             } else {
77                 return false;
78             }
79 
80             if (err < 0) {
81                 return false;
82             }
83 
84             event.isActive = true;
85             return true;
86         }
87 
88         bool modEvent(AsyncEvent* event) nothrow {
89             int err = 0;
90             if (event.type() != AsynType.TCP && event.type() != AsynType.UDP) {
91                 return false;
92             }
93 
94             kevent_t[2] ev = void;
95             short read = EV_ADD | EV_ENABLE;
96             short write = EV_ADD | EV_ENABLE;
97             if (event.etMode) {
98                 read |= EV_CLEAR;
99                 write |= EV_CLEAR;
100             }
101             if (event.enRead) {
102                 EV_SET(&ev[0], event.fd, EVFILT_READ, read, 0, 0, event);
103             } else {
104                 EV_SET(&ev[0], event.fd, EVFILT_READ, EV_DELETE, 0, 0, event);
105             }
106 
107             if (event.enWrite) {
108                 EV_SET(&ev[1], event.fd, EVFILT_WRITE, write, 0, 0, event);
109             } else {
110                 EV_SET(&ev[1], event.fd, EVFILT_WRITE, EV_DELETE, 0, 0, event);
111             }
112             kevent(_efd, ev.ptr, 2, null, 0, null);
113             event.isActive = true;
114             return true;
115         }
116 
117         /** 从epoll队列中移除Channel对象。
118             @param socket = 需要移除的Channel对象
119             @return (true) 移除成功, (false) 移除失败,并把错误输出到控制台.
120             */
121         bool delEvent(AsyncEvent* event) nothrow {
122             int err = 0;
123             if (event.type() == AsynType.TIMER) {
124                 kevent_t ev;
125                 EV_SET(&ev, event.fd, EVFILT_TIMER, EV_DELETE, 0, 0, event);
126                 err = kevent(_efd, &ev, 1, null, 0, null);
127             } else if (event.enRead && event.enWrite) {
128                 kevent_t[2] ev = void;
129                 EV_SET(&(ev[0]), event.fd, EVFILT_READ, EV_DELETE, 0, 0, event);
130                 EV_SET(&(ev[1]), event.fd, EVFILT_WRITE, EV_DELETE, 0, 0, event);
131                 err = kevent(_efd, &(ev[0]), 2, null, 0, null);
132             } else if (event.enRead) {
133                 kevent_t ev;
134                 short read = EV_ADD | EV_ENABLE;
135                 EV_SET(&ev, event.fd, EVFILT_READ, EV_DELETE, 0, 0, event);
136                 err = kevent(_efd, &ev, 1, null, 0, null);
137             } else if (event.enWrite) {
138                 kevent_t ev;
139                 short write = EV_ADD | EV_ENABLE;
140                 if (event.etMode)
141                     write |= EV_CLEAR;
142                 EV_SET(&ev, event.fd, EVFILT_WRITE, EV_DELETE, 0, 0, event);
143                 err = kevent(_efd, &ev, 1, null, 0, null);
144             } else {
145                 return false;
146             }
147 
148             event.isActive = false;
149             return true;
150         }
151 
152         /** 调用epoll_wait。
153             *    @param    timeout = epoll_wait的等待时间
154             *    @param    eptr   = epoll返回时间的存储的数组指针
155             *    @param    size   = 数组的大小
156             *    @return 返回当前获取的事件的数量。
157             */
158 
159         void wait(int timeout) nothrow {
160             auto tm = timeout % 1000;
161             auto tspec = timespec(timeout / 1000, tm * 1000 * 1000);
162             kevent_t[64] events;
163             auto len = kevent(_efd, null, 0, events.ptr, 64, &tspec);
164             if (len < 1) return;
165             foreach(i; 0 .. len){
166                 auto ev = cast(AsyncEvent*) events[i].udata;
167 
168                 if ((events[i].flags & EV_EOF) || (events[i].flags & EV_ERROR)) {
169                     ev.obj.onClose();
170                     continue;
171                 }
172 
173                 if (ev.type() == AsynType.TIMER) {
174                     ev.obj.onRead();
175                     continue;
176                 }
177 
178                 if (events[i].filter & EVFILT_WRITE) ev.obj.onWrite();
179         
180                 if (events[i].filter & EVFILT_READ) ev.obj.onRead();
181             }
182         }
183 
184         void weakUp() nothrow {
185             _event.doWrite();
186         }
187 
188     private:
189         /** 存储 epoll的fd */
190         int _efd;
191         EventChannel _event;
192     }
193 
194     static this() {
195         import core.sys.posix.signal;
196 
197         signal(SIGPIPE, SIG_IGN);
198     }
199 
200     private final class EventChannel : EventCallInterface {
201         this() {
202             _pair = createPair();
203             _pair[0].blocking = false;
204             _pair[1].blocking = false;
205             _event = AsyncEvent(AsynType.EVENT, this, _pair[1].handle(), true, false,
206                 false);
207         }
208 
209         ~this() {
210             yDel(_pair[0]);
211             yDel(_pair[1]);
212         }
213 
214         void doWrite() nothrow {
215             yuCathException(_pair[0].send("wekup"));
216         }
217 
218         static Socket[2] createPair() {
219             int[2] socks;
220             if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1)
221                 throw new SocketOSException("Unable to create socket pair");
222 
223             Socket toSocket(size_t id) {
224                 auto fd = cast(socket_t) socks[id];
225                 auto s = yNew!Socket(fd, AddressFamily.UNIX);
226                 return s;
227             }
228 
229             return [toSocket(0), toSocket(1)];
230         }
231 
232         override void onRead() nothrow {
233             ubyte[128] data;
234             while (true) {
235                 try {
236                     if (_pair[1].receive(data) <= 0)
237                         return;
238                 }
239                 catch (Exception e) {
240                     showException(e);
241                 }
242             }
243         }
244 
245         override void onWrite() nothrow {
246         }
247 
248         override void onClose() nothrow {
249         }
250 
251         @property AsyncEvent* event() {
252             return &_event;
253         }
254 
255         Socket[2] _pair;
256         AsyncEvent _event;
257     }
258 
259     auto getTimerfd() {
260         import core.atomic;
261 
262         static shared int i = int.max;
263         atomicOp!"-="(i, 1);
264         if (i < 655350)
265             i = int.max;
266         return cast(socket_t) i;
267     }
268 
269 extern (C):
270 @nogc:
271 nothrow:
272 
    // kqueue filter identifiers, typed as short to match kevent_t.filter.
    // Each value names exactly one filter; these are not bit flags and must be
    // compared with ==.
    // NOTE(review): values presumably mirror the target platform's
    // <sys/event.h>; verify for every supported OS.
    enum : short {
        EVFILT_READ = -1,
        EVFILT_WRITE = -2,
        EVFILT_AIO = -3, /* attached to aio requests */
        EVFILT_VNODE = -4, /* attached to vnodes */
        EVFILT_PROC = -5, /* attached to struct proc */
        EVFILT_SIGNAL = -6, /* attached to struct proc */
        EVFILT_TIMER = -7, /* timers */
        EVFILT_MACHPORT = -8, /* Mach portsets */
        EVFILT_FS = -9, /* filesystem events */
        EVFILT_USER = -10, /* User events */
        EVFILT_VM = -12, /* virtual memory events */
        EVFILT_SYSCOUNT = 11
    }
287 
288     extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) {
289         *kevp = kevent_t(args);
290     }
291 
    // Record type passed to and returned from kevent() (declared in this file
    // as taking kevent_t pointers for both the change list and the event list).
    // NOTE(review): field order and widths must match the C `struct kevent` of
    // the target platform; confirm before porting to another OS.
    struct kevent_t {
        uintptr_t ident; /* identifier for this event */
        short filter; /* filter for event */
        ushort flags; /* EV_* action and status flags */
        uint fflags; /* filter-specific flags */
        intptr_t data; /* filter-specific data */
        void* udata; /* opaque user data identifier */
    }
300 
    // Bit flags for kevent_t.flags: actions requested on input, behaviour
    // modifiers, and status bits reported on output.
    enum {
        /* actions */
        EV_ADD = 0x0001, /* add event to kq (implies enable) */
        EV_DELETE = 0x0002, /* delete event from kq */
        EV_ENABLE = 0x0004, /* enable event */
        EV_DISABLE = 0x0008, /* disable event (not reported) */

        /* flags */
        EV_ONESHOT = 0x0010, /* only report one occurrence */
        EV_CLEAR = 0x0020, /* clear event state after reporting */
        EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
        EV_DISPATCH = 0x0080, /* disable event after reporting */

        EV_SYSFLAGS = 0xF000, /* reserved by system */
        EV_FLAG1 = 0x2000, /* filter-specific flag */

        /* returned values */
        EV_EOF = 0x8000, /* EOF detected */
        EV_ERROR = 0x4000, /* error, data contains errno */

    

}
324 
// Filter-specific flag values for kevent_t.fflags, grouped by the filter they
// apply to. Several names share the same numeric value because each group is
// only meaningful together with its own filter.
enum {
    /*
        * data/hint flags/masks for EVFILT_USER, shared with userspace
        *
        * On input, the top two bits of fflags specifies how the lower twenty four
        * bits should be applied to the stored value of fflags.
        *
        * On output, the top two bits will always be set to NOTE_FFNOP and the
        * remaining twenty four bits will contain the stored fflags value.
        */
    NOTE_FFNOP = 0x00000000, /* ignore input fflags */
    NOTE_FFAND = 0x40000000, /* AND fflags */
    NOTE_FFOR = 0x80000000, /* OR fflags */
    NOTE_FFCOPY = 0xc0000000, /* copy fflags */
    NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
    NOTE_FFLAGSMASK = 0x00ffffff,

    NOTE_TRIGGER = 0x01000000, /* Cause the event to be
                                    triggered for output. */

    /*
        * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
        */
    NOTE_LOWAT = 0x0001, /* low water mark */

    /*
        * data/hint flags for EVFILT_VNODE, shared with userspace
        */
    NOTE_DELETE = 0x0001, /* vnode was removed */
    NOTE_WRITE = 0x0002, /* data contents changed */
    NOTE_EXTEND = 0x0004, /* size increased */
    NOTE_ATTRIB = 0x0008, /* attributes changed */
    NOTE_LINK = 0x0010, /* link count changed */
    NOTE_RENAME = 0x0020, /* vnode was renamed */
    NOTE_REVOKE = 0x0040, /* vnode access was revoked */

    /*
        * data/hint flags for EVFILT_PROC, shared with userspace
        */
    NOTE_EXIT = 0x80000000, /* process exited */
    NOTE_FORK = 0x40000000, /* process forked */
    NOTE_EXEC = 0x20000000, /* process exec'd */
    NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
    NOTE_PDATAMASK = 0x000fffff, /* mask for pid */

    /* additional flags for EVFILT_PROC */
    NOTE_TRACK = 0x00000001, /* follow across forks */
    NOTE_TRACKERR = 0x00000002, /* could not track child */
    NOTE_CHILD = 0x00000004, /* am a child process */



}
378 
// C bindings for the kqueue system interface.
// kqueue(): callers in this file treat a negative return value as failure and
// otherwise use the result as the queue descriptor.
int kqueue();
// kevent(): submits `nchanges` entries from `changelist` to queue `kq` and
// stores up to `nevents` ready events in `eventlist`, waiting at most
// `timeout`. Callers in this file treat a negative return value as failure and
// a positive one as the number of events stored.
int kevent(int kq, const kevent_t* changelist, int nchanges, kevent_t* eventlist,
    int nevents, const timespec* timeout);
382 }