-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventLoop.cpp
More file actions
145 lines (123 loc) · 3.75 KB
/
Copy pathEventLoop.cpp
File metadata and controls
145 lines (123 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include "EventLoop.h"
#include "CurrentThread.h"
#include "EPollPoller.h"
#include "Logger.h"
#include "Channel.h"
#include <sys/eventfd.h>
// 线程局部存储,当不同线程访问该变量时会对应不同的实例
__thread EventLoop* t_loopInThisThread = nullptr; // 每个线程所对应的EventLoop对象
const int kPollTimeoutMs = 10000;
int createEventfd() {
int evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evfd < 0) {
LOG_ERROR("");
}
return evfd;
}
EventLoop::EventLoop()
: looping_(false),
quit_(false),
callingPendingFunctors_(false),
threadId_(CurrentThread::tid()),
poller_(new EpollPoller(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)) {
// 检查所属线程是否已经存在EventLoop对象
if (t_loopInThisThread) {
// 退出程序
} else {
t_loopInThisThread = this;
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}
}
EventLoop::~EventLoop() {
looping_ = false;
t_loopInThisThread = nullptr;
wakeupChannel_->disableAll();
close(wakeupFd_);
}
void EventLoop::loop() {
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_) {
activeChannels_.clear();
Timestamp now = poller_->poll(kPollTimeoutMs, &activeChannels_);
// 开始处理事件
for (auto channel: activeChannels_) {
channel->handleEvent(now);
}
doPendingFunctors();
}
}
void EventLoop::updateChannel(Channel* channel) {
poller_->updateChannel(channel);
}
void EventLoop::quit() {
quit_ = true;
// EventLoop其所在的IO线程自始至终在执行loop,如果此时在调用quit
// 则说明不在阻塞,则直接设置quit即可,下次循环检查就会检查到quit并退出
if (!isInLoopThread()) {
wakeup();
}
}
// 可能跨线程调用,
void EventLoop::runInLoop(const Functor& cb) {
if (isInLoopThread()) {
cb();
} else {
queueInLoop(cb);
}
}
// 可能跨线程调用
void EventLoop::queueInLoop(const Functor& cb) {
{
std::lock_guard<std::mutex> lockGuard(mutex_);
pendingFunctors_.emplace_back(cb);
}
// 每个EventLoop都在执行loop函数,通常情况下阻塞在poll方法,不然就在执行poll后的操作
// 调用queueInLoop可能有三种情况:
// (1)其他线程调用queueInLoop (2)handleEvent执行用户回调时调用queueInLoop (3)doPendingFunctors执行functors调用queueInLoop
// (1)(3)情况下需要唤醒,来保证及时执行新添加的functors (2)不需要唤醒因为接下来马上会执行doPendingFunctors
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
}
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::lock_guard<std::mutex> lockGuard(mutex_);
functors.swap(pendingFunctors_);
}
for (auto& functor: functors) {
functor();
}
callingPendingFunctors_ = false;
}
void EventLoop::wakeup() {
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one)) {
LOG_ERROR("EventLoop wakeup");
}
}
void EventLoop::handleRead() {
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one)) {
LOG_ERROR("EventLoop::handleRead");
}
}
bool EventLoop::isInLoopThread() {
return CurrentThread::tid() == threadId_;
}
void EventLoop::assertInLoopThread() {
if (!isInLoopThread()) {
abortNotInLoopThread();
}
}
void EventLoop::abortNotInLoopThread() {
LOG_FATAL("EventLoop::abortNotInLoopThread");
}