6

Android 消息机制(三)Native层消息机制

 3 years ago
source link: https://www.viseator.com/2017/11/02/android_event_3/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

之前的两篇文章讲解了java层消息机制的过程以及使用,中间省略了比较多的代码就是在native层实现的消息机制中使用的。这篇文章就对这一部分进行讲解。

我们从第一篇文章知道,Looper.loop()方法被调用后,会启动一个无限循环,而在这个循环中,调用了MessageQueuenext()方法以获取下一条消息,而next()方法中会首先调用nativePollOnce()方法,这个方法的作用在之前说过是阻塞,达到超时时间或有新的消息到达时得到eventFd的通知再唤醒消息队列,其实这个方法也是native消息处理的开始。

进入Native层

android_os_MessageQueue_nativePollOnce()

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
...
}

调用了Native LooperpollOnce()方法:

pollOnce()

inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;

if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}

if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}

result = pollInner(timeoutMillis);
}
}

我们之前直接跳过了26行之前的内容,现在我们还是不能理解这一段的意义,从程序上看,我们从mResponse容器中取出了一个response并把他的内容放入了传入的地址参数中返回。首先,这个调用中没有传入地址参数,其次,这个mResponse数组是什么呢?

我们继续先往下看,下面的pollInner()方法比较长也是native消息机制的核心,我们拆成几个部分看。

pollInner()

Request 与 Response

   int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
mPolling = false;
mLock.lock();
...
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}

当第7行系统调用epoll_wait()返回时,说明因注册的fd有消息或达到超时,在第11行就对收到的唤醒events进行遍历,首先判断有消息的fd是不是用于唤醒的mWakeEventFd,如果不是的话,说明是系统调用addFd()方法设置的自定义fd(后面会讲)。那么我们需要对这个事件作出响应。

第21到28行就对这个event做处理,首先,我们以这个fdkeymRequests中找到他的索引,这个mRequests是我们在addFd()方法一并注册的以fdkeyRequestvalue的映射表。找到request之后,28行调用pushResponse()方法去建立response

void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}

现在我们要处理的任务已经被封装成了一个Response对象,等待被处理,那么真正的处理在哪里呢?

在上面的代码与处理response的代码中间夹着的是处理MessageEnvelope的代码,我们后面再讲这段,现在到处理response的代码:

for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}

遍历所有response对象,取出之前注册的request对象的信息,然后调用了request.callback->handleEvent()方法进行回调,如果该回调返回0,则调用removeFd()方法取消这个fd的注册。

再梳理一遍这个过程:注册的自定义fd被消息唤醒,从mRequests中以fdkey找到对应的注册好的request对象然后生成response对象,在MessageEnvelop处理完毕之后处理response,调用request中的callbackhandleEvent()方法。

那么addFd()注册自定义fdremoveFd()取消注册是如何实现的呢?

addFd()

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
...
{ // acquire lock
AutoMutex _l(mLock);

Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

struct epoll_event eventItem;
request.initEventItem(&eventItem);

ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d: %s",
fd, strerror(errno));
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}

第6-13行使用传入的参数初始化了request对象,然后16行由request来初始化注册epoll使用的event。19行根据mRequests.indexOfKey()方法取出的值来判断fd是否已经注册,如果未注册,则在20行进行系统调用epoll_ctl()注册新监听并在25行将fdrequest存入mRequest,如果已注册,则在27行更新注册并在42行更新request

这就是自定义fd设置的过程:保存request并使用epoll_ctl系统调用注册fd的监听。

removeFd()

int Looper::removeFd(int fd, int seq) {
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
return 0;
}
mRequests.removeItemsAt(requestIndex);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) {
if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
scheduleEpollRebuildLocked();
} else {
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
}
} // release lock
return 1;
}

解除的过程相反,在第11行删除mRequests中的键值对,然后在第13行系统调用epoll_ctl()解除fdepoll注册。

MessageEnvelop消息处理

之前说到,在request生成responseresponse的处理中间有一段代码执行了MessageEnvelop消息的处理,这个顺序保证了MessageEnvelop优先于fd引起的request的处理。

现在我们来看这段代码:

mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
} // release handler

mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

可以看到mMessageEnvelopes容器中存储了所有的消息,第4行从首位置取出一条消息,随后进行时间判断,如果时间到达,先移出容器,与java层比较相似都是调用了handlerhandleMessage()来进行消息的处理。

那么MessageEnvelope是如何添加的呢?

Native Looper提供了一套与javaMessageQueue类似的方法,用于添加MessageEnvelope

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}

MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

if (mSendingMessage) {
return;
}
} // release lock
if (i == 0) {
wake();
}
}

这段添加MessageEnvelope的代码不需要太多的解释。

现在我们看到,其实Native中的消息机制有两个方面,一方面是通过addFd()注册的自定义fd触发消息处理,通过mRequests保存的request对象中的callback进行消息处理。另一方面是通过与java层类似的MessageEnvelop消息对象进行处理,调用的是该对象handler域的handleMessage()方法,与java层非常类似。优先级是先处理MessageEnvelop再处理request

现在消息机制全部内容分析下来,我们可以看到android的消息机制不算复杂,分为nativejava两个部分,这两个部分分别有自己的消息处理机制,其中关键的超时与唤醒部分是借助了linux系统epoll机制来实现的。

连接javanative层消息处理过程的是next()方法中的nativePollOnce()java层消息循环先调用它,自身阻塞,进入native的消息处理,在native消息处理完毕后返回,再进行java层的消息处理,正是因为如此,如果我们在处理java层消息的时候执行了耗时或阻塞的任务(甚至阻塞了整个主线程),整个java层的消息循环就会阻塞,也无法进入native层的消息处理,也就无法响应例如触摸事件这样的消息,导致ANR的发生。这也就是我们不应在主线程中执行这类任务的原因。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK