1 module yu.eventloop;
2 
3 import core.thread;
4 import core.memory;
5 
6 import std.exception;
7 import std.datetime;
8 import std.variant;
9 import std.algorithm.mutation;
10 import std.stdio;
11 import std.string;
12 import std.exception;
13 import std.experimental.allocator;
14 import std.experimental.allocator.gc_allocator;
15 
16 public import yu.eventloop.common;
17 import yu.memory.allocator;
18 import yu.task;
19 import yu.exception : yuCathException, showException;
20 
21 /** 网络I/O处理的事件循环类
22  */
23 
/**
 * Event loop for network I/O, parameterised on the selector implementation `T`
 * (the `_poll` member). It waits on the selector, runs tasks posted from any
 * thread, and, when `CustomTimer` is enabled, drives a timer wheel.
 */
@trusted final class EventLoopImpl(T) // used through an alias (see `EventLoop`)
{
    /// Initialises the selector, the optional timer wheel and the event-list head.
    this() {
        _poll.initer();
        _run = false;
        static if (CustomTimer)
            _timeWheel = yNew!ETimerWheel(CustomTimerWheelSize, yuAlloctor);
        // `_evlist` is the sentinel head of the intrusive list built by addEventList().
        _evlist = AsyncEvent(AsynType.EVENT, null);
    }

    /// Releases the timer wheel allocated in the constructor.
    ~this() {
        static if (CustomTimer)
            if (_timeWheel)
                yDel(_timeWheel);
    }

    /**
     * Starts waiting for and dispatching events until stop() is called.
     *
     * Params:
     *     timeout = wait timeout in milliseconds when no event arrives. With
     *               `CustomTimer` enabled it is overwritten on every iteration
     *               by the time left until the next wheel tick.
     *
     * Note: the original author states that several threads may call this
     * function at once to share one event dispatcher.
     * NOTE(review): `_thID` only remembers the most recent caller — confirm
     * whether multi-threaded use is still intended.
     */
    void run(int timeout = 100) {
        _thID = Thread.getThis.id();
        _run = true;
        // Always leave the loop in the "stopped" state, even when the selector
        // wait or a queued job throws. Otherwise isRuning() would keep
        // returning true and post() from other threads would enqueue tasks
        // that are never executed.
        scope (exit) {
            _thID = ThreadID.init;
            _run = false;
        }
        static if (CustomTimer) // currStdTime() is in hnsecs; / 10000 gives milliseconds
            _nextTime = (Clock.currStdTime() / 10000) + CustomTimerTimeOut;
        while (_run) {
            static if (CustomTimer)
                timeout = doWheel();
            _poll.wait(timeout);
            if (!_taskList.empty) {
                doTaskList();
            }
        }
    }

    /// Asks the selector to wake up a pending wait.
    void weakUp() nothrow {
        _poll.weakUp();
    }

    /// Returns: `true` while run() is executing its loop.
    bool isRuning() nothrow {
        return _run;
    }

    /// Requests the loop to exit and wakes the selector so the request is noticed.
    void stop() {
        if (isRuning()) {
            _run = false;
            weakUp();
        }
    }

    /**
     * Returns: `true` when the caller is the thread that entered run(), or
     * when the loop is not running at all (any thread then counts as "in loop").
     */
    bool isInLoopThread() {
        if (!isRuning())
            return true;
        return _thID == Thread.getThis.id;
    }

    /**
     * Schedules a delegate on the loop.
     *
     * With `MustInQueue == false` (the default) the delegate is invoked
     * immediately when isInLoopThread() is true. Otherwise it is wrapped in a
     * task allocated from `yuAlloctor`, queued under the object's monitor, and
     * the selector is woken. The task's `finishedCall` is set to
     * finishDoFreeYuTask, which frees the task.
     */
    void post(bool MustInQueue = false)(void delegate() cback)
    in {
        assert(cback);
    }
    body {
        static if (!MustInQueue) {
            if (isInLoopThread()) {
                cback();
                return;
            }
        }
        auto task = makeTask(yuAlloctor, cback);
        task.finishedCall = &finishDoFreeYuTask;
        synchronized (this) {
            _taskList.enQueue(task);
        }
        weakUp();
    }

    /**
     * Schedules an already constructed task on the loop.
     *
     * With `MustInQueue == false` and isInLoopThread() true, the task is run
     * and then freed with yDel right away. Otherwise it is only queued.
     * NOTE(review): the queued path does not free the task here — presumably
     * the task's own `finishedCall` is responsible; confirm against yu.task.
     */
    void post(bool MustInQueue = true)(AbstractTask task) {
        static if (!MustInQueue) {
            if (isInLoopThread()) {
                task.job();
                yDel(task);
                return;
            }
        }
        synchronized (this) {
            _taskList.enQueue(task);
        }
        weakUp();
    }

    /**
     * Registers an event with the loop.
     *
     * The event is first linked into the loop's event list. With `CustomTimer`
     * enabled, TIMER events get a CWheelTimer placed on the timer wheel and
     * never reach the selector; all other events are handed to the selector.
     *
     * Returns: `false` for a null event or when registration failed.
     */
    bool addEvent(AsyncEvent* event) nothrow {
        if (event == null)
            return false;
        addEventList(event);
        static if (CustomTimer) {
            if (event.type() == AsynType.TIMER) {
                try {
                    CWheelTimer timer = yNew!CWheelTimer(event);
                    _timeWheel.addNewTimer(timer, timer.wheelSize());
                    event.timer = timer;
                    event.isActive(true);
                }
                catch (Exception e) {
                    showException(yuCathException(error("new CWheelTimer error!!! : ", e.toString)));
                    return false;
                }
                return true;
            }
        }
        return _poll.addEvent(event);
    }

    /**
     * Updates the registration of an event in the selector.
     *
     * Returns: `false` for a null event, and for TIMER events when
     * `CustomTimer` is enabled (wheel timers cannot be modified here).
     */
    bool modEvent(AsyncEvent* event) nothrow {
        if (event == null)
            return false;
        addEventList(event);
        static if (CustomTimer) {
            if (event.type() == AsynType.TIMER)
                return false;
        }
        return _poll.modEvent(event);
    }

    /**
     * Unlinks an event from the event list and removes it from the timer wheel
     * (custom TIMER events) or from the selector (everything else).
     *
     * Returns: `false` for a null event or when nothing could be removed.
     */
    bool delEvent(AsyncEvent* event) nothrow {
        if (event == null)
            return false;
        event.rmNextPrev();
        static if (CustomTimer) {
            if (event.type() == AsynType.TIMER) {
                return rmCustomTimer(event);
            }
        }
        return _poll.delEvent(event);
    }

    static if (CustomTimer) {
        /// The timer wheel owned by this loop.
        @property ETimerWheel timerWheel() {
            return _timeWheel;
        }

        /**
         * Stops and frees the wheel timer attached to `event`.
         *
         * Returns: `true` when a timer was removed, `false` when the event had
         * no timer attached (never added, or already removed).
         */
        private bool rmCustomTimer(AsyncEvent* event) nothrow {
            event.isActive(false);
            // Guard against delEvent() on a timer event that owns no wheel
            // timer; dereferencing a null timer would crash this nothrow path.
            if (event.timer is null)
                return false;
            event.timer.stop();
            yuCathException(yDel(event.timer));
            event.timer = null;
            return true;
        }
    }

    /// `finishedCall` hook installed by post(): frees the task, swallowing any exception.
    void finishDoFreeYuTask(AbstractTask task) nothrow @trusted
    {
        import yu.exception;
        yuCathException(yDel(task));
    }

protected:
    /// Runs every task queued so far.
    void doTaskList() {
        import std.algorithm : swap;

        // Take the whole queue under the lock, then run the jobs without
        // holding the monitor so that jobs may call post() themselves.
        TaskQueue tmp;
        synchronized (this) {
            swap(tmp, _taskList);
        }
        while (!tmp.empty) {
            auto fp = tmp.deQueue();
            fp.job();
        }
    }

    /// Moves `event` to the front of the intrusive list headed by `_evlist`.
    pragma(inline) void addEventList(AsyncEvent* event) nothrow {
        event.rmNextPrev(); // detach from wherever it is currently linked
        if (_evlist.next) {
            _evlist.next.prev = event;
            event.next = _evlist.next;
        }
        _evlist.next = event;
        event.prev = &_evlist;
    }

private:
    T _poll;             // platform selector
    TaskQueue _taskList; // tasks posted to the loop, guarded by synchronized(this)
    bool _run;           // true while run() is looping
    ThreadID _thID;      // id of the thread that entered run()
    AsyncEvent _evlist;  // sentinel head of the registered-event list
    static if (CustomTimer) {
        ETimerWheel _timeWheel;
        long _nextTime;  // time of the next wheel tick, in milliseconds

        /**
         * Advances the wheel once per elapsed CustomTimerTimeOut interval.
         *
         * Returns: milliseconds left until the next tick, used as the selector
         * wait timeout.
         */
        int doWheel() {
            auto nowTime = (Clock.currStdTime() / 10000);
            while (nowTime >= _nextTime) {
                _timeWheel.prevWheel();
                _nextTime += CustomTimerTimeOut;
                // Re-read the clock: firing timers may have taken a while.
                nowTime = (Clock.currStdTime() / 10000);
            }
            nowTime = _nextTime - nowTime;
            return cast(int) nowTime;
        }
    }
}
226 
// Bind `EventLoop` to the selector backend chosen by `IOMode` at compile time.
// The tests are mutually exclusive, so their order does not matter.
static if (IOMode == IO_MODE.epoll) {
    import yu.eventloop.selector.epoll;

    alias EventLoop = EventLoopImpl!EpollLoop;
} else static if (IOMode == IO_MODE.kqueue) {
    import yu.eventloop.selector.kqueue;

    alias EventLoop = EventLoopImpl!KqueueLoop;
} else static if (IOMode == IO_MODE.iocp) {
    // Unlike the other backends, the IOCP selector module is re-exported.
    public import yu.eventloop.selector.iocp;

    alias EventLoop = EventLoopImpl!IOCPLoop;
} else {
    // No selector backend exists for this IOMode: stop the build.
    static assert(0, "not suport this  platform !");
}
242 
static if (CustomTimer) {
    pragma(msg, "use CustomTimer!!!!");
private:
    /**
     * Timer-wheel entry that backs a TIMER AsyncEvent when CustomTimer is on.
     *
     * The event's timeout is converted into a number of wheel slots; timeouts
     * longer than the wheel are tracked with a rotation counter.
     */
    @trusted final class CWheelTimer : EWheelTimer {
        /// Derives the slot count and rotation count from `event.timeOut`.
        this(AsyncEvent* event) {
            _event = event;
            auto slots = event.timeOut / CustomTimerTimeOut;
            auto leftover = event.timeOut % CustomTimerTimeOut;
            // Round up only when the remainder exceeds the configured threshold.
            if (leftover > CustomTimer_Next_TimeOut)
                slots += 1;
            // A timer always occupies at least one slot.
            if (!(slots > 0))
                slots = 1;
            _wheelSize = cast(uint) slots;
            _circle = _wheelSize / CustomTimerWheelSize;
            trace("_wheelSize = ", _wheelSize, " event.timeOut = ", event.timeOut);
        }

        /**
         * Called by the wheel when this timer's slot is reached. Counts the
         * visit and, once `_circle` visits have accumulated, resets the
         * counter, calls rest() and invokes the event object's onRead().
         * NOTE(review): with `_circle == 1` this fires on the first visit —
         * confirm against ETimerWheel whether a full rotation should pass first.
         */
        override void onTimeOut() nothrow {
            ++_now;
            if (_now < _circle)
                return; // not enough visits yet
            _now = 0;
            rest(_wheelSize); // presumably re-arms the timer on the wheel — TODO confirm
            _event.obj().onRead();
        }

        /// Number of wheel slots this timer spans.
        pragma(inline, true) @property wheelSize() {
            return _wheelSize;
        }

    private:
        AsyncEvent* _event; // event whose handler is fired
        uint _wheelSize;    // timeout expressed in wheel slots
        uint _circle;       // slot visits required before firing
        uint _now = 0;      // slot visits seen since the last fire
    }
}