【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )(一)

简介: 【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )(一)

一、MessageQueue 的 Java 层机制


之前在 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 ) 中 , 模仿 Android 的 MessageQueue 手写的 MessageQueue , 使用了如下同步机制 ,


从 消息队列 MessageQueue 中取出 消息 Message ,


如果当前链表为空 , 此时会 调用 wait 方法阻塞 , 直到消息入队时 , 链表中有了元素 , 会调用 notify 解除该阻塞 ;



在实际的 Android 中的 消息队列 MessageQueue 的同步机制 是在 native 层实现 的 ;


在创建 消息队列 MessageQueue 时 , 调用了 nativeInit() 方法 , 销毁 MessageQueue 时调用 nativeDestroy 方法 ;


如果调用 next 获取下一个消息时 , 如果当前消息队列 MessageQueue 中没有消息 , 此时需要阻塞 , 调用 nativePollOnce 即可实现在 native 阻塞线程 ;


 

// 初始化 MessageQueue 时调用的方法 
    private native static long nativeInit();
    // 销毁 MessageQueue 时调用的方法 
    private native static void nativeDestroy(long ptr);
    // 线程阻塞方法
    @UnsupportedAppUsage
    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
    // 线程唤醒方法 
    private native static void nativeWake(long ptr);
    private native static boolean nativeIsPolling(long ptr);
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
  // 此处初始化 MessageQueue , 调用了 nativeInit 方法 
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
  // 获取消息队列中的下一个消息 
    @UnsupportedAppUsage
    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
    // 此处阻塞线程 
            nativePollOnce(ptr, nextPollTimeoutMillis);
        }
    }






二、MessageQueue 的 native 层阻塞机制


线程阻塞方法 private native void nativePollOnce(long ptr, int timeoutMillis) , 是 native 方法 , 该方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 中实现 ,


从 Java 层传入 long 类型 , 然后转为 NativeMessageQueue* 类型指针 ,


该 Java 层传入的 long 类型是初始化消息队列时 , 由 nativeInit 方法返回 , 是 消息队列在 Native 层的指针 ,


之后 NativeMessageQueue 指针调用了其本身的 pollOnce 函数 , 该函数中 , 主要调用了 Looper 的 pollOnce 函数 , mLooper->pollOnce(timeoutMillis) ;


frameworks/base/core/jni/android_os_MessageQueue.cpp 中阻塞相关源码 :


void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // 这是主要操作 
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}


参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp



继续查看 Native 层 Looper.cpp 的 pollOnce 方法 ,


Looper.cpp 源码路径是 system/core/libutils/Looper.cpp ,


在该方法中 , 最终调用 Looper.cpp 的 pollInner 方法 ,


在 pollInner 方法中 , 调用了 epoll_wait 方法 , 该方法就是等待方法 , 在该方法中会监听 mEpollFd 文件句柄 ,


   

#include <sys/epoll.h>
       int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
       int epoll_pwait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout,
                      const sigset_t *sigmask);


参考 : epoll_wait



system/core/libutils/Looper.cpp 中阻塞相关源码 :


int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        result = pollInner(timeoutMillis);
    }
}
int Looper::pollInner(int timeoutMillis) {
  // ... 
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
}



参考 : system/core/libutils/Looper.cpp






三、MessageQueue 的 native 层解除阻塞机制


在 MessageQueue 消息队列的 Java 层 , 将 Message 消息插入到链表表头后 , 调用了 nativeWake 方法 , 唤醒了线程 , 即解除了阻塞 ;


public final class MessageQueue {
    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
        synchronized (this) {
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
}



在 native 层的 frameworks/base/core/jni/android_os_MessageQueue.cpp 实现了上述


Java 层定义的 private native static void nativeWake(long ptr) 方法 ,


注册 JNI 方法方式是动态注册 , 注册的参数如下 , Java 层的 nativeWake 对应的 native 层方法是 android_os_MessageQueue_nativeWake 方法 ,


static const JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
    { "nativeSetFileDescriptorEvents", "(JII)V",
            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};


参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp



下面是 frameworks/base/core/jni/android_os_MessageQueue.cpp 中的相关方法实现 ,


在 android_os_MessageQueue_nativeWake 方法中调用了 本身的 wake 方法 ,


在 wake 方法中调用了 system/core/libutils/Looper.cpp 中的 wake 方法 ;


void NativeMessageQueue::wake() {
  // 此处调用了 Looper 的 wake 函数 
    mLooper->wake();
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}


参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp



查看 system/core/libutils/Looper.cpp 中的 wake 方法 , 在该方法中 ,


ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))) 代码说明 ,


向 mWakeEventFd 文件句柄写入了数据 ;


void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}


参考 : system/core/libutils/Looper.cpp



阻塞的时候使用的是 mEpollFd 文件句柄 ,


唤醒的时候使用的是 mWakeEventFd 文件句柄 ,


下面分析这两个文件句柄之间的联系 ;



Looper 构造函数 , 调用了 rebuildEpollLocked() 方法 ,


在 rebuildEpollLocked 方法 中调用 mEpollFd = epoll_create(EPOLL_SIZE_HINT) , 创建了一个句柄 ,


调用 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem) 注册事件监听 ,


注册 mEpollFd 句柄 , 监听 mWakeEventFd 句柄的 eventItem 事件 ,


监听的事件是 eventItem.events = EPOLLIN 事件 ,


该事件代表 , 向 mWakeEventFd 文件句柄写入数据 , 此时就对应解除 epoll_wait 阻塞 ;



system/core/libutils/Looper.cpp 中 Looper 构造函数 , rebuildEpollLocked 方法 :


Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));
    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
        close(mEpollFd);
    }
    // Allocate the new epoll instance and register the wake pipe.
    // 创建句柄 
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    // 注册监听的事件
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // 注册事件监听 
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}



MessageQueue 消息队列是通过 Linux 的 epoll 机制实现的阻塞 ;


目录
相关文章
|
8天前
|
Java 程序员
深入理解Java异常处理机制
Java的异常处理是编程中的一块基石,它不仅保障了代码的健壮性,还提升了程序的可读性和可维护性。本文将深入浅出地探讨Java异常处理的核心概念、分类、处理策略以及最佳实践,旨在帮助读者建立正确的异常处理观念,提升编程效率和质量。
|
9天前
|
Java 开发者 UED
深入探索Java中的异常处理机制##
本文将带你深入了解Java语言中的异常处理机制,包括异常的分类、异常的捕获与处理、自定义异常的创建以及最佳实践。通过具体实例和代码演示,帮助你更好地理解和运用Java中的异常处理,提高程序的健壮性和可维护性。 ##
27 2
|
9天前
|
Java 开发者
Java中的异常处理机制深度剖析####
本文深入探讨了Java语言中异常处理的重要性、核心机制及其在实际编程中的应用策略,旨在帮助开发者更有效地编写健壮的代码。通过实例分析,揭示了try-catch-finally结构的最佳实践,以及如何利用自定义异常提升程序的可读性和维护性。此外,还简要介绍了Java 7引入的多异常捕获特性,为读者提供了一个全面而实用的异常处理指南。 ####
26 2
|
12天前
|
Java 程序员 UED
深入理解Java中的异常处理机制
本文旨在揭示Java异常处理的奥秘,从基础概念到高级应用,逐步引导读者掌握如何优雅地管理程序中的错误。我们将探讨异常类型、捕获流程,以及如何在代码中有效利用try-catch语句。通过实例分析,我们将展示异常处理在提升代码质量方面的关键作用。
25 3
|
12天前
|
Java 数据库连接 开发者
Java中的异常处理机制:深入解析与最佳实践####
本文旨在为Java开发者提供一份关于异常处理机制的全面指南,从基础概念到高级技巧,涵盖try-catch结构、自定义异常、异常链分析以及最佳实践策略。不同于传统的摘要概述,本文将以一个实际项目案例为线索,逐步揭示如何高效地管理运行时错误,提升代码的健壮性和可维护性。通过对比常见误区与优化方案,读者将获得编写更加健壮Java应用程序的实用知识。 --- ####
|
12天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
32 2
|
7天前
|
Java API 开发者
深入理解Java中的异常处理机制
本文探讨了Java编程语言中异常处理的核心概念,包括异常类型、异常捕获与抛出、以及最佳实践。通过分析常见的异常场景和处理策略,旨在帮助开发者更好地理解和运用异常处理机制,提高代码的健壮性和可维护性。文章不仅涵盖了基本的try-catch结构,还深入讨论了自定义异常的创建与使用,以及finally块的重要性和应用。此外,还将介绍一些高级技巧,如多异常捕获和嵌套异常处理,为读者提供全面的技术指导。
50 0
|
消息中间件 网络协议 物联网
微服务消息队列(MQTT For IoT)Android Demo使用介绍
目前阿里云官方对于微消息队列 MQTT提供了很多语言的参考示例,但是在实际的使用中发现很多用户在使用Android Sample的时候总是会遇到问题,无法正常调试使用。本文主要介绍Android Sample的使用。
微服务消息队列(MQTT For IoT)Android Demo使用介绍
|
消息中间件 物联网 Android开发
微服务消息队列(MQTT For IoT)Android Demo使用介绍
目前阿里云官方对于微消息队列 MQTT提供了很多语言的参考示例,但是在实际的使用中发现很多用户在使用Android Sample的时候总是会遇到问题,无法正常调试使用。本文主要介绍Android Sample的使用。
3153 0
|
7天前
|
搜索推荐 前端开发 API
探索安卓开发中的自定义视图:打造个性化用户界面
在安卓应用开发的广阔天地中,自定义视图是一块神奇的画布,让开发者能够突破标准控件的限制,绘制出独一无二的用户界面。本文将带你走进自定义视图的世界,从基础概念到实战技巧,逐步揭示如何在安卓平台上创建和运用自定义视图来提升用户体验。无论你是初学者还是有一定经验的开发者,这篇文章都将为你打开新的视野,让你的应用在众多同质化产品中脱颖而出。
34 19