diff --git a/sched/mqueue/CMakeLists.txt b/sched/mqueue/CMakeLists.txt index e36f1ddccc..3eea3b931b 100644 --- a/sched/mqueue/CMakeLists.txt +++ b/sched/mqueue/CMakeLists.txt @@ -32,10 +32,8 @@ if(NOT CONFIG_DISABLE_MQUEUE) APPEND SRCS mq_send.c - mq_timedsend.c mq_sndinternal.c mq_receive.c - mq_timedreceive.c mq_rcvinternal.c mq_msgfree.c mq_msgqalloc.c diff --git a/sched/mqueue/Make.defs b/sched/mqueue/Make.defs index e4ff7a3f88..e1f3324946 100644 --- a/sched/mqueue/Make.defs +++ b/sched/mqueue/Make.defs @@ -26,8 +26,8 @@ endif ifneq ($(CONFIG_DISABLE_MQUEUE),y) -CSRCS += mq_send.c mq_timedsend.c mq_sndinternal.c mq_receive.c -CSRCS += mq_timedreceive.c mq_rcvinternal.c mq_getattr.c +CSRCS += mq_send.c mq_sndinternal.c mq_receive.c +CSRCS += mq_rcvinternal.c mq_getattr.c CSRCS += mq_msgfree.c mq_msgqalloc.c mq_msgqfree.c CSRCS += mq_setattr.c mq_notify.c diff --git a/sched/mqueue/mq_msgfree.c b/sched/mqueue/mq_msgfree.c index df9522b806..846218eb1b 100644 --- a/sched/mqueue/mq_msgfree.c +++ b/sched/mqueue/mq_msgfree.c @@ -31,6 +31,7 @@ #include #include #include +#include #include "mqueue/mqueue.h" @@ -56,6 +57,8 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg) { + irqstate_t flags; + /* If this is a generally available pre-allocated message, * then just put it back in the free list. */ @@ -66,7 +69,9 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg) * list from interrupt handlers. */ + flags = spin_lock_irqsave(NULL); list_add_tail(&g_msgfree, &mqmsg->node); + spin_unlock_irqrestore(NULL, flags); } /* If this is a message pre-allocated for interrupts, @@ -79,7 +84,9 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg) * list from interrupt handlers. */ + flags = spin_lock_irqsave(NULL); list_add_tail(&g_msgfreeirq, &mqmsg->node); + spin_unlock_irqrestore(NULL, flags); } /* Otherwise, deallocate it. Note: interrupt handlers diff --git a/sched/mqueue/mq_rcvinternal.c b/sched/mqueue/mq_rcvinternal.c index f3861c1ca0..b97e0aa918 100644 --- a/sched/mqueue/mq_rcvinternal.c +++ b/sched/mqueue/mq_rcvinternal.c @@ -44,66 +44,56 @@ #include "mqueue/mqueue.h" /**************************************************************************** - * Public Functions + * Private Functions ****************************************************************************/ /**************************************************************************** - * Name: nxmq_verify_receive + * Name: nxmq_rcvtimeout * * Description: - * This is internal, common logic shared by both [nx]mq_receive and - * [nx]mq_timedreceive. This function verifies the input parameters that - * are common to both functions. + * This function is called if the timeout elapses before the message queue + * becomes non-empty. * * Input Parameters: - * msgq - Message queue descriptor - * msg - Buffer to receive the message - * msglen - Size of the buffer in bytes + * arg - the argument provided when the timeout was configured. * * Returned Value: - * On success, zero (OK) is returned. A negated errno value is returned - * on any failure: + * None * - * EBADF Message queue opened not opened for reading. - * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the message - * queue. - * EINVAL Invalid 'msg' or 'msgq' + * Assumptions: * ****************************************************************************/ -#ifdef CONFIG_DEBUG_FEATURES -int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen) +static void nxmq_rcvtimeout(wdparm_t arg) { - FAR struct inode *inode = mq->f_inode; - FAR struct mqueue_inode_s *msgq; + FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg; + irqstate_t flags; - if (inode == NULL) + /* Disable interrupts. This is necessary because an interrupt handler may + * attempt to send a message while we are doing this. + */ + + flags = enter_critical_section(); + + /* It is also possible that an interrupt/context switch beat us to the + * punch and already changed the task's state. + */ + + if (wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY) { - return -EBADF; + /* Restart with task with a timeout error */ + + nxmq_wait_irq(wtcb, ETIMEDOUT); } - msgq = inode->i_private; + /* Interrupts may now be re-enabled. */ - /* Verify the input parameters */ - - if (!msg || !msgq) - { - return -EINVAL; - } - - if ((mq->f_oflags & O_RDOK) == 0) - { - return -EBADF; - } - - if (msglen < (size_t)msgq->maxmsgsize) - { - return -EMSGSIZE; - } - - return OK; + leave_critical_section(flags); } -#endif + +/**************************************************************************** + * Public Functions + ****************************************************************************/ /**************************************************************************** * Name: nxmq_wait_receive @@ -116,9 +106,10 @@ int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen) * * Input Parameters: * msgq - Message queue descriptor - * oflags - flags from user set * rcvmsg - The caller-provided location in which to return the newly * received message. + * abstime - If non-NULL, this is the absolute time to wait until a + * message is received. * * Returned Value: * On success, zero (OK) is returned. A negated errno value is returned @@ -134,12 +125,12 @@ int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen) ****************************************************************************/ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq, - int oflags, FAR struct mqueue_msg_s **rcvmsg) + FAR struct mqueue_msg_s **rcvmsg, + FAR const struct timespec *abstime, + sclock_t ticks) { FAR struct mqueue_msg_s *newmsg; - FAR struct tcb_s *rtcb; - - DEBUGASSERT(rcvmsg != NULL); + FAR struct tcb_s *rtcb = this_task(); #ifdef CONFIG_CANCELLATION_POINTS /* nxmq_wait_receive() is not a cancellation point, but it may be called @@ -156,139 +147,90 @@ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq, } #endif + if (abstime) + { + wd_start_realtime(&rtcb->waitdog, abstime, + nxmq_rcvtimeout, (wdparm_t)rtcb); + } + else if (ticks >= 0) + { + wd_start(&rtcb->waitdog, ticks, + nxmq_rcvtimeout, (wdparm_t)rtcb); + } + /* Get the message from the head of the queue */ while ((newmsg = (FAR struct mqueue_msg_s *) list_remove_head(&msgq->msglist)) == NULL) { - /* The queue is empty! Should we block until there the above condition - * has been satisfied? + msgq->cmn.nwaitnotempty++; + + /* Initialize the 'errcode" used to communication wake-up error + * conditions. */ - if ((oflags & O_NONBLOCK) == 0) + rtcb->waitobj = msgq; + rtcb->errcode = OK; + + /* Remove the tcb task from the running list. */ + + nxsched_remove_self(rtcb); + + /* Add the task to the specified blocked task list */ + + rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY; + nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn)); + + /* Now, perform the context switch */ + + up_switch_context(this_task(), rtcb); + + /* When we resume at this point, either (1) the message queue + * is no longer empty, or (2) the wait has been interrupted by + * a signal. We can detect the latter case be examining the + * errno value (should be either EINTR or ETIMEDOUT). + */ + + if (rtcb->errcode != OK) { - /* Yes.. Block and try again */ - - rtcb = this_task(); - rtcb->waitobj = msgq; - msgq->cmn.nwaitnotempty++; - - /* Initialize the 'errcode" used to communication wake-up error - * conditions. - */ - - rtcb->errcode = OK; - - /* Make sure this is not the idle task, descheduling that - * isn't going to end well. - */ - - DEBUGASSERT(!is_idle_task(rtcb)); - - /* Remove the tcb task from the running list. */ - - nxsched_remove_self(rtcb); - - /* Add the task to the specified blocked task list */ - - rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY; - nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn)); - - /* Now, perform the context switch */ - - up_switch_context(this_task(), rtcb); - - /* When we resume at this point, either (1) the message queue - * is no longer empty, or (2) the wait has been interrupted by - * a signal. We can detect the latter case be examining the - * errno value (should be either EINTR or ETIMEDOUT). - */ - - if (rtcb->errcode != OK) - { - return -rtcb->errcode; - } - } - else - { - /* The queue was empty, and the O_NONBLOCK flag was set for the - * message queue description. - */ - - return -EAGAIN; + break; } } - /* If we got message, then decrement the number of messages in - * the queue while we are still in the critical section - */ - - if (newmsg) + if (abstime || ticks >= 0) { - if (msgq->nmsgs-- == msgq->maxmsgs) - { - nxmq_pollnotify(msgq, POLLOUT); - } + wd_cancel(&rtcb->waitdog); } *rcvmsg = newmsg; - return OK; + return -rtcb->errcode; } /**************************************************************************** - * Name: nxmq_do_receive + * Name: nxmq_notify_receive * * Description: * This is internal, common logic shared by both [nx]mq_receive and - * [nx]mq_timedreceive. This function accepts the message obtained by - * mq_waitmsg, provides the message content to the user, notifies any - * threads that were waiting for the message queue to become non-full, - * and disposes of the message structure + * [nx]mq_timedreceive. + * This function notifies any tasks that are waiting for the message queue + * to become non-empty. This function is called after a message is + * received from the message queue. * * Input Parameters: * msgq - Message queue descriptor - * mqmsg - The message obtained by mq_waitmsg() - * ubuffer - The address of the user provided buffer to receive the message - * prio - The user-provided location to return the message priority. * * Returned Value: * Returns the length of the received message. This function does not * fail. * * Assumptions: - * - The caller has provided all validity checking of the input parameters - * using nxmq_verify_receive. - * - The user buffer, ubuffer, is known to be large enough to accept the - * largest message that an be sent on this message queue * - Pre-emption should be disabled throughout this call. * ****************************************************************************/ -ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq, - FAR struct mqueue_msg_s *mqmsg, - FAR char *ubuffer, FAR unsigned int *prio) +void nxmq_notify_receive(FAR struct mqueue_inode_s *msgq) { FAR struct tcb_s *btcb; - ssize_t rcvmsglen; - - /* Get the length of the message (also the return value) */ - - rcvmsglen = mqmsg->msglen; - - /* Copy the message into the caller's buffer */ - - memcpy(ubuffer, (FAR const void *)mqmsg->mail, rcvmsglen); - - /* Copy the message priority as well (if a buffer is provided) */ - - if (prio) - { - *prio = mqmsg->priority; - } - - /* We are done with the message. Deallocate it now. */ - - nxmq_free_msg(mqmsg); /* Check if any tasks are waiting for the MQ not full event. */ @@ -331,8 +273,4 @@ ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq, up_switch_context(btcb, rtcb); } } - - /* Return the length of the message transferred to the user buffer */ - - return rcvmsglen; } diff --git a/sched/mqueue/mq_receive.c b/sched/mqueue/mq_receive.c index 1c2c26e133..6fd024c69d 100644 --- a/sched/mqueue/mq_receive.c +++ b/sched/mqueue/mq_receive.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -40,10 +41,418 @@ #include "mqueue/mqueue.h" +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Name: nxmq_verify_receive + * + * Description: + * This is internal, common logic shared by both [nx]mq_receive and + * [nx]mq_timedreceive. This function verifies the input parameters that + * are common to both functions. + * + * Input Parameters: + * msgq - Message queue descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * + * Returned Value: + * On success, zero (OK) is returned. A negated errno value is returned + * on any failure: + * + * EBADF Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the message + * queue. + * EINVAL Invalid 'msg' or 'msgq' + * + ****************************************************************************/ + +#ifdef CONFIG_DEBUG_FEATURES +static int nxmq_verify_receive(FAR struct file *mq, + FAR char *msg, size_t msglen) +{ + FAR struct inode *inode = mq->f_inode; + FAR struct mqueue_inode_s *msgq; + + if (inode == NULL) + { + return -EBADF; + } + + msgq = inode->i_private; + + /* Verify the input parameters */ + + if (!msg || !msgq) + { + return -EINVAL; + } + + if ((mq->f_oflags & O_RDOK) == 0) + { + return -EBADF; + } + + if (msglen < (size_t)msgq->maxmsgsize) + { + return -EMSGSIZE; + } + + return OK; +} +#endif + +/**************************************************************************** + * Name: file_mq_timedreceive_internal + * + * Description: + * This is an internal function of file_mq_timedreceive()/ + * file_mq_tickreceive(), please refer to the detailed description for + * more information. + * + * Input Parameters: + * mq - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * prio - If not NULL, the location to store message priority. + * abstime - the absolute time to wait until a timeout is declared. + * + * Returned Value: + * On success, the length of the selected message in bytes is returned. + * On failure, -1 (ERROR) is returned and the errno is set appropriately: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set + * for the message queue description referred to by 'mqdes'. + * EPERM Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * EINVAL Invalid 'msg' or 'mqdes' or 'abstime' + * ETIMEDOUT The call timed out before a message could be transferred. + * + ****************************************************************************/ + +static +ssize_t file_mq_timedreceive_internal(FAR struct file *mq, FAR char *msg, + size_t msglen, FAR unsigned int *prio, + FAR const struct timespec *abstime, + sclock_t ticks) +{ + FAR struct mqueue_inode_s *msgq; + FAR struct mqueue_msg_s *mqmsg; + irqstate_t flags; + ssize_t ret = 0; + + DEBUGASSERT(up_interrupt_context() == false); + + /* Verify the input parameters */ + + if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) + { + return -EINVAL; + } + + if (mq == NULL) + { + return -EINVAL; + } + +#ifdef CONFIG_DEBUG_FEATURES + /* Verify the input parameters and, in case of an error, set + * errno appropriately. + */ + + ret = nxmq_verify_receive(mq, msg, msglen); + if (ret < 0) + { + return ret; + } +#endif + + msgq = mq->f_inode->i_private; + + /* Furthermore, nxmq_wait_receive() expects to have interrupts disabled + * because messages can be sent from interrupt level. + */ + + flags = enter_critical_section(); + + /* Get the message from the message queue */ + + mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&msgq->msglist); + if (mqmsg == NULL) + { + if ((mq->f_oflags & O_NONBLOCK) != 0) + { + leave_critical_section(flags); + return -EAGAIN; + } + + /* Wait & get the message from the message queue */ + + ret = nxmq_wait_receive(msgq, &mqmsg, abstime, ticks); + if (ret < 0) + { + leave_critical_section(flags); + return ret; + } + } + + /* If we got message, then decrement the number of messages in + * the queue while we are still in the critical section + */ + + if (msgq->nmsgs-- == msgq->maxmsgs) + { + nxmq_pollnotify(msgq, POLLOUT); + } + + /* Notify all threads waiting for a message in the message queue */ + + nxmq_notify_receive(msgq); + + leave_critical_section(flags); + + /* Return the message to the caller */ + + if (prio) + { + *prio = mqmsg->priority; + } + + memcpy(msg, mqmsg->mail, mqmsg->msglen); + ret = mqmsg->msglen; + + /* Free the message structure */ + + nxmq_free_msg(mqmsg); + + return ret; +} + /**************************************************************************** * Public Functions ****************************************************************************/ +/**************************************************************************** + * Name: file_mq_timedreceive + * + * Description: + * This function receives the oldest of the highest priority messages from + * the message queue specified by "mq." If the message queue is empty + * and O_NONBLOCK was not set, file_mq_timedreceive() will block until a + * message is added to the message queue (or until a timeout occurs). + * + * file_mq_timedreceive() is an internal OS interface. It is functionally + * equivalent to mq_timedreceive() except that: + * + * - It is not a cancellation point, and + * - It does not modify the errno value. + * + * See comments with mq_timedreceive() for a more complete description of + * the behavior of this function + * + * Input Parameters: + * mq - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * prio - If not NULL, the location to store message priority. + * abstime - the absolute time to wait until a timeout is declared. + * + * Returned Value: + * On success, the length of the selected message in bytes is returned. + * On failure, -1 (ERROR) is returned and the errno is set appropriately: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set + * for the message queue description referred to by 'mqdes'. + * EPERM Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * EINVAL Invalid 'msg' or 'mqdes' or 'abstime' + * ETIMEDOUT The call timed out before a message could be transferred. + * + ****************************************************************************/ + +ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg, + size_t msglen, FAR unsigned int *prio, + FAR const struct timespec *abstime) +{ + return file_mq_timedreceive_internal(mq, msg, msglen, prio, abstime, -1); +} + +/**************************************************************************** + * Name: file_mq_tickreceive + * + * Description: + * This function receives the oldest of the highest priority messages from + * the message queue specified by "mq." If the message queue is empty + * and O_NONBLOCK was not set, file_mq_tickreceive() will block until a + * message is added to the message queue (or until a timeout occurs). + * + * file_mq_tickreceive() is an internal OS interface. It is functionally + * equivalent to mq_timedreceive() except that: + * + * - It is not a cancellation point, and + * - It does not modify the errno value. + * + * See comments with mq_timedreceive() for a more complete description of + * the behavior of this function + * + * Input Parameters: + * mq - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * prio - If not NULL, the location to store message priority. + * ticks - Ticks to wait from the start time until the semaphore is + * posted. + * + * Returned Value: + * This is an internal OS interface and should not be used by applications. + * It follows the NuttX internal error return policy: Zero (OK) is + * returned on success. A negated errno value is returned on failure. + * (see mq_timedreceive() for the list list valid return values). + * + ****************************************************************************/ + +ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg, + size_t msglen, FAR unsigned int *prio, + sclock_t ticks) +{ + return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, ticks); +} + +/**************************************************************************** + * Name: nxmq_timedreceive + * + * Description: + * This function receives the oldest of the highest priority messages from + * the message queue specified by "mqdes." If the message queue is empty + * and O_NONBLOCK was not set, nxmq_timedreceive() will block until a + * message is added to the message queue (or until a timeout occurs). + * + * nxmq_timedreceive() is an internal OS interface. It is functionally + * equivalent to mq_timedreceive() except that: + * + * - It is not a cancellation point, and + * - It does not modify the errno value. + * + * See comments with mq_timedreceive() for a more complete description of + * the behavior of this function + * + * Input Parameters: + * mqdes - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * prio - If not NULL, the location to store message priority. + * abstime - the absolute time to wait until a timeout is declared. + * + * Returned Value: + * On success, the length of the selected message in bytes is returned. + * On failure, -1 (ERROR) is returned and the errno is set appropriately: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set + * for the message queue description referred to by 'mqdes'. + * EPERM Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * EINVAL Invalid 'msg' or 'mqdes' or 'abstime' + * ETIMEDOUT The call timed out before a message could be transferred. + * + ****************************************************************************/ + +ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen, + FAR unsigned int *prio, + FAR const struct timespec *abstime) +{ + FAR struct file *filep; + ssize_t ret; + + ret = fs_getfilep(mqdes, &filep); + if (ret < 0) + { + return ret; + } + + ret = file_mq_timedreceive_internal(filep, msg, msglen, prio, abstime, -1); + fs_putfilep(filep); + return ret; +} + +/**************************************************************************** + * Name: mq_timedreceive + * + * Description: + * This function receives the oldest of the highest priority messages from + * the message queue specified by "mqdes." If the size of the buffer in + * bytes (msglen) is less than the "mq_msgsize" attribute of the message + * queue, mq_timedreceive will return an error. Otherwise, the selected + * message is removed from the queue and copied to "msg." + * + * If the message queue is empty and O_NONBLOCK was not set, + * mq_timedreceive() will block until a message is added to the message + * queue (or until a timeout occurs). If more than one task is waiting + * to receive a message, only the task with the highest priority that has + * waited the longest will be unblocked. + * + * mq_timedreceive() behaves just like mq_receive(), except that if the + * queue is empty and the O_NONBLOCK flag is not enabled for the message + * queue description, then abstime points to a structure which specifies a + * ceiling on the time for which the call will block. This ceiling is an + * absolute timeout in seconds and nanoseconds since the Epoch (midnight + * on the morning of 1 January 1970). + * + * If no message is available, and the timeout has already expired by the + * time of the call, mq_timedreceive() returns immediately. + * + * Input Parameters: + * mqdes - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * prio - If not NULL, the location to store message priority. + * abstime - the absolute time to wait until a timeout is declared. + * + * Returned Value: + * On success, the length of the selected message in bytes is returned. + * On failure, -1 (ERROR) is returned and the errno is set appropriately: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set + * for the message queue description referred to by 'mqdes'. + * EPERM Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * EINVAL Invalid 'msg' or 'mqdes' or 'abstime' + * ETIMEDOUT The call timed out before a message could be transferred. + * + ****************************************************************************/ + +ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen, + FAR unsigned int *prio, + FAR const struct timespec *abstime) +{ + int ret; + + /* mq_timedreceive() is a cancellation point */ + + enter_cancellation_point(); + + /* Let nxmq_timedreceive do all of the work */ + + ret = nxmq_timedreceive(mqdes, msg, msglen, prio, abstime); + if (ret < 0) + { + set_errno(-ret); + ret = ERROR; + } + + leave_cancellation_point(); + return ret; +} + /**************************************************************************** * Name: file_mq_receive * @@ -75,50 +484,7 @@ ssize_t file_mq_receive(FAR struct file *mq, FAR char *msg, size_t msglen, FAR unsigned int *prio) { - FAR struct mqueue_inode_s *msgq; - FAR struct mqueue_msg_s *mqmsg; - irqstate_t flags; - ssize_t ret; - - DEBUGASSERT(up_interrupt_context() == false); - - /* Verify the input parameters and, in case of an error, set - * errno appropriately. - */ - - ret = nxmq_verify_receive(mq, msg, msglen); - if (ret < 0) - { - return ret; - } - - msgq = mq->f_inode->i_private; - - /* Furthermore, nxmq_wait_receive() expects to have interrupts disabled - * because messages can be sent from interrupt level. - */ - - flags = enter_critical_section(); - - /* Get the message from the message queue */ - - ret = nxmq_wait_receive(msgq, mq->f_oflags, &mqmsg); - - /* Check if we got a message from the message queue. We might - * not have a message if: - * - * - The message queue is empty and O_NONBLOCK is set in the mq - * - The wait was interrupted by a signal - */ - - if (ret == OK) - { - ret = nxmq_do_receive(msgq, mqmsg, msg, prio); - } - - leave_critical_section(flags); - - return ret; + return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, -1); } /**************************************************************************** diff --git a/sched/mqueue/mq_send.c b/sched/mqueue/mq_send.c index fb20eab5ce..ba32f0fd46 100644 --- a/sched/mqueue/mq_send.c +++ b/sched/mqueue/mq_send.c @@ -31,17 +31,578 @@ #include #include #include +#include #include #include +#include +#include #include #include "mqueue/mqueue.h" +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Name: nxmq_verify_send + * + * Description: + * This is internal, common logic shared by both [nx]mq_send and + * [nx]mq_timesend. This function verifies the input parameters that are + * common to both functions. + * + * Input Parameters: + * msgq - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * + * Returned Value: + * On success, 0 (OK) is returned. On failure, a negated errno value is + * returned. + * + * EINVAL Either msg or msgq is NULL or the value of prio is invalid. + * EBADF Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * + ****************************************************************************/ + +#ifdef CONFIG_DEBUG_FEATURES +static int nxmq_verify_send(FAR FAR struct file *mq, FAR const char *msg, + size_t msglen, unsigned int prio) +{ + FAR struct inode *inode = mq->f_inode; + FAR struct mqueue_inode_s *msgq; + + if (inode == NULL) + { + return -EBADF; + } + + msgq = inode->i_private; + + /* Verify the input parameters */ + + if (msg == NULL || msgq == NULL || prio >= MQ_PRIO_MAX) + { + return -EINVAL; + } + + if ((mq->f_oflags & O_WROK) == 0) + { + return -EBADF; + } + + if (msglen > (size_t)msgq->maxmsgsize) + { + return -EMSGSIZE; + } + + return OK; +} +#endif + +/**************************************************************************** + * Name: nxmq_alloc_msg + * + * Description: + * The nxmq_alloc_msg function will get a free message for use by the + * operating system. The message will be allocated from the g_msgfree + * list. + * + * If the list is empty AND the message is NOT being allocated from the + * interrupt level, then the message will be allocated. If a message + * cannot be obtained, the operating system is dead and therefore cannot + * continue. + * + * If the list is empty AND the message IS being allocated from the + * interrupt level. This function will attempt to get a message from + * the g_msgfreeirq list. If this is unsuccessful, the calling interrupt + * handler will be notified. + * + * Input Parameters: + * None + * + * Returned Value: + * A reference to the allocated msg structure. On a failure to allocate, + * this function PANICs. + * + ****************************************************************************/ + +static FAR struct mqueue_msg_s *nxmq_alloc_msg(uint16_t maxmsgsize) +{ + FAR struct mqueue_msg_s *mqmsg; + irqstate_t flags; + + /* Try to get the message from the generally available free list. */ + + flags = spin_lock_irqsave(NULL); + mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&g_msgfree); + spin_unlock_irqrestore(NULL, flags); + if (mqmsg == NULL) + { + /* If we were called from an interrupt handler, then try to get the + * message from generally available list of messages. If this fails, + * then try the list of messages reserved for interrupt handlers + */ + + if (up_interrupt_context()) + { + /* Try the free list reserved for interrupt handlers */ + + flags = spin_lock_irqsave(NULL); + mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&g_msgfreeirq); + spin_unlock_irqrestore(NULL, flags); + } + + /* We were not called from an interrupt handler. */ + + else + { + /* If we cannot a message from the free list, then we will have to + * allocate one. + */ + + mqmsg = kmm_malloc((sizeof (struct mqueue_msg_s))); + + /* Check if we allocated the message */ + + if (mqmsg != NULL) + { + /* Yes... remember that this message was dynamically + * allocated. + */ + + mqmsg->type = MQ_ALLOC_DYN; + } + } + } + + return mqmsg; +} + +/**************************************************************************** + * Name: nxmq_add_queue + * + * Description: + * This is internal, common logic shared by both [nx]mq_send and + * [nx]mq_timesend. This function adds the specified message (msg) to the + * message queue (msgq). Then it notifies any tasks that were waiting + * for message queue notifications setup by mq_notify. And, finally, it + * awakens any tasks that were waiting for the message not empty event. + * + * Input Parameters: + * msgq - Message queue descriptor + * msg - Message to send + * + * Returned Value: + * This function always returns OK. + * + ****************************************************************************/ + +static void nxmq_add_queue(FAR struct mqueue_inode_s *msgq, + FAR struct mqueue_msg_s *mqmsg, + unsigned int prio) +{ + FAR struct mqueue_msg_s *prev = NULL; + FAR struct mqueue_msg_s *next; + + /* Insert the new message in the message queue + * Search the message list to find the location to insert the new + * message. Each is list is maintained in ascending priority order. + */ + + list_for_every_entry(&msgq->msglist, next, struct mqueue_msg_s, node) + { + if (prio > next->priority) + { + break; + } + else + { + prev = next; + } + } + + /* Add the message at the right place */ + + if (prev) + { + list_add_after(&prev->node, &mqmsg->node); + } + else + { + list_add_head(&msgq->msglist, &mqmsg->node); + } +} + +/**************************************************************************** + * Name: file_mq_timedsend_internal + * + * Description: + * This is an internal function of file_mq_timedsend()/file_mq_ticksend(), + * please refer to the detailed description for more information. + * + * Input Parameters: + * mq - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * abstime - the absolute time to wait until a timeout is decleared + * ticks - Ticks to wait from the start time until the semaphore is + * posted. + * + * Returned Value: + * This is an internal OS interface and should not be used by applications. + * It follows the NuttX internal error return policy: Zero (OK) is + * returned on success. A negated errno value is returned on failure. + * (see mq_timedsend() for the list list valid return values). + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mq. + * EINVAL Either msg or mq is NULL or the value of prio is invalid. + * EBADF Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * + ****************************************************************************/ + +static +int file_mq_timedsend_internal(FAR struct file *mq, FAR const char *msg, + size_t msglen, unsigned int prio, + FAR const struct timespec *abstime, + sclock_t ticks) +{ + FAR struct mqueue_inode_s *msgq; + FAR struct mqueue_msg_s *mqmsg; + irqstate_t flags; + int ret = 0; + + /* Verify the input parameters */ + + if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) + { + return -EINVAL; + } + + if (mq == NULL) + { + return -EINVAL; + } + +#ifdef CONFIG_DEBUG_FEATURES + /* Verify the input parameters on any failures to verify. */ + + ret = nxmq_verify_send(mq, msg, msglen, prio); + if (ret < 0) + { + return ret; + } +#endif + + msgq = mq->f_inode->i_private; + + /* Pre-allocate a message structure */ + + mqmsg = nxmq_alloc_msg(msgq->maxmsgsize); + if (!mqmsg) + { + return -ENOMEM; + } + + memcpy(mqmsg->mail, msg, msglen); + mqmsg->priority = prio; + mqmsg->msglen = msglen; + + /* Disable interruption */ + + flags = enter_critical_section(); + + if (msgq->nmsgs >= msgq->maxmsgs) + { + /* Verify that the message is full and we can't wait */ + + if ((up_interrupt_context() || (mq->f_oflags & O_NONBLOCK) != 0)) + { + ret = -EAGAIN; + goto out; + } + + /* The message queue is full. We will need to wait for the message + * queue to become non-full. + */ + + ret = nxmq_wait_send(msgq, abstime, ticks); + if (ret < 0) + { + goto out; + } + } + + /* Add the message to the message queue */ + + nxmq_add_queue(msgq, mqmsg, prio); + + /* Increment the count of messages in the queue */ + + if (msgq->nmsgs++ == 0) + { + nxmq_pollnotify(msgq, POLLIN); + } + + /* Notify any tasks that are waiting for a message to become available */ + + nxmq_notify_send(msgq); + +out: + leave_critical_section(flags); + + if (ret < 0) + { + nxmq_free_msg(mqmsg); + } + + return ret; +} + /**************************************************************************** * Public Functions ****************************************************************************/ +/**************************************************************************** + * Name: file_mq_timedsend + * + * Description: + * This function adds the specified message (msg) to the message queue + * (mq). file_mq_timedsend() behaves just like mq_send(), except that if + * the queue is full and the O_NONBLOCK flag is not enabled for the + * message queue description, then abstime points to a structure which + * specifies a ceiling on the time for which the call will block. + * + * file_mq_timedsend() is functionally equivalent to mq_timedsend() except + * that: + * + * - It is not a cancellation point, and + * - It does not modify the errno value. + * + * See comments with mq_timedsend() for a more complete description of the + * behavior of this function + * + * Input Parameters: + * mq - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * abstime - the absolute time to wait until a timeout is decleared + * + * Returned Value: + * This is an internal OS interface and should not be used by applications. + * It follows the NuttX internal error return policy: Zero (OK) is + * returned on success. A negated errno value is returned on failure. + * (see mq_timedsend() for the list list valid return values). + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mq. + * EINVAL Either msg or mq is NULL or the value of prio is invalid. + * EBADF Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * + ****************************************************************************/ + +int file_mq_timedsend(FAR struct file *mq, FAR const char *msg, + size_t msglen, unsigned int prio, + FAR const struct timespec *abstime) +{ + return file_mq_timedsend_internal(mq, msg, msglen, prio, abstime, -1); +} + +/**************************************************************************** + * Name: file_mq_ticksend + * + * Description: + * This function adds the specified message (msg) to the message queue + * (mq). file_mq_ticksend() behaves just like mq_send(), except that if + * the queue is full and the O_NONBLOCK flag is not enabled for the + * message queue description, then abstime points to a structure which + * specifies a ceiling on the time for which the call will block. + * + * file_mq_ticksend() is functionally equivalent to mq_timedsend() except + * that: + * + * - It is not a cancellation point, and + * - It does not modify the errno value. + * + * See comments with mq_timedsend() for a more complete description of the + * behavior of this function + * + * Input Parameters: + * mq - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * ticks - Ticks to wait from the start time until the semaphore is + * posted. + * + * Returned Value: + * This is an internal OS interface and should not be used by applications. + * It follows the NuttX internal error return policy: Zero (OK) is + * returned on success. A negated errno value is returned on failure. + * (see mq_timedsend() for the list list valid return values). + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mq. + * EINVAL Either msg or mq is NULL or the value of prio is invalid. + * EBADF Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * + ****************************************************************************/ + +int file_mq_ticksend(FAR struct file *mq, FAR const char *msg, + size_t msglen, unsigned int prio, sclock_t ticks) +{ + return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, ticks); +} + +/**************************************************************************** + * Name: nxmq_timedsend + * + * Description: + * This function adds the specified message (msg) to the message queue + * (mqdes). nxmq_timedsend() behaves just like mq_send(), except + * that if the queue is full and the O_NONBLOCK flag is not enabled for + * the message queue description, then abstime points to a structure which + * specifies a ceiling on the time for which the call will block. + * + * nxmq_timedsend() is functionally equivalent to mq_timedsend() except + * that: + * + * - It is not a cancellation point, and + * - It does not modify the errno value. + * + * See comments with mq_timedsend() for a more complete description of the + * behavior of this function + * + * Input Parameters: + * mqdes - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * abstime - the absolute time to wait until a timeout is decleared + * + * Returned Value: + * This is an internal OS interface and should not be used by applications. + * It follows the NuttX internal error return policy: Zero (OK) is + * returned on success. A negated errno value is returned on failure. + * (see mq_timedsend() for the list list valid return values). + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mqdes. + * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. + * EBADF Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * + ****************************************************************************/ + +int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen, + unsigned int prio, FAR const struct timespec *abstime) +{ + FAR struct file *filep; + int ret; + + ret = fs_getfilep(mqdes, &filep); + if (ret < 0) + { + return ret; + } + + ret = file_mq_timedsend_internal(filep, msg, msglen, prio, abstime, -1); + fs_putfilep(filep); + return ret; +} + +/**************************************************************************** + * Name: mq_timedsend + * + * Description: + * This function adds the specified message (msg) to the message queue + * (mqdes). The "msglen" parameter specifies the length of the message + * in bytes pointed to by "msg." This length must not exceed the maximum + * message length from the mq_getattr(). + * + * If the message queue is not full, mq_timedsend() place the message in + * the message queue at the position indicated by the "prio" argrument. + * Messages with higher priority will be inserted before lower priority + * messages. The value of "prio" must not exceed MQ_PRIO_MAX. + * + * If the specified message queue is full and O_NONBLOCK is not set in the + * message queue, then mq_timedsend() will block until space becomes + * available to the queue the message or a timeout occurs. + * + * mq_timedsend() behaves just like mq_send(), except that if the queue + * is full and the O_NONBLOCK flag is not enabled for the message queue + * description, then abstime points to a structure which specifies a + * ceiling on the time for which the call will block. This ceiling is an + * absolute timeout in seconds and nanoseconds since the Epoch (midnight + * on the morning of 1 January 1970). + * + * If the message queue is full, and the timeout has already expired by + * the time of the call, mq_timedsend() returns immediately. + * + * Input Parameters: + * mqdes - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * abstime - the absolute time to wait until a timeout is decleared + * + * Returned Value: + * On success, mq_send() returns 0 (OK); on error, -1 (ERROR) + * is returned, with errno set to indicate the error: + * + * EAGAIN The queue was full, and the O_NONBLOCK flag was set for the + * message queue description referred to by mqdes. + * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. + * EBADF Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * + * Assumptions/restrictions: + * + ****************************************************************************/ + +int mq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen, + unsigned int prio, FAR const struct timespec *abstime) +{ + int ret; + + /* mq_timedsend() is a cancellation point */ + + enter_cancellation_point(); + + /* Let nxmq_send() do all of the work */ + + ret = nxmq_timedsend(mqdes, msg, msglen, prio, abstime); + if (ret < 0) + { + set_errno(-ret); + ret = ERROR; + } + + leave_cancellation_point(); + return ret; +} + /**************************************************************************** * Name: file_mq_send * @@ -73,72 +634,7 @@ int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen, unsigned int prio) { - FAR struct mqueue_inode_s *msgq; - FAR struct mqueue_msg_s *mqmsg; - irqstate_t flags; - int ret; - - /* Verify the input parameters -- setting errno appropriately - * on any failures to verify. - */ - - ret = nxmq_verify_send(mq, msg, msglen, prio); - if (ret < 0) - { - return ret; - } - - msgq = mq->f_inode->i_private; - - /* Allocate a message structure: - * - Immediately if we are called from an interrupt handler. - * - Immediately if the message queue is not full, or - * - After successfully waiting for the message queue to become - * non-FULL. This would fail with EAGAIN, EINTR, or ETIMEDOUT. - */ - - flags = enter_critical_section(); - - if (!up_interrupt_context()) /* In an interrupt handler? */ - { - /* No.. Not in an interrupt handler. Is the message queue FULL? */ - - if (msgq->nmsgs >= msgq->maxmsgs) - { - /* Yes.. the message queue is full. Wait for space to become - * available in the message queue. - */ - - ret = nxmq_wait_send(msgq, mq->f_oflags); - } - } - - /* ret can only be negative if nxmq_wait_send failed */ - - if (ret == OK) - { - /* Now allocate the message. */ - - mqmsg = nxmq_alloc_msg(); - DEBUGASSERT(mqmsg != NULL); - - /* Check if the message was successfully allocated */ - - /* The allocation was successful (implying that we can also send the - * message). Perform the message send. - * - * NOTE: There is a race condition here: What if a message is added by - * interrupt related logic so that queue again becomes non-empty. - * That is handled because nxmq_do_send() will permit the maxmsgs limit - * to be exceeded in that case. - */ - - ret = nxmq_do_send(msgq, mqmsg, msg, msglen, prio); - } - - leave_critical_section(flags); - - return ret; + return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, -1); } /**************************************************************************** @@ -181,7 +677,7 @@ int nxmq_send(mqd_t mqdes, FAR const char *msg, size_t msglen, return ret; } - ret = file_mq_send(filep, msg, msglen, prio); + ret = file_mq_timedsend_internal(filep, msg, msglen, prio, NULL, -1); fs_putfilep(filep); return ret; } diff --git a/sched/mqueue/mq_sndinternal.c b/sched/mqueue/mq_sndinternal.c index 1e1214cedf..2dcb24beaf 100644 --- a/sched/mqueue/mq_sndinternal.c +++ b/sched/mqueue/mq_sndinternal.c @@ -37,7 +37,6 @@ #include #include -#include #include #include #include @@ -46,144 +45,57 @@ #include "mqueue/mqueue.h" /**************************************************************************** - * Public Functions + * Private Functions ****************************************************************************/ /**************************************************************************** - * Name: nxmq_verify_send + * Name: nxmq_sndtimeout * * Description: - * This is internal, common logic shared by both [nx]mq_send and - * [nx]mq_timesend. This function verifies the input parameters that are - * common to both functions. + * This function is called if the timeout elapses before the message queue + * becomes non-full. * * Input Parameters: - * msgq - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message + * arg - The argument that was provided when the timeout was configured. * * Returned Value: - * On success, 0 (OK) is returned. On failure, a negated errno value is - * returned. - * - * EINVAL Either msg or msgq is NULL or the value of prio is invalid. - * EBADF Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the - * message queue. - * - ****************************************************************************/ - -#ifdef CONFIG_DEBUG_FEATURES -int nxmq_verify_send(FAR struct file *mq, FAR const char *msg, - size_t msglen, unsigned int prio) -{ - FAR struct inode *inode = mq->f_inode; - FAR struct mqueue_inode_s *msgq; - - if (inode == NULL) - { - return -EBADF; - } - - msgq = inode->i_private; - - /* Verify the input parameters */ - - if (msg == NULL || msgq == NULL || prio >= MQ_PRIO_MAX) - { - return -EINVAL; - } - - if ((mq->f_oflags & O_WROK) == 0) - { - return -EBADF; - } - - if (msglen > (size_t)msgq->maxmsgsize) - { - return -EMSGSIZE; - } - - return OK; -} -#endif - -/**************************************************************************** - * Name: nxmq_alloc_msg - * - * Description: - * The nxmq_alloc_msg function will get a free message for use by the - * operating system. The message will be allocated from the g_msgfree - * list. - * - * If the list is empty AND the message is NOT being allocated from the - * interrupt level, then the message will be allocated. If a message - * cannot be obtained, the operating system is dead and therefore cannot - * continue. - * - * If the list is empty AND the message IS being allocated from the - * interrupt level. This function will attempt to get a message from - * the g_msgfreeirq list. If this is unsuccessful, the calling interrupt - * handler will be notified. - * - * Input Parameters: * None * - * Returned Value: - * A reference to the allocated msg structure. On a failure to allocate, - * this function PANICs. + * Assumptions: * ****************************************************************************/ -FAR struct mqueue_msg_s *nxmq_alloc_msg(void) +static void nxmq_sndtimeout(wdparm_t arg) { - FAR struct list_node *mqmsg; + FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg; + irqstate_t flags; - /* Try to get the message from the generally available free list. */ + /* Disable interrupts. This is necessary because an interrupt handler may + * attempt to send a message while we are doing this. + */ - mqmsg = list_remove_head(&g_msgfree); - if (mqmsg == NULL) + flags = enter_critical_section(); + + /* It is also possible that an interrupt/context switch beat us to the + * punch and already changed the task's state. + */ + + if (wtcb->task_state == TSTATE_WAIT_MQNOTFULL) { - /* If we were called from an interrupt handler, then try to get the - * message from generally available list of messages. If this fails, - * then try the list of messages reserved for interrupt handlers - */ + /* Restart with task with a timeout error */ - if (up_interrupt_context()) - { - /* Try the free list reserved for interrupt handlers */ - - mqmsg = list_remove_head(&g_msgfreeirq); - } - - /* We were not called from an interrupt handler. */ - - else - { - /* If we cannot a message from the free list, then we will have to - * allocate one. - */ - - mqmsg = (FAR struct list_node *) - kmm_malloc((sizeof (struct mqueue_msg_s))); - - /* Check if we allocated the message */ - - if (mqmsg != NULL) - { - /* Yes... remember that this message was dynamically - * allocated. - */ - - ((FAR struct mqueue_msg_s *)mqmsg)->type = MQ_ALLOC_DYN; - } - } + nxmq_wait_irq(wtcb, ETIMEDOUT); } - return (FAR struct mqueue_msg_s *)mqmsg; + /* Interrupts may now be re-enabled. */ + + leave_critical_section(flags); } +/**************************************************************************** + * Public Functions + ****************************************************************************/ + /**************************************************************************** * Name: nxmq_wait_send * @@ -194,7 +106,7 @@ FAR struct mqueue_msg_s *nxmq_alloc_msg(void) * * Input Parameters: * msgq - Message queue descriptor - * oflags - flags from user set + * abstime - The absolute time to wait until * * Returned Value: * On success, nxmq_wait_send() returns 0 (OK); a negated errno value is @@ -212,9 +124,11 @@ FAR struct mqueue_msg_s *nxmq_alloc_msg(void) * ****************************************************************************/ -int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags) +int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, + FAR const struct timespec *abstime, + sclock_t ticks) { - FAR struct tcb_s *rtcb; + FAR struct tcb_s *rtcb = this_task(); #ifdef CONFIG_CANCELLATION_POINTS /* nxmq_wait_send() is not a cancellation point, but may be called via @@ -231,6 +145,17 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags) } #endif + if (abstime) + { + wd_start_realtime(&rtcb->waitdog, abstime, + nxmq_sndtimeout, (wdparm_t)rtcb); + } + else if (ticks >= 0) + { + wd_start(&rtcb->waitdog, ticks, + nxmq_sndtimeout, (wdparm_t)rtcb); + } + /* Verify that the queue is indeed full as the caller thinks */ /* Loop until there are fewer than max allowable messages in the @@ -239,17 +164,6 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags) while (msgq->nmsgs >= msgq->maxmsgs) { - /* Should we block until there is sufficient space in the - * message queue? - */ - - if ((oflags & O_NONBLOCK) != 0) - { - /* No... We will return an error to the caller. */ - - return -EAGAIN; - } - /* Block until the message queue is no longer full. * When we are unblocked, we will try again */ @@ -291,86 +205,41 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags) if (rtcb->errcode != OK) { - return -rtcb->errcode; + break; } } - return OK; + if (abstime || ticks >= 0) + { + wd_cancel(&rtcb->waitdog); + } + + return -rtcb->errcode; } /**************************************************************************** - * Name: nxmq_do_send + * Name: nxmq_notify_send * * Description: - * This is internal, common logic shared by both [nx]mq_send and - * [nx]mq_timesend. This function adds the specified message (msg) to the - * message queue (msgq). Then it notifies any tasks that were waiting - * for message queue notifications setup by mq_notify. And, finally, it - * awakens any tasks that were waiting for the message not empty event. + * This function is called when a message is sent to a message queue. + * It will notify any tasks that are waiting for the message queue to be + * non-full. * * Input Parameters: * msgq - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message * * Returned Value: - * This function always returns OK. + * None + * + * Assumptions/restrictions: + * - Executes within a critical section established by the caller. * ****************************************************************************/ -int nxmq_do_send(FAR struct mqueue_inode_s *msgq, - FAR struct mqueue_msg_s *mqmsg, - FAR const char *msg, size_t msglen, unsigned int prio) +void nxmq_notify_send(FAR struct mqueue_inode_s *msgq) { - FAR struct mqueue_msg_s *prev = NULL; - FAR struct mqueue_msg_s *next; FAR struct tcb_s *btcb; - /* Construct the message header info */ - - mqmsg->priority = prio; - mqmsg->msglen = msglen; - - /* Copy the message data into the message */ - - memcpy((FAR void *)mqmsg->mail, (FAR const void *)msg, msglen); - - /* Insert the new message in the message queue - * Search the message list to find the location to insert the new - * message. Each is list is maintained in ascending priority order. - */ - - list_for_every_entry(&msgq->msglist, next, struct mqueue_msg_s, node) - { - if (prio > next->priority) - { - break; - } - else - { - prev = next; - } - } - - /* Add the message at the right place */ - - if (prev) - { - list_add_after(&prev->node, &mqmsg->node); - } - else - { - list_add_head(&msgq->msglist, &mqmsg->node); - } - - /* Increment the count of messages in the queue */ - - if (msgq->nmsgs++ == 0) - { - nxmq_pollnotify(msgq, POLLIN); - } - /* Check if we need to notify any tasks that are attached to the * message queue */ @@ -437,6 +306,4 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq, up_switch_context(btcb, rtcb); } } - - return OK; } diff --git a/sched/mqueue/mq_timedreceive.c b/sched/mqueue/mq_timedreceive.c deleted file mode 100644 index c1bbcd2f73..0000000000 --- a/sched/mqueue/mq_timedreceive.c +++ /dev/null @@ -1,435 +0,0 @@ -/**************************************************************************** - * sched/mqueue/mq_timedreceive.c - * - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The - * ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - * - ****************************************************************************/ - -/**************************************************************************** - * Included Files - ****************************************************************************/ - -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "sched/sched.h" -#include "clock/clock.h" -#include "mqueue/mqueue.h" - -/**************************************************************************** - * Private Functions - ****************************************************************************/ - -/**************************************************************************** - * Name: nxmq_rcvtimeout - * - * Description: - * This function is called if the timeout elapses before the message queue - * becomes non-empty. - * - * Input Parameters: - * arg - the argument provided when the timeout was configured. - * - * Returned Value: - * None - * - * Assumptions: - * - ****************************************************************************/ - -static void nxmq_rcvtimeout(wdparm_t arg) -{ - FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg; - irqstate_t flags; - - /* Disable interrupts. This is necessary because an interrupt handler may - * attempt to send a message while we are doing this. - */ - - flags = enter_critical_section(); - - /* It is also possible that an interrupt/context switch beat us to the - * punch and already changed the task's state. - */ - - if (wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY) - { - /* Restart with task with a timeout error */ - - nxmq_wait_irq(wtcb, ETIMEDOUT); - } - - /* Interrupts may now be re-enabled. */ - - leave_critical_section(flags); -} - -/**************************************************************************** - * Public Functions - ****************************************************************************/ - -/**************************************************************************** - * Name: file_mq_timedreceive_internal - * - * Description: - * This is an internal function of file_mq_timedreceive()/ - * file_mq_tickreceive(), please refer to the detailed description for - * more information. - * - * Input Parameters: - * mq - Message Queue Descriptor - * msg - Buffer to receive the message - * msglen - Size of the buffer in bytes - * prio - If not NULL, the location to store message priority. - * abstime - the absolute time to wait until a timeout is declared. - * ticks - Ticks to wait from the start time until the semaphore is - * posted. - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedreceive() for the list list valid return values). - * - ****************************************************************************/ - -static ssize_t -file_mq_timedreceive_internal(FAR struct file *mq, FAR char *msg, - size_t msglen, FAR unsigned int *prio, - FAR const struct timespec *abstime, - sclock_t ticks) -{ - FAR struct tcb_s *rtcb = this_task(); - FAR struct mqueue_inode_s *msgq; - FAR struct mqueue_msg_s *mqmsg; - irqstate_t flags; - int ret; - - DEBUGASSERT(up_interrupt_context() == false); - - /* Verify the input parameters and, in case of an error, set - * errno appropriately. - */ - - ret = nxmq_verify_receive(mq, msg, msglen); - if (ret < 0) - { - return ret; - } - - msgq = mq->f_inode->i_private; - - /* Furthermore, nxmq_wait_receive() expects to have interrupts disabled - * because messages can be sent from interrupt level. - */ - - flags = enter_critical_section(); - - /* Check if the message queue is empty. If it is NOT empty, then we - * will not need to start timer. - */ - - if (list_is_empty(&msgq->msglist)) - { - if (abstime != NULL) - { - if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) - { - ret = -EINVAL; - } - else - { - /* Convert the timespec to clock ticks. - * We must have interrupts disabled here so that - * this time stays valid until the wait begins. - */ - - clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks); - } - - /* Handle any time-related errors */ - - if (ret != OK) - { - goto errout_in_critical_section; - } - } - - /* If the time has already expired and the message queue is empty, - * return immediately. - */ - - if (ticks <= 0) - { - ret = -ETIMEDOUT; - goto errout_in_critical_section; - } - - /* Start the watchdog */ - - wd_start(&rtcb->waitdog, ticks, nxmq_rcvtimeout, (wdparm_t)rtcb); - } - - /* Get the message from the message queue */ - - ret = nxmq_wait_receive(msgq, mq->f_oflags, &mqmsg); - - /* Stop the watchdog timer (this is not harmful in the case where - * it was never started) - */ - - wd_cancel(&rtcb->waitdog); - - /* Check if we got a message from the message queue. We might - * not have a message if: - * - * - The message queue is empty and O_NONBLOCK is set in the mqdes - * - The wait was interrupted by a signal - * - The watchdog timeout expired - */ - - if (ret == OK) - { - DEBUGASSERT(mqmsg != NULL); - ret = nxmq_do_receive(msgq, mqmsg, msg, prio); - } - - /* We can now restore interrupts */ - -errout_in_critical_section: - leave_critical_section(flags); - - return ret; -} - -/**************************************************************************** - * Name: file_mq_timedreceive - * - * Description: - * This function receives the oldest of the highest priority messages from - * the message queue specified by "mq." If the message queue is empty - * and O_NONBLOCK was not set, file_mq_timedreceive() will block until a - * message is added to the message queue (or until a timeout occurs). - * - * file_mq_timedreceive() is an internal OS interface. It is functionally - * equivalent to mq_timedreceive() except that: - * - * - It is not a cancellation point, and - * - It does not modify the errno value. - * - * See comments with mq_timedreceive() for a more complete description of - * the behavior of this function - * - * Input Parameters: - * mq - Message Queue Descriptor - * msg - Buffer to receive the message - * msglen - Size of the buffer in bytes - * prio - If not NULL, the location to store message priority. - * abstime - the absolute time to wait until a timeout is declared. - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedreceive() for the list list valid return values). - * - ****************************************************************************/ - -ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg, - size_t msglen, FAR unsigned int *prio, - FAR const struct timespec *abstime) -{ - return file_mq_timedreceive_internal(mq, msg, msglen, prio, abstime, 0); -} - -/**************************************************************************** - * Name: file_mq_tickreceive - * - * Description: - * This function receives the oldest of the highest priority messages from - * the message queue specified by "mq." If the message queue is empty - * and O_NONBLOCK was not set, file_mq_tickreceive() will block until a - * message is added to the message queue (or until a timeout occurs). - * - * file_mq_tickreceive() is an internal OS interface. It is functionally - * equivalent to mq_timedreceive() except that: - * - * - It is not a cancellation point, and - * - It does not modify the errno value. - * - * See comments with mq_timedreceive() for a more complete description of - * the behavior of this function - * - * Input Parameters: - * mq - Message Queue Descriptor - * msg - Buffer to receive the message - * msglen - Size of the buffer in bytes - * prio - If not NULL, the location to store message priority. - * ticks - Ticks to wait from the start time until the semaphore is - * posted. - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedreceive() for the list list valid return values). - * - ****************************************************************************/ - -ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg, - size_t msglen, FAR unsigned int *prio, - sclock_t ticks) -{ - return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, ticks); -} - -/**************************************************************************** - * Name: nxmq_timedreceive - * - * Description: - * This function receives the oldest of the highest priority messages from - * the message queue specified by "mqdes." If the message queue is empty - * and O_NONBLOCK was not set, nxmq_timedreceive() will block until a - * message is added to the message queue (or until a timeout occurs). - * - * nxmq_timedreceive() is an internal OS interface. It is functionally - * equivalent to mq_timedreceive() except that: - * - * - It is not a cancellation point, and - * - It does not modify the errno value. - * - * See comments with mq_timedreceive() for a more complete description of - * the behavior of this function - * - * Input Parameters: - * mqdes - Message Queue Descriptor - * msg - Buffer to receive the message - * msglen - Size of the buffer in bytes - * prio - If not NULL, the location to store message priority. - * abstime - the absolute time to wait until a timeout is declared. - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedreceive() for the list list valid return values). - * - ****************************************************************************/ - -ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen, - FAR unsigned int *prio, - FAR const struct timespec *abstime) -{ - FAR struct file *filep; - ssize_t ret; - - ret = fs_getfilep(mqdes, &filep); - if (ret < 0) - { - return ret; - } - - ret = file_mq_timedreceive_internal(filep, msg, msglen, prio, abstime, 0); - fs_putfilep(filep); - return ret; -} - -/**************************************************************************** - * Name: mq_timedreceive - * - * Description: - * This function receives the oldest of the highest priority messages from - * the message queue specified by "mqdes." If the size of the buffer in - * bytes (msglen) is less than the "mq_msgsize" attribute of the message - * queue, mq_timedreceive will return an error. Otherwise, the selected - * message is removed from the queue and copied to "msg." - * - * If the message queue is empty and O_NONBLOCK was not set, - * mq_timedreceive() will block until a message is added to the message - * queue (or until a timeout occurs). If more than one task is waiting - * to receive a message, only the task with the highest priority that has - * waited the longest will be unblocked. - * - * mq_timedreceive() behaves just like mq_receive(), except that if the - * queue is empty and the O_NONBLOCK flag is not enabled for the message - * queue description, then abstime points to a structure which specifies a - * ceiling on the time for which the call will block. This ceiling is an - * absolute timeout in seconds and nanoseconds since the Epoch (midnight - * on the morning of 1 January 1970). - * - * If no message is available, and the timeout has already expired by the - * time of the call, mq_timedreceive() returns immediately. - * - * Input Parameters: - * mqdes - Message Queue Descriptor - * msg - Buffer to receive the message - * msglen - Size of the buffer in bytes - * prio - If not NULL, the location to store message priority. - * abstime - the absolute time to wait until a timeout is declared. - * - * Returned Value: - * On success, the length of the selected message in bytes is returned. - * On failure, -1 (ERROR) is returned and the errno is set appropriately: - * - * EAGAIN The queue was empty, and the O_NONBLOCK flag was set - * for the message queue description referred to by 'mqdes'. - * EPERM Message queue opened not opened for reading. - * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the - * message queue. - * EINTR The call was interrupted by a signal handler. - * EINVAL Invalid 'msg' or 'mqdes' or 'abstime' - * ETIMEDOUT The call timed out before a message could be transferred. - * - ****************************************************************************/ - -ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen, - FAR unsigned int *prio, - FAR const struct timespec *abstime) -{ - int ret; - - /* mq_timedreceive() is a cancellation point */ - - enter_cancellation_point(); - - /* Let nxmq_timedreceive do all of the work */ - - ret = nxmq_timedreceive(mqdes, msg, msglen, prio, abstime); - if (ret < 0) - { - set_errno(-ret); - ret = ERROR; - } - - leave_cancellation_point(); - return ret; -} diff --git a/sched/mqueue/mq_timedsend.c b/sched/mqueue/mq_timedsend.c deleted file mode 100644 index 84361c6403..0000000000 --- a/sched/mqueue/mq_timedsend.c +++ /dev/null @@ -1,510 +0,0 @@ -/**************************************************************************** - * sched/mqueue/mq_timedsend.c - * - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The - * ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - * - ****************************************************************************/ - -/**************************************************************************** - * Included Files - ****************************************************************************/ - -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "clock/clock.h" -#include "sched/sched.h" -#include "mqueue/mqueue.h" - -/**************************************************************************** - * Private Functions - ****************************************************************************/ - -/**************************************************************************** - * Name: nxmq_sndtimeout - * - * Description: - * This function is called if the timeout elapses before the message queue - * becomes non-full. - * - * Input Parameters: - * arg - The argument that was provided when the timeout was configured. - * - * Returned Value: - * None - * - * Assumptions: - * - ****************************************************************************/ - -static void nxmq_sndtimeout(wdparm_t arg) -{ - FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg; - irqstate_t flags; - - /* Disable interrupts. This is necessary because an interrupt handler may - * attempt to send a message while we are doing this. - */ - - flags = enter_critical_section(); - - /* It is also possible that an interrupt/context switch beat us to the - * punch and already changed the task's state. - */ - - if (wtcb->task_state == TSTATE_WAIT_MQNOTFULL) - { - /* Restart with task with a timeout error */ - - nxmq_wait_irq(wtcb, ETIMEDOUT); - } - - /* Interrupts may now be re-enabled. */ - - leave_critical_section(flags); -} - -/**************************************************************************** - * Public Functions - ****************************************************************************/ - -/**************************************************************************** - * Name: file_mq_timedsend_internal - * - * Description: - * This is an internal function of file_mq_timedsend()/file_mq_ticksend(), - * please refer to the detailed description for more information. - * - * Input Parameters: - * mq - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message - * abstime - the absolute time to wait until a timeout is decleared - * ticks - Ticks to wait from the start time until the semaphore is - * posted. - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedsend() for the list list valid return values). - * - * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the - * message queue description referred to by mq. - * EINVAL Either msg or mq is NULL or the value of prio is invalid. - * EBADF Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the - * message queue. - * EINTR The call was interrupted by a signal handler. - * - ****************************************************************************/ - -static int -file_mq_timedsend_internal(FAR struct file *mq, FAR const char *msg, - size_t msglen, unsigned int prio, - FAR const struct timespec *abstime, - sclock_t ticks) -{ - FAR struct tcb_s *rtcb = this_task(); - FAR struct mqueue_inode_s *msgq; - FAR struct mqueue_msg_s *mqmsg; - irqstate_t flags; - int ret; - - /* Verify the input parameters on any failures to verify. */ - - ret = nxmq_verify_send(mq, msg, msglen, prio); - if (ret < 0) - { - return ret; - } - - msgq = mq->f_inode->i_private; - - /* Disable interruption */ - - flags = enter_critical_section(); - - /* Pre-allocate a message structure */ - - mqmsg = nxmq_alloc_msg(); - if (mqmsg == NULL) - { - /* Failed to allocate the message. nxmq_alloc_msg() does not set the - * errno value. - */ - - ret = -ENOMEM; - goto errout_in_critical_section; - } - - /* OpenGroup.org: "Under no circumstance shall the operation fail with a - * timeout if there is sufficient room in the queue to add the message - * immediately. The validity of the abstime parameter need not be checked - * when there is sufficient room in the queue." - * - * Also ignore the time value if for some crazy reason we were called from - * an interrupt handler. This probably really should be an assertion. - * - * NOTE: There is a race condition here: What if a message is added by - * interrupt related logic so that queue again becomes non-empty. That - * is handled because nxmq_do_send() will permit the maxmsgs limit to be - * exceeded in that case. - */ - - if (msgq->nmsgs < msgq->maxmsgs || up_interrupt_context()) - { - /* Do the send with no further checks (possibly exceeding maxmsgs) - * Currently nxmq_do_send() always returns OK. - */ - - goto out_send_message; - } - - /* The message queue is full... We are going to wait. - * Now we must have a valid time value. - */ - - if (abstime != NULL) - { - if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) - { - ret = -EINVAL; - } - else - { - /* We are not in an interrupt handler and the message queue - * is full. Set up a timed wait for the message queue to - * become non-full. - * - * Convert the timespec to clock ticks. We must have interrupts - * disabled here so that this time stays valid until the wait - * begins. - */ - - clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks); - } - - /* Handle any time-related errors */ - - if (ret != OK) - { - nxmq_free_msg(mqmsg); - goto errout_in_critical_section; - } - } - - /* If the time has already expired and the message queue is empty, - * return immediately. - */ - - if (ticks <= 0) - { - ret = -ETIMEDOUT; - nxmq_free_msg(mqmsg); - goto errout_in_critical_section; - } - - /* Start the watchdog and begin the wait for MQ not full */ - - wd_start(&rtcb->waitdog, ticks, nxmq_sndtimeout, (wdparm_t)rtcb); - - /* And wait for the message queue to be non-empty */ - - ret = nxmq_wait_send(msgq, mq->f_oflags); - - /* This may return with an error and errno set to either EINTR - * or ETIMEDOUT. Cancel the watchdog timer in any event. - */ - - wd_cancel(&rtcb->waitdog); - - /* Check if nxmq_wait_send() failed */ - - if (ret == OK) - { - /* If any of the above failed, set the errno. Otherwise, there should - * be space for another message in the message queue. NOW we can - * allocate the message structure. - * - * Currently nxmq_do_send() always returns OK. - */ - -out_send_message: - ret = nxmq_do_send(msgq, mqmsg, msg, msglen, prio); - } - else - { - /* free the message as it can't be sent */ - - nxmq_free_msg(mqmsg); - } - - /* Exit here with (1) the scheduler locked, (2) a message allocated, (3) a - * wdog allocated, and (4) interrupts disabled. - */ - -errout_in_critical_section: - leave_critical_section(flags); - - return ret; -} - -/**************************************************************************** - * Name: file_mq_timedsend - * - * Description: - * This function adds the specified message (msg) to the message queue - * (mq). file_mq_timedsend() behaves just like mq_send(), except that if - * the queue is full and the O_NONBLOCK flag is not enabled for the - * message queue description, then abstime points to a structure which - * specifies a ceiling on the time for which the call will block. - * - * file_mq_timedsend() is functionally equivalent to mq_timedsend() except - * that: - * - * - It is not a cancellation point, and - * - It does not modify the errno value. - * - * See comments with mq_timedsend() for a more complete description of the - * behavior of this function - * - * Input Parameters: - * mq - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message - * abstime - the absolute time to wait until a timeout is decleared - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedsend() for the list list valid return values). - * - * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the - * message queue description referred to by mq. - * EINVAL Either msg or mq is NULL or the value of prio is invalid. - * EBADF Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the - * message queue. - * EINTR The call was interrupted by a signal handler. - * - ****************************************************************************/ - -int file_mq_timedsend(FAR struct file *mq, FAR const char *msg, - size_t msglen, unsigned int prio, - FAR const struct timespec *abstime) -{ - return file_mq_timedsend_internal(mq, msg, msglen, prio, abstime, 0); -} - -/**************************************************************************** - * Name: file_mq_ticksend - * - * Description: - * This function adds the specified message (msg) to the message queue - * (mq). file_mq_ticksend() behaves just like mq_send(), except that if - * the queue is full and the O_NONBLOCK flag is not enabled for the - * message queue description, then abstime points to a structure which - * specifies a ceiling on the time for which the call will block. - * - * file_mq_ticksend() is functionally equivalent to mq_timedsend() except - * that: - * - * - It is not a cancellation point, and - * - It does not modify the errno value. - * - * See comments with mq_timedsend() for a more complete description of the - * behavior of this function - * - * Input Parameters: - * mq - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message - * ticks - Ticks to wait from the start time until the semaphore is - * posted. - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedsend() for the list list valid return values). - * - * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the - * message queue description referred to by mq. - * EINVAL Either msg or mq is NULL or the value of prio is invalid. - * EBADF Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the - * message queue. - * EINTR The call was interrupted by a signal handler. - * - ****************************************************************************/ - -int file_mq_ticksend(FAR struct file *mq, FAR const char *msg, - size_t msglen, unsigned int prio, sclock_t ticks) -{ - return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, ticks); -} - -/**************************************************************************** - * Name: nxmq_timedsend - * - * Description: - * This function adds the specified message (msg) to the message queue - * (mqdes). nxmq_timedsend() behaves just like mq_send(), except - * that if the queue is full and the O_NONBLOCK flag is not enabled for - * the message queue description, then abstime points to a structure which - * specifies a ceiling on the time for which the call will block. - * - * nxmq_timedsend() is functionally equivalent to mq_timedsend() except - * that: - * - * - It is not a cancellation point, and - * - It does not modify the errno value. - * - * See comments with mq_timedsend() for a more complete description of the - * behavior of this function - * - * Input Parameters: - * mqdes - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message - * abstime - the absolute time to wait until a timeout is decleared - * - * Returned Value: - * This is an internal OS interface and should not be used by applications. - * It follows the NuttX internal error return policy: Zero (OK) is - * returned on success. A negated errno value is returned on failure. - * (see mq_timedsend() for the list list valid return values). - * - * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the - * message queue description referred to by mqdes. - * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. - * EBADF Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the - * message queue. - * EINTR The call was interrupted by a signal handler. - * - ****************************************************************************/ - -int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen, - unsigned int prio, FAR const struct timespec *abstime) -{ - FAR struct file *filep; - int ret; - - ret = fs_getfilep(mqdes, &filep); - if (ret < 0) - { - return ret; - } - - ret = file_mq_timedsend_internal(filep, msg, msglen, prio, abstime, 0); - fs_putfilep(filep); - return ret; -} - -/**************************************************************************** - * Name: mq_timedsend - * - * Description: - * This function adds the specified message (msg) to the message queue - * (mqdes). The "msglen" parameter specifies the length of the message - * in bytes pointed to by "msg." This length must not exceed the maximum - * message length from the mq_getattr(). - * - * If the message queue is not full, mq_timedsend() place the message in - * the message queue at the position indicated by the "prio" argrument. - * Messages with higher priority will be inserted before lower priority - * messages. The value of "prio" must not exceed MQ_PRIO_MAX. - * - * If the specified message queue is full and O_NONBLOCK is not set in the - * message queue, then mq_timedsend() will block until space becomes - * available to the queue the message or a timeout occurs. - * - * mq_timedsend() behaves just like mq_send(), except that if the queue - * is full and the O_NONBLOCK flag is not enabled for the message queue - * description, then abstime points to a structure which specifies a - * ceiling on the time for which the call will block. This ceiling is an - * absolute timeout in seconds and nanoseconds since the Epoch (midnight - * on the morning of 1 January 1970). - * - * If the message queue is full, and the timeout has already expired by - * the time of the call, mq_timedsend() returns immediately. - * - * Input Parameters: - * mqdes - Message queue descriptor - * msg - Message to send - * msglen - The length of the message in bytes - * prio - The priority of the message - * abstime - the absolute time to wait until a timeout is decleared - * - * Returned Value: - * On success, mq_send() returns 0 (OK); on error, -1 (ERROR) - * is returned, with errno set to indicate the error: - * - * EAGAIN The queue was full, and the O_NONBLOCK flag was set for the - * message queue description referred to by mqdes. - * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. - * EBADF Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the - * message queue. - * EINTR The call was interrupted by a signal handler. - * - * Assumptions/restrictions: - * - ****************************************************************************/ - -int mq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen, - unsigned int prio, FAR const struct timespec *abstime) -{ - int ret; - - /* mq_timedsend() is a cancellation point */ - - enter_cancellation_point(); - - /* Let nxmq_send() do all of the work */ - - ret = nxmq_timedsend(mqdes, msg, msglen, prio, abstime); - if (ret < 0) - { - set_errno(-ret); - ret = ERROR; - } - - leave_cancellation_point(); - return ret; -} diff --git a/sched/mqueue/mqueue.h b/sched/mqueue/mqueue.h index 13bd2eb79e..80b500db2c 100644 --- a/sched/mqueue/mqueue.h +++ b/sched/mqueue/mqueue.h @@ -109,6 +109,9 @@ struct task_group_s; /* Forward reference */ /* Functions defined in mq_initialize.c *************************************/ void nxmq_initialize(void); + +/* mq_msgfree.c *************************************************************/ + void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg); /* mq_waitirq.c *************************************************************/ @@ -117,30 +120,18 @@ void nxmq_wait_irq(FAR struct tcb_s *wtcb, int errcode); /* mq_rcvinternal.c *********************************************************/ -#ifdef CONFIG_DEBUG_FEATURES -int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen); -#else -# define nxmq_verify_receive(msgq, msg, msglen) OK -#endif int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq, - int oflags, FAR struct mqueue_msg_s **rcvmsg); -ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq, - FAR struct mqueue_msg_s *mqmsg, - FAR char *ubuffer, FAR unsigned int *prio); + FAR struct mqueue_msg_s **rcvmsg, + FAR const struct timespec *abstime, + sclock_t ticks); +void nxmq_notify_receive(FAR struct mqueue_inode_s *msgq); /* mq_sndinternal.c *********************************************************/ -#ifdef CONFIG_DEBUG_FEATURES -int nxmq_verify_send(FAR struct file *mq, FAR const char *msg, - size_t msglen, unsigned int prio); -#else -# define nxmq_verify_send(mq, msg, msglen, prio) OK -#endif -FAR struct mqueue_msg_s *nxmq_alloc_msg(void); -int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags); -int nxmq_do_send(FAR struct mqueue_inode_s *msgq, - FAR struct mqueue_msg_s *mqmsg, - FAR const char *msg, size_t msglen, unsigned int prio); +int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, + FAR const struct timespec *abstime, + sclock_t ticks); +void nxmq_notify_send(FAR struct mqueue_inode_s *msgq); /* mq_recover.c *************************************************************/