module yu.eventloop.selector.epoll;

import yu.eventloop.common;
import yu.memory.allocator;

version (linux):
package(yu):
import core.time;
import core.stdc.errno;
import core.memory;

import core.sys.posix.sys.types; // for ssize_t, size_t
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC;
import core.sys.posix.unistd;

import std.algorithm;
import std.array;
import std.conv;
import std.exception;
import std.format;
import std.socket;
import std.experimental.logger;

import yu.exception : yuCathException;

/** System I/O event handler: a thin wrapper around the Linux epoll calls.

    `initer` must be called before any other member is used; it creates the
    epoll instance and an internal eventfd-backed channel used by `weakUp`.
 */
struct EpollLoop {
    /** Creates the epoll instance and registers the internal wake-up channel.

        Calling it again after a successful initialisation is a no-op.

        Throws: ErrnoException if `epoll_create1` fails or if the wake-up
                channel cannot be registered. On failure every resource
                acquired by this call is released again.
     */
    void initer() {
        if (_event)
            return;
        _efd = epoll_create1(0);
        errnoEnforce((_efd >= 0), "epoll_create1 failed");
        // Do not leak the epoll descriptor if anything below throws.
        scope (failure) {
            .close(_efd);
        }
        _event = yNew!EventChannel();
        // Leave the struct in its "not initialised" state on failure so the
        // destructor does not release the same resources a second time.
        scope (failure) {
            yDel(_event);
            _event = null;
        }
        // addEvent returns false both when the eventfd could not be created
        // (fd == socket_t.init) and when epoll_ctl rejects the registration.
        errnoEnforce(addEvent(_event.event),
                "failed to register the wake-up eventfd with epoll");
    }

    /** Destructor: closes the epoll descriptor and frees the wake-up channel
        if `initer` completed successfully.
     */
    ~this() {
        if (_event) {
            .close(_efd);
            yDel(_event);
        }
    }

    /** Registers an event with epoll (EPOLL_CTL_ADD).

        The epoll event mask is derived from the event's `enRead`, `enWrite`,
        `oneShot` and `etMode` fields (see `mixinModEvent`).

        Params:
            event = the event to register.
        Returns: true on success (sets `event.isActive` to true); false if the
                 fd equals `socket_t.init` (also sets `event.isActive` to
                 false) or if `epoll_ctl` fails with an error other than
                 EEXIST. EEXIST is treated as success and the existing
                 registration is left untouched.
     */
    bool addEvent(AsyncEvent* event) nothrow {
        if (event.fd == socket_t.init) {
            event.isActive = false;
            return false;
        }

        mixin(mixinModEvent());
        if ((epoll_ctl(_efd, EPOLL_CTL_ADD, event.fd, &ev)) != 0) {
            if (errno != EEXIST)
                return false;
        }
        event.isActive = true;
        return true;
    }

    /** Updates the epoll registration of an event (EPOLL_CTL_MOD).

        Params:
            event = the event whose registration is to be changed.
        Returns: true on success (sets `event.isActive` to true); false if the
                 fd equals `socket_t.init` (also sets `event.isActive` to
                 false) or if `epoll_ctl` fails.
     */
    bool modEvent(AsyncEvent* event) nothrow {
        if (event.fd == socket_t.init) {
            event.isActive = false;
            return false;
        }
        mixin(mixinModEvent());

        if ((epoll_ctl(_efd, EPOLL_CTL_MOD, event.fd, &ev)) != 0) {
            return false;
        }
        event.isActive = true;
        return true;
    }

    /** Removes an event from the epoll set (EPOLL_CTL_DEL).

        Params:
            event = the event to remove.
        Returns: true on success; false if the fd equals `socket_t.init` or if
                 `epoll_ctl` fails, in which case an error message is emitted
                 through the logger's `error`. `event.isActive` is set to
                 false except when `epoll_ctl` fails.
     */
    bool delEvent(AsyncEvent* event) nothrow {
        if (event.fd == socket_t.init) {
            event.isActive = false;
            return false;
        }
        epoll_event ev;
        if ((epoll_ctl(_efd, EPOLL_CTL_DEL, event.fd, &ev)) != 0) {
            // NOTE(review): yuCathException presumably swallows whatever the
            // logger throws so this method can stay nothrow - confirm.
            yuCathException(error("EPOLL_CTL_DEL erro! ", event.fd));
            return false;
        }
        event.isActive = false;
        return true;
    }

    /** Calls `epoll_wait` once and dispatches the reported events.

        Up to 64 events are processed per call. For each event the
        `AsyncEvent` stored in the epoll data pointer is looked up and its
        `obj` is notified:
        $(UL
            $(LI `onClose` if any of EPOLLHUP, EPOLLERR or EPOLLRDHUP is set;
                 the read/write callbacks are skipped in that case;)
            $(LI otherwise `onWrite` if EPOLLOUT is set, followed by `onRead`
                 if EPOLLIN is set.)
        )
        Returns without dispatching anything when `epoll_wait` reports fewer
        than one event.

        Params:
            timeout = timeout value passed straight through to `epoll_wait`.
     */
    void wait(int timeout) nothrow {
        epoll_event[64] events;
        auto len = epoll_wait(_efd, events.ptr, cast(int) events.length, timeout);
        if (len < 1)
            return;
        foreach (i; 0 .. len) {
            AsyncEvent* ev = cast(AsyncEvent*)(events[i].data.ptr);

            if (isErro(events[i].events)) {
                ev.obj.onClose();
                continue;
            }
            if (isWrite(events[i].events))
                ev.obj.onWrite();

            if (isRead(events[i].events))
                ev.obj.onRead();
        }
    }

