1 module yu.eventloop; 2 3 import core.thread; 4 import core.memory; 5 6 import std.exception; 7 import std.datetime; 8 import std.variant; 9 import std.algorithm.mutation; 10 import std.stdio; 11 import std.string; 12 import std.exception; 13 import std.experimental.allocator; 14 import std.experimental.allocator.gc_allocator; 15 16 public import yu.eventloop.common; 17 import yu.memory.allocator; 18 import yu.task; 19 import yu.exception : yuCathException, showException; 20 21 /** 网络I/O处理的事件循环类 22 */ 23 24 @trusted final class EventLoopImpl(T) //用定义别名的方式 25 { 26 this() { 27 _poll.initer(); 28 _run = false; 29 static if (CustomTimer) 30 _timeWheel = yNew!ETimerWheel(CustomTimerWheelSize, yuAlloctor); 31 _evlist = AsyncEvent(AsynType.EVENT, null); 32 } 33 34 ~this() { 35 static if (CustomTimer) 36 if (_timeWheel) 37 yDel(_timeWheel); 38 } 39 40 /** 开始执行事件等待。 41 @param :timeout = 无事件的超时等待时间。单位:毫秒,如果是用CustomTimer, 这个超时时间将无效。 42 @note : 此函数可以多线程同时执行,实现多个线程共用一个事件调度 43 */ 44 void run(int timeout = 100) { 45 _thID = Thread.getThis.id(); 46 _run = true; 47 static if (CustomTimer) 48 _nextTime = (Clock.currStdTime() / 10000) + CustomTimerTimeOut; 49 while (_run) { 50 static if (CustomTimer) 51 timeout = doWheel(); 52 _poll.wait(timeout); 53 if (!_taskList.empty) { 54 doTaskList(); 55 } 56 } 57 _thID = ThreadID.init; 58 _run = false; 59 } 60 61 void weakUp() nothrow { 62 _poll.weakUp(); 63 } 64 65 bool isRuning() nothrow { 66 return _run; 67 } 68 69 void stop() { 70 if (isRuning()) { 71 _run = false; 72 weakUp(); 73 } 74 } 75 76 bool isInLoopThread() { 77 if (!isRuning()) 78 return true; 79 return _thID == Thread.getThis.id; 80 } 81 82 void post(bool MustInQueue = false)(void delegate() cback) 83 in { 84 assert(cback); 85 } 86 body { 87 static if (!MustInQueue) { 88 if (isInLoopThread()) { 89 cback(); 90 return; 91 } 92 } 93 auto task = makeTask(yuAlloctor, cback); 94 task.finishedCall = &finishDoFreeYuTask; 95 synchronized (this) { 96 _taskList.enQueue(task); 97 } 98 weakUp(); 99 } 100 101 void post(bool MustInQueue = true)(AbstractTask task) { 102 static if (!MustInQueue) { 103 if (isInLoopThread()) { 104 task.job(); 105 yDel(task); 106 return; 107 } 108 } 109 synchronized (this) { 110 _taskList.enQueue(task); 111 } 112 weakUp(); 113 } 114 115 bool addEvent(AsyncEvent* event) nothrow { 116 if (event == null) 117 return false; 118 addEventList(event); 119 static if (CustomTimer) { 120 if (event.type() == AsynType.TIMER) { 121 try { 122 CWheelTimer timer = yNew!CWheelTimer(event); 123 _timeWheel.addNewTimer(timer, timer.wheelSize()); 124 event.timer = timer; 125 event.isActive(true); 126 } 127 catch (Exception e) { 128 showException(yuCathException(error("new CWheelTimer error!!! : ", e.toString))); 129 return false; 130 } 131 return true; 132 } 133 } 134 return _poll.addEvent(event); 135 } 136 137 bool modEvent(AsyncEvent* event) nothrow { 138 if (event == null) 139 return false; 140 addEventList(event); 141 static if (CustomTimer) { 142 if (event.type() == AsynType.TIMER) 143 return false; 144 } 145 return _poll.modEvent(event); 146 } 147 148 bool delEvent(AsyncEvent* event) nothrow { 149 if (event == null) 150 return false; 151 event.rmNextPrev(); 152 static if (CustomTimer) { 153 if (event.type() == AsynType.TIMER) { 154 return rmCustomTimer(event); 155 } 156 } 157 return _poll.delEvent(event); 158 } 159 160 static if (CustomTimer) { 161 @property ETimerWheel timerWheel() { 162 return _timeWheel; 163 } 164 165 private bool rmCustomTimer(AsyncEvent* event) nothrow { 166 event.isActive(false); 167 event.timer.stop(); 168 yuCathException(yDel(event.timer)); 169 event.timer = null; 170 return true; 171 } 172 } 173 174 void finishDoFreeYuTask(AbstractTask task) nothrow @trusted 175 { 176 import yu.exception; 177 yuCathException(yDel(task)); 178 } 179 180 protected: 181 void doTaskList() { 182 import std.algorithm : swap; 183 184 TaskQueue tmp; 185 synchronized (this) { 186 swap(tmp, _taskList); 187 } 188 while (!tmp.empty) { 189 auto fp = tmp.deQueue(); 190 fp.job(); 191 } 192 } 193 194 pragma(inline) void addEventList(AsyncEvent* event) nothrow { 195 event.rmNextPrev(); 196 if (_evlist.next) { 197 _evlist.next.prev = event; 198 event.next = _evlist.next; 199 } 200 _evlist.next = event; 201 event.prev = &_evlist; 202 } 203 204 private: 205 T _poll; 206 TaskQueue _taskList; 207 bool _run; 208 ThreadID _thID; 209 AsyncEvent _evlist; 210 static if (CustomTimer) { 211 ETimerWheel _timeWheel; 212 long _nextTime; 213 214 int doWheel() { 215 auto nowTime = (Clock.currStdTime() / 10000); 216 while (nowTime >= _nextTime) { 217 _timeWheel.prevWheel(); 218 _nextTime += CustomTimerTimeOut; 219 nowTime = (Clock.currStdTime() / 10000); 220 } 221 nowTime = _nextTime - nowTime; 222 return cast(int) nowTime; 223 } 224 } 225 } 226 227 static if (IOMode == IO_MODE.kqueue) { 228 import yu.eventloop.selector.kqueue; 229 230 alias EventLoop = EventLoopImpl!(KqueueLoop); 231 } else static if (IOMode == IO_MODE.epoll) { 232 import yu.eventloop.selector.epoll; 233 234 alias EventLoop = EventLoopImpl!(EpollLoop); 235 } else static if (IOMode == IO_MODE.iocp) { 236 public import yu.eventloop.selector.iocp; 237 238 alias EventLoop = EventLoopImpl!(IOCPLoop); 239 } else { 240 static assert(0, "not suport this platform !"); 241 } 242 243 static if (CustomTimer) { 244 pragma(msg, "use CustomTimer!!!!"); 245 private: 246 @trusted final class CWheelTimer : EWheelTimer { 247 this(AsyncEvent* event) { 248 _event = event; 249 auto size = event.timeOut / CustomTimerTimeOut; 250 auto superfluous = event.timeOut % CustomTimerTimeOut; 251 size += superfluous > CustomTimer_Next_TimeOut ? 1 : 0; 252 size = size > 0 ? size : 1; 253 _wheelSize = cast(uint) size; 254 _circle = _wheelSize / CustomTimerWheelSize; 255 trace("_wheelSize = ", _wheelSize, " event.timeOut = ", event.timeOut); 256 } 257 258 override void onTimeOut() nothrow { 259 _now++; 260 if (_now >= _circle) { 261 _now = 0; 262 rest(_wheelSize); 263 _event.obj().onRead(); 264 } 265 } 266 267 pragma(inline, true) @property wheelSize() { 268 return _wheelSize; 269 } 270 271 private: 272 uint _wheelSize; 273 uint _circle; 274 uint _now = 0; 275 AsyncEvent* _event; 276 } 277 }