module yu.eventloop.selector.kqueue;

import yu.eventloop.common;
import yu.memory.allocator;

package(yu):

static if (IOMode == IO_MODE.kqueue) {

    import core.stdc.errno;
    import core.sys.posix.sys.types; // for ssize_t, size_t
    import core.sys.posix.netinet.tcp;
    import core.sys.posix.netinet.in_;
    import core.sys.posix.time;
    import core.sys.posix.unistd;

    import std.exception;
    import std.socket;
    import yu.exception : yuCathException, showException;

    /**
     * Event-loop selector backed by kqueue.
     *
     * Registered `AsyncEvent`s are attached to kevents through the `udata`
     * pointer; `wait` dispatches fired kevents to the callbacks of the
     * object stored in the event.
     */
    struct KqueueLoop {
        /**
         * Create the kqueue descriptor and register the internal wake-up
         * channel. Does nothing if the loop is already initialised.
         *
         * Throws: ErrnoException when `kqueue()` fails.
         */
        void initer() {
            if (_event)
                return;
            _efd = kqueue();
            errnoEnforce((_efd >= 0), "kqueue failed");
            _event = yNew!EventChannel();
            addEvent(_event.event);
        }

        /// Close the kqueue descriptor and free the wake-up channel, if initialised.
        ~this() {
            if (_event) {
                .close(_efd);
                yDel(_event);
            }
        }

        /**
         * Register an event with the kqueue.
         *
         * Timers are registered with EVFILT_TIMER; every other event type is
         * registered for EVFILT_READ and/or EVFILT_WRITE according to its
         * `enRead` / `enWrite` flags. EV_CLEAR is added when `etMode` is set.
         *
         * Params:
         *   event = the event to register.
         * Returns: true when the registration succeeded; false when kevent
         *   failed or the event enables neither reading nor writing.
         */
        bool addEvent(AsyncEvent* event) nothrow {
            int err = 0;
            if (event.type() == AsynType.TIMER) {
                kevent_t ev;
                // Clamp the timer period to a minimum of 20.
                event.timeOut = event.timeOut < 20 ? 20 : event.timeOut;
                // Timers have no real descriptor; use a synthetic identifier.
                event._fd = getTimerfd();
                EV_SET(&ev, event.fd, EVFILT_TIMER,
                        EV_ADD | EV_ENABLE | EV_CLEAR, 0, event.timeOut, event); // period in milliseconds
                err = kevent(_efd, &ev, 1, null, 0, null);
            } else if (event.enRead && event.enWrite) {
                kevent_t[2] ev = void;
                short read = EV_ADD | EV_ENABLE;
                short write = EV_ADD | EV_ENABLE;
                if (event.etMode) {
                    read |= EV_CLEAR;
                    write |= EV_CLEAR;
                }
                EV_SET(&(ev[0]), event.fd, EVFILT_READ, read, 0, 0, event);
                EV_SET(&(ev[1]), event.fd, EVFILT_WRITE, write, 0, 0, event);
                err = kevent(_efd, &(ev[0]), 2, null, 0, null);
            } else if (event.enRead) {
                kevent_t ev;
                short read = EV_ADD | EV_ENABLE;
                if (event.etMode)
                    read |= EV_CLEAR;
                EV_SET(&ev, event.fd, EVFILT_READ, read, 0, 0, event);
                err = kevent(_efd, &ev, 1, null, 0, null);
            } else if (event.enWrite) {
                kevent_t ev;
                short write = EV_ADD | EV_ENABLE;
                if (event.etMode)
                    write |= EV_CLEAR;
                EV_SET(&ev, event.fd, EVFILT_WRITE, write, 0, 0, event);
                err = kevent(_efd, &ev, 1, null, 0, null);
            } else {
                return false;
            }

            if (err < 0) {
                return false;
            }

            event.isActive = true;
            return true;
        }

        /**
         * Re-synchronise the read/write filters of a TCP or UDP event with
         * its current `enRead` / `enWrite` flags: enabled directions are
         * (re-)added, disabled directions are deleted.
         *
         * Each change is submitted on its own. When several changes are
         * passed in one kevent call without an event list, the first failing
         * change aborts the call, so deleting a filter that was never
         * registered would silently drop the following change.
         *
         * Params:
         *   event = the event whose registration should be updated.
         * Returns: true when both filters were updated; false for other
         *   event types or when kevent reported a failure.
         */
        bool modEvent(AsyncEvent* event) nothrow {
            if (event.type() != AsynType.TCP && event.type() != AsynType.UDP) {
                return false;
            }

            short read = EV_ADD | EV_ENABLE;
            short write = EV_ADD | EV_ENABLE;
            if (event.etMode) {
                read |= EV_CLEAR;
                write |= EV_CLEAR;
            }
            if (!event.enRead)
                read = EV_DELETE;
            if (!event.enWrite)
                write = EV_DELETE;

            kevent_t ev;
            EV_SET(&ev, event.fd, EVFILT_READ, read, 0, 0, event);
            const readOk = applyChange(ev);
            EV_SET(&ev, event.fd, EVFILT_WRITE, write, 0, 0, event);
            const writeOk = applyChange(ev);

            if (!readOk || !writeOk)
                return false;

            event.isActive = true;
            return true;
        }

        /**
         * Remove an event from the kqueue.
         *
         * Removal is best effort: the kevent result is not reported, because
         * the filters may already be gone (closing a descriptor removes its
         * kevents). The event is marked inactive in every case where a
         * removal was attempted.
         *
         * Params:
         *   event = the event to remove.
         * Returns: false when the event is not a timer and enables neither
         *   reading nor writing; true otherwise.
         */
        bool delEvent(AsyncEvent* event) nothrow {
            kevent_t ev;
            if (event.type() == AsynType.TIMER) {
                EV_SET(&ev, event.fd, EVFILT_TIMER, EV_DELETE, 0, 0, event);
                applyChange(ev);
            } else if (event.enRead || event.enWrite) {
                // Submit the deletions separately so a failure of the first
                // one cannot prevent the second from being processed.
                if (event.enRead) {
                    EV_SET(&ev, event.fd, EVFILT_READ, EV_DELETE, 0, 0, event);
                    applyChange(ev);
                }
                if (event.enWrite) {
                    EV_SET(&ev, event.fd, EVFILT_WRITE, EV_DELETE, 0, 0, event);
                    applyChange(ev);
                }
            } else {
                return false;
            }

            event.isActive = false;
            return true;
        }

        /**
         * Wait for events and dispatch them to their callbacks.
         *
         * At most 64 events are handled per call. Events flagged EV_EOF or
         * EV_ERROR trigger `onClose`; timer events trigger `onRead`; all
         * other events trigger `onWrite` or `onRead` depending on the filter
         * that fired.
         *
         * Params:
         *   timeout = maximum time to wait in milliseconds; a negative value
         *             waits without a time limit.
         */
        void wait(int timeout) nothrow {
            kevent_t[64] events;
            int len;
            if (timeout < 0) {
                // A negative timespec is rejected by kevent; a null timeout
                // blocks until an event arrives.
                len = kevent(_efd, null, 0, events.ptr, cast(int) events.length, null);
            } else {
                auto tspec = timespec(timeout / 1000, (timeout % 1000) * 1000 * 1000);
                len = kevent(_efd, null, 0, events.ptr, cast(int) events.length, &tspec);
            }
            if (len < 1)
                return;

            foreach (i; 0 .. len) {
                auto ev = cast(AsyncEvent*) events[i].udata;

                if ((events[i].flags & EV_EOF) || (events[i].flags & EV_ERROR)) {
                    ev.obj.onClose();
                    continue;
                }

                if (ev.type() == AsynType.TIMER) {
                    ev.obj.onRead();
                    continue;
                }

                // `filter` holds a single filter identifier (a small negative
                // number), not a bit mask, so it must be compared for equality.
                if (events[i].filter == EVFILT_WRITE)
                    ev.obj.onWrite();
                else if (events[i].filter == EVFILT_READ)
                    ev.obj.onRead();
            }
        }

        /// Wake up a blocked `wait` by writing to the internal channel.
        void weakUp() nothrow {
            _event.doWrite();
        }

    private:
        /**
         * Submit a single change to the kqueue.
         *
         * Returns: true when kevent accepted the change, or when an
         *   EV_DELETE failed only because the filter was not registered
         *   (ENOENT); false otherwise.
         */
        bool applyChange(ref kevent_t change) nothrow {
            if (kevent(_efd, &change, 1, null, 0, null) >= 0)
                return true;
            return (change.flags & EV_DELETE) && errno == ENOENT;
        }

        /** The kqueue file descriptor. */
        int _efd;
        /** Internal channel used by `weakUp`. */
        EventChannel _event;
    }

    /// Ignore SIGPIPE for the process.
    static this() {
        import core.sys.posix.signal;

        signal(SIGPIPE, SIG_IGN);
    }

    /**
     * Wake-up channel built on a non-blocking UNIX socket pair: `doWrite`
     * sends on one end, the other end is registered for reading and drained
     * in `onRead`.
     */
    private final class EventChannel : EventCallInterface {
        this() {
            _pair = createPair();
            _pair[0].blocking = false;
            _pair[1].blocking = false;
            _event = AsyncEvent(AsynType.EVENT, this, _pair[1].handle(), true, false,
                    false);
        }

        ~this() {
            yDel(_pair[0]);
            yDel(_pair[1]);
        }

        /// Send a short message on the write end of the pair.
        void doWrite() nothrow {
            yuCathException(_pair[0].send("wekup"));
        }

        /**
         * Create a connected pair of UNIX stream sockets.
         *
         * Throws: SocketOSException when `socketpair` fails.
         */
        static Socket[2] createPair() {
            int[2] socks;
            if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1)
                throw new SocketOSException("Unable to create socket pair");

            Socket toSocket(size_t id) {
                auto fd = cast(socket_t) socks[id];
                auto s = yNew!Socket(fd, AddressFamily.UNIX);
                return s;
            }

            return [toSocket(0), toSocket(1)];
        }

        /// Drain everything pending on the read end of the pair.
        override void onRead() nothrow {
            ubyte[128] data;
            while (true) {
                try {
                    if (_pair[1].receive(data) <= 0)
                        return;
                }
                catch (Exception e) {
                    showException(e);
                    // Stop instead of retrying, so a persistent failure
                    // cannot spin this loop forever.
                    return;
                }
            }
        }

        override void onWrite() nothrow {
        }

        override void onClose() nothrow {
        }

        /// Pointer to the event describing the read end of the pair.
        @property AsyncEvent* event() {
            return &_event;
        }

        Socket[2] _pair;
        AsyncEvent _event;
    }

    /**
     * Produce a synthetic identifier for a timer event.
     *
     * Identifiers count down from `int.max`; once the counter drops below
     * 655350 it is reset to `int.max`.
     */
    auto getTimerfd() {
        import core.atomic;

        static shared int counter = int.max;
        // Use the value returned by the atomic decrement rather than
        // re-reading the shared counter non-atomically.
        int id = atomicOp!"-="(counter, 1);
        if (id < 655350) {
            atomicStore(counter, int.max);
            id = int.max;
        }
        return cast(socket_t) id;
    }