    /** Writes to the internal eventfd channel that `initer` registered with
        this epoll instance.
     */
    void weakUp() nothrow {
        _event.doWrite();
    }

protected:
    /// True if the mask contains EPOLLHUP, EPOLLERR or EPOLLRDHUP.
    pragma(inline, true) bool isErro(uint events) nothrow {
        return (events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0;
    }

    /// True if the mask contains EPOLLIN.
    pragma(inline, true) bool isRead(uint events) nothrow {
        return (events & EPOLLIN) != 0;
    }

    /// True if the mask contains EPOLLOUT.
    pragma(inline, true) bool isWrite(uint events) nothrow {
        return (events & EPOLLOUT) != 0;
    }

private:
    /// The epoll file descriptor.
    int _efd;
    /// Wake-up channel; non-null once `initer` has succeeded.
    EventChannel _event;
}

/// Module constructor: ignores SIGPIPE.
static this() {
    import core.sys.posix.signal;

    signal(SIGPIPE, SIG_IGN);
}

enum EPOLL_EVENT : short {
    init = -5
}

/** eventfd-backed channel; `EpollLoop` registers it in `initer` and writes to
    it from `weakUp`.
 */
final class EventChannel : EventCallInterface {
    /// Creates a non-blocking, close-on-exec eventfd and the AsyncEvent describing it.
    this() {
        _fd = cast(socket_t) eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        _event = AsyncEvent(AsynType.EVENT, this, _fd, true, false, false);
    }

    /// Closes the eventfd.
    ~this() {
        .close(_fd);
    }

    /// Adds 1 to the eventfd counter. The result of `write` is ignored.
    void doWrite() nothrow {
        ulong ul = 1;
        core.sys.posix.unistd.write(_fd, &ul, ul.sizeof);
    }

    /// Reads the 8-byte eventfd counter. The result of `read` is ignored.
    override void onRead() nothrow {
        ulong counter;
        read(_fd, &counter, counter.sizeof);
    }

    /// No-op.
    override void onWrite() nothrow {
    }

    /// No-op.
    override void onClose() nothrow {
    }

    /// Pointer to the AsyncEvent describing this channel.
    @property AsyncEvent* event() {
        return &_event;
    }

    socket_t _fd;
    AsyncEvent _event;
}

/** Returns the source text, to be used with `mixin`, that declares a local
    `epoll_event ev` filled in from a variable named `event` in the enclosing
    scope: the data pointer is set to `event`, and the mask always contains
    EPOLLRDHUP | EPOLLERR | EPOLLHUP plus EPOLLIN / EPOLLOUT / EPOLLONESHOT /
    EPOLLET according to `enRead` / `enWrite` / `oneShot` / `etMode`.
 */
string mixinModEvent() {
    return q{
        epoll_event ev;
        ev.data.ptr = event;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (event.enRead)
            ev.events |= EPOLLIN;
        if (event.enWrite)
            ev.events |= EPOLLOUT;
        if (event.oneShot)
            ev.events |= EPOLLONESHOT;
        if (event.etMode)
            ev.events |= EPOLLET;
    };
}

extern (C):
@system:
nothrow:

/// Flags for eventfd().
enum {
    EFD_SEMAPHORE = 0x1,
    EFD_CLOEXEC = 0x80000,
    EFD_NONBLOCK = 0x800
}

/// Flags for epoll_create1().
enum {
    EPOLL_CLOEXEC = 0x80000,
    EPOLL_NONBLOCK = 0x800
}

/// epoll event mask bits.
enum {
    EPOLLIN = 0x001,
    EPOLLPRI = 0x002,
    EPOLLOUT = 0x004,
    EPOLLRDNORM = 0x040,
    EPOLLRDBAND = 0x080,
    EPOLLWRNORM = 0x100,
    EPOLLWRBAND = 0x200,
    EPOLLMSG = 0x400,
    EPOLLERR = 0x008,
    EPOLLHUP = 0x010,
    EPOLLRDHUP = 0x2000, // since Linux 2.6.17
    EPOLLONESHOT = 1u << 30,
    EPOLLET = 1u << 31
}

/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */
enum {
    EPOLL_CTL_ADD = 1, // Add a file descriptor to the interface.
    EPOLL_CTL_DEL = 2, // Remove a file descriptor from the interface.
    EPOLL_CTL_MOD = 3, // Change file descriptor epoll_event structure.
}

// NOTE(review): the packed (align(1)) layout presumably mirrors the kernel's
// x86_64 definition of struct epoll_event - verify before targeting other
// architectures.
align(1) struct epoll_event {
align(1):
    uint events;
    epoll_data_t data;
}

union epoll_data_t {
    void* ptr;
    int fd;
    uint u32;
    ulong u64;
}

int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, epoll_event* event);
int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout);

int eventfd(uint initval, int flags);

//timerfd

int timerfd_create(int clockid, int flags);
int timerfd_settime(int fd, int flags, const itimerspec* new_value, itimerspec* old_value);
int timerfd_gettime(int fd, itimerspec* curr_value);

enum TFD_TIMER_ABSTIME = 1 << 0;
enum TFD_CLOEXEC = 0x80000;
enum TFD_NONBLOCK = 0x800;