extern (C):
@nogc:
nothrow:

    enum : short {
        EVFILT_READ = -1,
        EVFILT_WRITE = -2,
        EVFILT_AIO = -3, /* attached to aio requests */
        EVFILT_VNODE = -4, /* attached to vnodes */
        EVFILT_PROC = -5, /* attached to struct proc */
        EVFILT_SIGNAL = -6, /* attached to struct proc */
        EVFILT_TIMER = -7, /* timers */
        EVFILT_MACHPORT = -8, /* Mach portsets */
        EVFILT_FS = -9, /* filesystem events */
        EVFILT_USER = -10, /* User events */
        EVFILT_VM = -12, /* virtual memory events */
        EVFILT_SYSCOUNT = 11
    }

    /// Fill `*kevp` from the given field values, in `kevent_t` field order.
    extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) {
        *kevp = kevent_t(args);
    }

    struct kevent_t {
        uintptr_t ident; /* identifier for this event */
        short filter; /* filter for event */
        ushort flags;
        uint fflags;
        intptr_t data;
        void* udata; /* opaque user data identifier */
    }

    enum {
        /* actions */
        EV_ADD = 0x0001, /* add event to kq (implies enable) */
        EV_DELETE = 0x0002, /* delete event from kq */
        EV_ENABLE = 0x0004, /* enable event */
        EV_DISABLE = 0x0008, /* disable event (not reported) */

        /* flags */
        EV_ONESHOT = 0x0010, /* only report one occurrence */
        EV_CLEAR = 0x0020, /* clear event state after reporting */
        EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
        EV_DISPATCH = 0x0080, /* disable event after reporting */

        EV_SYSFLAGS = 0xF000, /* reserved by system */
        EV_FLAG1 = 0x2000, /* filter-specific flag */

        /* returned values */
        EV_EOF = 0x8000, /* EOF detected */
        EV_ERROR = 0x4000, /* error, data contains errno */

    }

    enum {
        /*
         * data/hint flags/masks for EVFILT_USER, shared with userspace
         *
         * On input, the top two bits of fflags specifies how the lower twenty four
         * bits should be applied to the stored value of fflags.
         *
         * On output, the top two bits will always be set to NOTE_FFNOP and the
         * remaining twenty four bits will contain the stored fflags value.
         */
        NOTE_FFNOP = 0x00000000, /* ignore input fflags */
        NOTE_FFAND = 0x40000000, /* AND fflags */
        NOTE_FFOR = 0x80000000, /* OR fflags */
        NOTE_FFCOPY = 0xc0000000, /* copy fflags */
        NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */
        NOTE_FFLAGSMASK = 0x00ffffff,

        NOTE_TRIGGER = 0x01000000, /* Cause the event to be
                                      triggered for output. */

        /*
         * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
         */
        NOTE_LOWAT = 0x0001, /* low water mark */

        /*
         * data/hint flags for EVFILT_VNODE, shared with userspace
         */
        NOTE_DELETE = 0x0001, /* vnode was removed */
        NOTE_WRITE = 0x0002, /* data contents changed */
        NOTE_EXTEND = 0x0004, /* size increased */
        NOTE_ATTRIB = 0x0008, /* attributes changed */
        NOTE_LINK = 0x0010, /* link count changed */
        NOTE_RENAME = 0x0020, /* vnode was renamed */
        NOTE_REVOKE = 0x0040, /* vnode access was revoked */

        /*
         * data/hint flags for EVFILT_PROC, shared with userspace
         */
        NOTE_EXIT = 0x80000000, /* process exited */
        NOTE_FORK = 0x40000000, /* process forked */
        NOTE_EXEC = 0x20000000, /* process exec'd */
        NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */
        NOTE_PDATAMASK = 0x000fffff, /* mask for pid */

        /* additional flags for EVFILT_PROC */
        NOTE_TRACK = 0x00000001, /* follow across forks */
        NOTE_TRACKERR = 0x00000002, /* could not track child */
        NOTE_CHILD = 0x00000004, /* am a child process */

    }

    int kqueue();
    int kevent(int kq, const kevent_t* changelist, int nchanges, kevent_t* eventlist,
            int nevents, const timespec* timeout);
}