mqueue: refactor of mqueue

1. mainly for long time within critical_section
2. move out memcpy buffer from enter_critical_section()
3. let mq_send() return fail when mq buffer full in ISR

Signed-off-by: ligd <liguiding1@xiaomi.com>
This commit is contained in:
ligd 2024-09-12 20:41:06 +08:00 committed by Xiang Xiao
parent 083dd03018
commit 1e7631e695
10 changed files with 1137 additions and 1419 deletions

View file

@ -32,10 +32,8 @@ if(NOT CONFIG_DISABLE_MQUEUE)
APPEND
SRCS
mq_send.c
mq_timedsend.c
mq_sndinternal.c
mq_receive.c
mq_timedreceive.c
mq_rcvinternal.c
mq_msgfree.c
mq_msgqalloc.c

View file

@ -26,8 +26,8 @@ endif
ifneq ($(CONFIG_DISABLE_MQUEUE),y)
CSRCS += mq_send.c mq_timedsend.c mq_sndinternal.c mq_receive.c
CSRCS += mq_timedreceive.c mq_rcvinternal.c mq_getattr.c
CSRCS += mq_send.c mq_sndinternal.c mq_receive.c
CSRCS += mq_rcvinternal.c mq_getattr.c
CSRCS += mq_msgfree.c mq_msgqalloc.c mq_msgqfree.c
CSRCS += mq_setattr.c mq_notify.c

View file

@ -31,6 +31,7 @@
#include <nuttx/irq.h>
#include <nuttx/arch.h>
#include <nuttx/kmalloc.h>
#include <nuttx/spinlock.h>
#include "mqueue/mqueue.h"
@ -56,6 +57,8 @@
void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
{
irqstate_t flags;
/* If this is a generally available pre-allocated message,
* then just put it back in the free list.
*/
@ -66,7 +69,9 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
* list from interrupt handlers.
*/
flags = spin_lock_irqsave(NULL);
list_add_tail(&g_msgfree, &mqmsg->node);
spin_unlock_irqrestore(NULL, flags);
}
/* If this is a message pre-allocated for interrupts,
@ -79,7 +84,9 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
* list from interrupt handlers.
*/
flags = spin_lock_irqsave(NULL);
list_add_tail(&g_msgfreeirq, &mqmsg->node);
spin_unlock_irqrestore(NULL, flags);
}
/* Otherwise, deallocate it. Note: interrupt handlers

View file

@ -44,66 +44,56 @@
#include "mqueue/mqueue.h"
/****************************************************************************
* Public Functions
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_verify_receive
* Name: nxmq_rcvtimeout
*
* Description:
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function verifies the input parameters that
* are common to both functions.
* This function is called if the timeout elapses before the message queue
* becomes non-empty.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* arg - the argument provided when the timeout was configured.
*
* Returned Value:
* On success, zero (OK) is returned. A negated errno value is returned
* on any failure:
* None
*
* EBADF Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the message
* queue.
* EINVAL Invalid 'msg' or 'msgq'
* Assumptions:
*
****************************************************************************/
#ifdef CONFIG_DEBUG_FEATURES
int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen)
static void nxmq_rcvtimeout(wdparm_t arg)
{
FAR struct inode *inode = mq->f_inode;
FAR struct mqueue_inode_s *msgq;
FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg;
irqstate_t flags;
if (inode == NULL)
/* Disable interrupts. This is necessary because an interrupt handler may
* attempt to send a message while we are doing this.
*/
flags = enter_critical_section();
/* It is also possible that an interrupt/context switch beat us to the
* punch and already changed the task's state.
*/
if (wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY)
{
return -EBADF;
/* Restart with task with a timeout error */
nxmq_wait_irq(wtcb, ETIMEDOUT);
}
msgq = inode->i_private;
/* Interrupts may now be re-enabled. */
/* Verify the input parameters */
if (!msg || !msgq)
{
return -EINVAL;
}
if ((mq->f_oflags & O_RDOK) == 0)
{
return -EBADF;
}
if (msglen < (size_t)msgq->maxmsgsize)
{
return -EMSGSIZE;
}
return OK;
leave_critical_section(flags);
}
#endif
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_wait_receive
@ -116,9 +106,10 @@ int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen)
*
* Input Parameters:
* msgq - Message queue descriptor
* oflags - flags from user set
* rcvmsg - The caller-provided location in which to return the newly
* received message.
* abstime - If non-NULL, this is the absolute time to wait until a
* message is received.
*
* Returned Value:
* On success, zero (OK) is returned. A negated errno value is returned
@ -134,12 +125,12 @@ int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen)
****************************************************************************/
int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
int oflags, FAR struct mqueue_msg_s **rcvmsg)
FAR struct mqueue_msg_s **rcvmsg,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct mqueue_msg_s *newmsg;
FAR struct tcb_s *rtcb;
DEBUGASSERT(rcvmsg != NULL);
FAR struct tcb_s *rtcb = this_task();
#ifdef CONFIG_CANCELLATION_POINTS
/* nxmq_wait_receive() is not a cancellation point, but it may be called
@ -156,139 +147,90 @@ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
}
#endif
if (abstime)
{
wd_start_realtime(&rtcb->waitdog, abstime,
nxmq_rcvtimeout, (wdparm_t)rtcb);
}
else if (ticks >= 0)
{
wd_start(&rtcb->waitdog, ticks,
nxmq_rcvtimeout, (wdparm_t)rtcb);
}
/* Get the message from the head of the queue */
while ((newmsg = (FAR struct mqueue_msg_s *)
list_remove_head(&msgq->msglist)) == NULL)
{
/* The queue is empty! Should we block until there the above condition
* has been satisfied?
msgq->cmn.nwaitnotempty++;
/* Initialize the 'errcode" used to communication wake-up error
* conditions.
*/
if ((oflags & O_NONBLOCK) == 0)
rtcb->waitobj = msgq;
rtcb->errcode = OK;
/* Remove the tcb task from the running list. */
nxsched_remove_self(rtcb);
/* Add the task to the specified blocked task list */
rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY;
nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn));
/* Now, perform the context switch */
up_switch_context(this_task(), rtcb);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be either EINTR or ETIMEDOUT).
*/
if (rtcb->errcode != OK)
{
/* Yes.. Block and try again */
rtcb = this_task();
rtcb->waitobj = msgq;
msgq->cmn.nwaitnotempty++;
/* Initialize the 'errcode" used to communication wake-up error
* conditions.
*/
rtcb->errcode = OK;
/* Make sure this is not the idle task, descheduling that
* isn't going to end well.
*/
DEBUGASSERT(!is_idle_task(rtcb));
/* Remove the tcb task from the running list. */
nxsched_remove_self(rtcb);
/* Add the task to the specified blocked task list */
rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY;
nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn));
/* Now, perform the context switch */
up_switch_context(this_task(), rtcb);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be either EINTR or ETIMEDOUT).
*/
if (rtcb->errcode != OK)
{
return -rtcb->errcode;
}
}
else
{
/* The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description.
*/
return -EAGAIN;
break;
}
}
/* If we got message, then decrement the number of messages in
* the queue while we are still in the critical section
*/
if (newmsg)
if (abstime || ticks >= 0)
{
if (msgq->nmsgs-- == msgq->maxmsgs)
{
nxmq_pollnotify(msgq, POLLOUT);
}
wd_cancel(&rtcb->waitdog);
}
*rcvmsg = newmsg;
return OK;
return -rtcb->errcode;
}
/****************************************************************************
* Name: nxmq_do_receive
* Name: nxmq_notify_receive
*
* Description:
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function accepts the message obtained by
* mq_waitmsg, provides the message content to the user, notifies any
* threads that were waiting for the message queue to become non-full,
* and disposes of the message structure
* [nx]mq_timedreceive.
* This function notifies any tasks that are waiting for the message queue
* to become non-empty. This function is called after a message is
* received from the message queue.
*
* Input Parameters:
* msgq - Message queue descriptor
* mqmsg - The message obtained by mq_waitmsg()
* ubuffer - The address of the user provided buffer to receive the message
* prio - The user-provided location to return the message priority.
*
* Returned Value:
* Returns the length of the received message. This function does not
* fail.
*
* Assumptions:
* - The caller has provided all validity checking of the input parameters
* using nxmq_verify_receive.
* - The user buffer, ubuffer, is known to be large enough to accept the
* largest message that an be sent on this message queue
* - Pre-emption should be disabled throughout this call.
*
****************************************************************************/
ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg,
FAR char *ubuffer, FAR unsigned int *prio)
void nxmq_notify_receive(FAR struct mqueue_inode_s *msgq)
{
FAR struct tcb_s *btcb;
ssize_t rcvmsglen;
/* Get the length of the message (also the return value) */
rcvmsglen = mqmsg->msglen;
/* Copy the message into the caller's buffer */
memcpy(ubuffer, (FAR const void *)mqmsg->mail, rcvmsglen);
/* Copy the message priority as well (if a buffer is provided) */
if (prio)
{
*prio = mqmsg->priority;
}
/* We are done with the message. Deallocate it now. */
nxmq_free_msg(mqmsg);
/* Check if any tasks are waiting for the MQ not full event. */
@ -331,8 +273,4 @@ ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
up_switch_context(btcb, rtcb);
}
}
/* Return the length of the message transferred to the user buffer */
return rcvmsglen;
}

View file

@ -32,6 +32,7 @@
#include <errno.h>
#include <mqueue.h>
#include <debug.h>
#include <fcntl.h>
#include <nuttx/irq.h>
#include <nuttx/arch.h>
@ -40,10 +41,418 @@
#include "mqueue/mqueue.h"
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_verify_receive
*
* Description:
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function verifies the input parameters that
* are common to both functions.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
*
* Returned Value:
* On success, zero (OK) is returned. A negated errno value is returned
* on any failure:
*
* EBADF Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the message
* queue.
* EINVAL Invalid 'msg' or 'msgq'
*
****************************************************************************/
#ifdef CONFIG_DEBUG_FEATURES
static int nxmq_verify_receive(FAR struct file *mq,
FAR char *msg, size_t msglen)
{
FAR struct inode *inode = mq->f_inode;
FAR struct mqueue_inode_s *msgq;
if (inode == NULL)
{
return -EBADF;
}
msgq = inode->i_private;
/* Verify the input parameters */
if (!msg || !msgq)
{
return -EINVAL;
}
if ((mq->f_oflags & O_RDOK) == 0)
{
return -EBADF;
}
if (msglen < (size_t)msgq->maxmsgsize)
{
return -EMSGSIZE;
}
return OK;
}
#endif
/****************************************************************************
* Name: file_mq_timedreceive_internal
*
* Description:
* This is an internal function of file_mq_timedreceive()/
* file_mq_tickreceive(), please refer to the detailed description for
* more information.
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* On success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
****************************************************************************/
static
ssize_t file_mq_timedreceive_internal(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
ssize_t ret = 0;
DEBUGASSERT(up_interrupt_context() == false);
/* Verify the input parameters */
if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000))
{
return -EINVAL;
}
if (mq == NULL)
{
return -EINVAL;
}
#ifdef CONFIG_DEBUG_FEATURES
/* Verify the input parameters and, in case of an error, set
* errno appropriately.
*/
ret = nxmq_verify_receive(mq, msg, msglen);
if (ret < 0)
{
return ret;
}
#endif
msgq = mq->f_inode->i_private;
/* Furthermore, nxmq_wait_receive() expects to have interrupts disabled
* because messages can be sent from interrupt level.
*/
flags = enter_critical_section();
/* Get the message from the message queue */
mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&msgq->msglist);
if (mqmsg == NULL)
{
if ((mq->f_oflags & O_NONBLOCK) != 0)
{
leave_critical_section(flags);
return -EAGAIN;
}
/* Wait & get the message from the message queue */
ret = nxmq_wait_receive(msgq, &mqmsg, abstime, ticks);
if (ret < 0)
{
leave_critical_section(flags);
return ret;
}
}
/* If we got message, then decrement the number of messages in
* the queue while we are still in the critical section
*/
if (msgq->nmsgs-- == msgq->maxmsgs)
{
nxmq_pollnotify(msgq, POLLOUT);
}
/* Notify all threads waiting for a message in the message queue */
nxmq_notify_receive(msgq);
leave_critical_section(flags);
/* Return the message to the caller */
if (prio)
{
*prio = mqmsg->priority;
}
memcpy(msg, mqmsg->mail, mqmsg->msglen);
ret = mqmsg->msglen;
/* Free the message structure */
nxmq_free_msg(mqmsg);
return ret;
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: file_mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* On success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
****************************************************************************/
ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
return file_mq_timedreceive_internal(mq, msg, msglen, prio, abstime, -1);
}
/****************************************************************************
* Name: file_mq_tickreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_tickreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_tickreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
sclock_t ticks)
{
return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, ticks);
}
/****************************************************************************
* Name: nxmq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the message queue is empty
* and O_NONBLOCK was not set, nxmq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* nxmq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* On success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
****************************************************************************/
ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
FAR struct file *filep;
ssize_t ret;
ret = fs_getfilep(mqdes, &filep);
if (ret < 0)
{
return ret;
}
ret = file_mq_timedreceive_internal(filep, msg, msglen, prio, abstime, -1);
fs_putfilep(filep);
return ret;
}
/****************************************************************************
* Name: mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the size of the buffer in
* bytes (msglen) is less than the "mq_msgsize" attribute of the message
* queue, mq_timedreceive will return an error. Otherwise, the selected
* message is removed from the queue and copied to "msg."
*
* If the message queue is empty and O_NONBLOCK was not set,
* mq_timedreceive() will block until a message is added to the message
* queue (or until a timeout occurs). If more than one task is waiting
* to receive a message, only the task with the highest priority that has
* waited the longest will be unblocked.
*
* mq_timedreceive() behaves just like mq_receive(), except that if the
* queue is empty and the O_NONBLOCK flag is not enabled for the message
* queue description, then abstime points to a structure which specifies a
* ceiling on the time for which the call will block. This ceiling is an
* absolute timeout in seconds and nanoseconds since the Epoch (midnight
* on the morning of 1 January 1970).
*
* If no message is available, and the timeout has already expired by the
* time of the call, mq_timedreceive() returns immediately.
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* On success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
****************************************************************************/
ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
int ret;
/* mq_timedreceive() is a cancellation point */
enter_cancellation_point();
/* Let nxmq_timedreceive do all of the work */
ret = nxmq_timedreceive(mqdes, msg, msglen, prio, abstime);
if (ret < 0)
{
set_errno(-ret);
ret = ERROR;
}
leave_cancellation_point();
return ret;
}
/****************************************************************************
* Name: file_mq_receive
*
@ -75,50 +484,7 @@
ssize_t file_mq_receive(FAR struct file *mq, FAR char *msg, size_t msglen,
FAR unsigned int *prio)
{
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
ssize_t ret;
DEBUGASSERT(up_interrupt_context() == false);
/* Verify the input parameters and, in case of an error, set
* errno appropriately.
*/
ret = nxmq_verify_receive(mq, msg, msglen);
if (ret < 0)
{
return ret;
}
msgq = mq->f_inode->i_private;
/* Furthermore, nxmq_wait_receive() expects to have interrupts disabled
* because messages can be sent from interrupt level.
*/
flags = enter_critical_section();
/* Get the message from the message queue */
ret = nxmq_wait_receive(msgq, mq->f_oflags, &mqmsg);
/* Check if we got a message from the message queue. We might
* not have a message if:
*
* - The message queue is empty and O_NONBLOCK is set in the mq
* - The wait was interrupted by a signal
*/
if (ret == OK)
{
ret = nxmq_do_receive(msgq, mqmsg, msg, prio);
}
leave_critical_section(flags);
return ret;
return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, -1);
}
/****************************************************************************

View file

@ -31,17 +31,578 @@
#include <errno.h>
#include <mqueue.h>
#include <sys/types.h>
#include <fcntl.h>
#include <nuttx/arch.h>
#include <nuttx/cancelpt.h>
#include <nuttx/kmalloc.h>
#include <nuttx/spinlock.h>
#include <nuttx/irq.h>
#include "mqueue/mqueue.h"
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_verify_send
*
* Description:
* This is internal, common logic shared by both [nx]mq_send and
* [nx]mq_timesend. This function verifies the input parameters that are
* common to both functions.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
*
* Returned Value:
* On success, 0 (OK) is returned. On failure, a negated errno value is
* returned.
*
* EINVAL Either msg or msgq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
*
****************************************************************************/
#ifdef CONFIG_DEBUG_FEATURES
static int nxmq_verify_send(FAR FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio)
{
FAR struct inode *inode = mq->f_inode;
FAR struct mqueue_inode_s *msgq;
if (inode == NULL)
{
return -EBADF;
}
msgq = inode->i_private;
/* Verify the input parameters */
if (msg == NULL || msgq == NULL || prio >= MQ_PRIO_MAX)
{
return -EINVAL;
}
if ((mq->f_oflags & O_WROK) == 0)
{
return -EBADF;
}
if (msglen > (size_t)msgq->maxmsgsize)
{
return -EMSGSIZE;
}
return OK;
}
#endif
/****************************************************************************
* Name: nxmq_alloc_msg
*
* Description:
* The nxmq_alloc_msg function will get a free message for use by the
* operating system. The message will be allocated from the g_msgfree
* list.
*
* If the list is empty AND the message is NOT being allocated from the
* interrupt level, then the message will be allocated. If a message
* cannot be obtained, the operating system is dead and therefore cannot
* continue.
*
* If the list is empty AND the message IS being allocated from the
* interrupt level. This function will attempt to get a message from
* the g_msgfreeirq list. If this is unsuccessful, the calling interrupt
* handler will be notified.
*
* Input Parameters:
* None
*
* Returned Value:
* A reference to the allocated msg structure. On a failure to allocate,
* this function PANICs.
*
****************************************************************************/
static FAR struct mqueue_msg_s *nxmq_alloc_msg(uint16_t maxmsgsize)
{
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
/* Try to get the message from the generally available free list. */
flags = spin_lock_irqsave(NULL);
mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&g_msgfree);
spin_unlock_irqrestore(NULL, flags);
if (mqmsg == NULL)
{
/* If we were called from an interrupt handler, then try to get the
* message from generally available list of messages. If this fails,
* then try the list of messages reserved for interrupt handlers
*/
if (up_interrupt_context())
{
/* Try the free list reserved for interrupt handlers */
flags = spin_lock_irqsave(NULL);
mqmsg = (FAR struct mqueue_msg_s *)list_remove_head(&g_msgfreeirq);
spin_unlock_irqrestore(NULL, flags);
}
/* We were not called from an interrupt handler. */
else
{
/* If we cannot a message from the free list, then we will have to
* allocate one.
*/
mqmsg = kmm_malloc((sizeof (struct mqueue_msg_s)));
/* Check if we allocated the message */
if (mqmsg != NULL)
{
/* Yes... remember that this message was dynamically
* allocated.
*/
mqmsg->type = MQ_ALLOC_DYN;
}
}
}
return mqmsg;
}
/****************************************************************************
* Name: nxmq_add_queue
*
* Description:
* This is internal, common logic shared by both [nx]mq_send and
* [nx]mq_timesend. This function adds the specified message (msg) to the
* message queue (msgq). Then it notifies any tasks that were waiting
* for message queue notifications setup by mq_notify. And, finally, it
* awakens any tasks that were waiting for the message not empty event.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Message to send
*
* Returned Value:
* This function always returns OK.
*
****************************************************************************/
static void nxmq_add_queue(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg,
unsigned int prio)
{
FAR struct mqueue_msg_s *prev = NULL;
FAR struct mqueue_msg_s *next;
/* Insert the new message in the message queue
* Search the message list to find the location to insert the new
* message. Each is list is maintained in ascending priority order.
*/
list_for_every_entry(&msgq->msglist, next, struct mqueue_msg_s, node)
{
if (prio > next->priority)
{
break;
}
else
{
prev = next;
}
}
/* Add the message at the right place */
if (prev)
{
list_add_after(&prev->node, &mqmsg->node);
}
else
{
list_add_head(&msgq->msglist, &mqmsg->node);
}
}
/****************************************************************************
* Name: file_mq_timedsend_internal
*
* Description:
* This is an internal function of file_mq_timedsend()/file_mq_ticksend(),
* please refer to the detailed description for more information.
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
static
int file_mq_timedsend_internal(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
int ret = 0;
/* Verify the input parameters */
if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000))
{
return -EINVAL;
}
if (mq == NULL)
{
return -EINVAL;
}
#ifdef CONFIG_DEBUG_FEATURES
/* Verify the input parameters on any failures to verify. */
ret = nxmq_verify_send(mq, msg, msglen, prio);
if (ret < 0)
{
return ret;
}
#endif
msgq = mq->f_inode->i_private;
/* Pre-allocate a message structure */
mqmsg = nxmq_alloc_msg(msgq->maxmsgsize);
if (!mqmsg)
{
return -ENOMEM;
}
memcpy(mqmsg->mail, msg, msglen);
mqmsg->priority = prio;
mqmsg->msglen = msglen;
/* Disable interruption */
flags = enter_critical_section();
if (msgq->nmsgs >= msgq->maxmsgs)
{
/* Verify that the message is full and we can't wait */
if ((up_interrupt_context() || (mq->f_oflags & O_NONBLOCK) != 0))
{
ret = -EAGAIN;
goto out;
}
/* The message queue is full. We will need to wait for the message
* queue to become non-full.
*/
ret = nxmq_wait_send(msgq, abstime, ticks);
if (ret < 0)
{
goto out;
}
}
/* Add the message to the message queue */
nxmq_add_queue(msgq, mqmsg, prio);
/* Increment the count of messages in the queue */
if (msgq->nmsgs++ == 0)
{
nxmq_pollnotify(msgq, POLLIN);
}
/* Notify any tasks that are waiting for a message to become available */
nxmq_notify_send(msgq);
out:
leave_critical_section(flags);
if (ret < 0)
{
nxmq_free_msg(mqmsg);
}
return ret;
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: file_mq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_timedsend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_timedsend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime)
{
return file_mq_timedsend_internal(mq, msg, msglen, prio, abstime, -1);
}
/****************************************************************************
* Name: file_mq_ticksend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_ticksend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_ticksend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_ticksend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio, sclock_t ticks)
{
return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, ticks);
}
/****************************************************************************
* Name: nxmq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mqdes). nxmq_timedsend() behaves just like mq_send(), except
* that if the queue is full and the O_NONBLOCK flag is not enabled for
* the message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* nxmq_timedsend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mqdes - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen,
unsigned int prio, FAR const struct timespec *abstime)
{
FAR struct file *filep;
int ret;
ret = fs_getfilep(mqdes, &filep);
if (ret < 0)
{
return ret;
}
ret = file_mq_timedsend_internal(filep, msg, msglen, prio, abstime, -1);
fs_putfilep(filep);
return ret;
}
/****************************************************************************
* Name: mq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mqdes). The "msglen" parameter specifies the length of the message
* in bytes pointed to by "msg." This length must not exceed the maximum
* message length from the mq_getattr().
*
* If the message queue is not full, mq_timedsend() place the message in
* the message queue at the position indicated by the "prio" argrument.
* Messages with higher priority will be inserted before lower priority
* messages. The value of "prio" must not exceed MQ_PRIO_MAX.
*
* If the specified message queue is full and O_NONBLOCK is not set in the
* message queue, then mq_timedsend() will block until space becomes
* available to the queue the message or a timeout occurs.
*
* mq_timedsend() behaves just like mq_send(), except that if the queue
* is full and the O_NONBLOCK flag is not enabled for the message queue
* description, then abstime points to a structure which specifies a
* ceiling on the time for which the call will block. This ceiling is an
* absolute timeout in seconds and nanoseconds since the Epoch (midnight
* on the morning of 1 January 1970).
*
* If the message queue is full, and the timeout has already expired by
* the time of the call, mq_timedsend() returns immediately.
*
* Input Parameters:
* mqdes - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* On success, mq_send() returns 0 (OK); on error, -1 (ERROR)
* is returned, with errno set to indicate the error:
*
* EAGAIN The queue was full, and the O_NONBLOCK flag was set for the
* message queue description referred to by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
* Assumptions/restrictions:
*
****************************************************************************/
int mq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen,
unsigned int prio, FAR const struct timespec *abstime)
{
int ret;
/* mq_timedsend() is a cancellation point */
enter_cancellation_point();
/* Let nxmq_send() do all of the work */
ret = nxmq_timedsend(mqdes, msg, msglen, prio, abstime);
if (ret < 0)
{
set_errno(-ret);
ret = ERROR;
}
leave_cancellation_point();
return ret;
}
/****************************************************************************
* Name: file_mq_send
*
@ -73,72 +634,7 @@
int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen,
unsigned int prio)
{
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
int ret;
/* Verify the input parameters -- setting errno appropriately
* on any failures to verify.
*/
ret = nxmq_verify_send(mq, msg, msglen, prio);
if (ret < 0)
{
return ret;
}
msgq = mq->f_inode->i_private;
/* Allocate a message structure:
* - Immediately if we are called from an interrupt handler.
* - Immediately if the message queue is not full, or
* - After successfully waiting for the message queue to become
* non-FULL. This would fail with EAGAIN, EINTR, or ETIMEDOUT.
*/
flags = enter_critical_section();
if (!up_interrupt_context()) /* In an interrupt handler? */
{
/* No.. Not in an interrupt handler. Is the message queue FULL? */
if (msgq->nmsgs >= msgq->maxmsgs)
{
/* Yes.. the message queue is full. Wait for space to become
* available in the message queue.
*/
ret = nxmq_wait_send(msgq, mq->f_oflags);
}
}
/* ret can only be negative if nxmq_wait_send failed */
if (ret == OK)
{
/* Now allocate the message. */
mqmsg = nxmq_alloc_msg();
DEBUGASSERT(mqmsg != NULL);
/* Check if the message was successfully allocated */
/* The allocation was successful (implying that we can also send the
* message). Perform the message send.
*
* NOTE: There is a race condition here: What if a message is added by
* interrupt related logic so that queue again becomes non-empty.
* That is handled because nxmq_do_send() will permit the maxmsgs limit
* to be exceeded in that case.
*/
ret = nxmq_do_send(msgq, mqmsg, msg, msglen, prio);
}
leave_critical_section(flags);
return ret;
return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, -1);
}
/****************************************************************************
@ -181,7 +677,7 @@ int nxmq_send(mqd_t mqdes, FAR const char *msg, size_t msglen,
return ret;
}
ret = file_mq_send(filep, msg, msglen, prio);
ret = file_mq_timedsend_internal(filep, msg, msglen, prio, NULL, -1);
fs_putfilep(filep);
return ret;
}

View file

@ -37,7 +37,6 @@
#include <debug.h>
#include <nuttx/irq.h>
#include <nuttx/kmalloc.h>
#include <nuttx/arch.h>
#include <nuttx/sched.h>
#include <nuttx/cancelpt.h>
@ -46,144 +45,57 @@
#include "mqueue/mqueue.h"
/****************************************************************************
* Public Functions
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_verify_send
* Name: nxmq_sndtimeout
*
* Description:
* This is internal, common logic shared by both [nx]mq_send and
* [nx]mq_timesend. This function verifies the input parameters that are
* common to both functions.
* This function is called if the timeout elapses before the message queue
* becomes non-full.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* arg - The argument that was provided when the timeout was configured.
*
* Returned Value:
* On success, 0 (OK) is returned. On failure, a negated errno value is
* returned.
*
* EINVAL Either msg or msgq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
*
****************************************************************************/
#ifdef CONFIG_DEBUG_FEATURES
int nxmq_verify_send(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio)
{
FAR struct inode *inode = mq->f_inode;
FAR struct mqueue_inode_s *msgq;
if (inode == NULL)
{
return -EBADF;
}
msgq = inode->i_private;
/* Verify the input parameters */
if (msg == NULL || msgq == NULL || prio >= MQ_PRIO_MAX)
{
return -EINVAL;
}
if ((mq->f_oflags & O_WROK) == 0)
{
return -EBADF;
}
if (msglen > (size_t)msgq->maxmsgsize)
{
return -EMSGSIZE;
}
return OK;
}
#endif
/****************************************************************************
* Name: nxmq_alloc_msg
*
* Description:
* The nxmq_alloc_msg function will get a free message for use by the
* operating system. The message will be allocated from the g_msgfree
* list.
*
* If the list is empty AND the message is NOT being allocated from the
* interrupt level, then the message will be allocated. If a message
* cannot be obtained, the operating system is dead and therefore cannot
* continue.
*
* If the list is empty AND the message IS being allocated from the
* interrupt level. This function will attempt to get a message from
* the g_msgfreeirq list. If this is unsuccessful, the calling interrupt
* handler will be notified.
*
* Input Parameters:
* None
*
* Returned Value:
* A reference to the allocated msg structure. On a failure to allocate,
* this function PANICs.
* Assumptions:
*
****************************************************************************/
FAR struct mqueue_msg_s *nxmq_alloc_msg(void)
static void nxmq_sndtimeout(wdparm_t arg)
{
FAR struct list_node *mqmsg;
FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg;
irqstate_t flags;
/* Try to get the message from the generally available free list. */
/* Disable interrupts. This is necessary because an interrupt handler may
* attempt to send a message while we are doing this.
*/
mqmsg = list_remove_head(&g_msgfree);
if (mqmsg == NULL)
flags = enter_critical_section();
/* It is also possible that an interrupt/context switch beat us to the
* punch and already changed the task's state.
*/
if (wtcb->task_state == TSTATE_WAIT_MQNOTFULL)
{
/* If we were called from an interrupt handler, then try to get the
* message from generally available list of messages. If this fails,
* then try the list of messages reserved for interrupt handlers
*/
/* Restart with task with a timeout error */
if (up_interrupt_context())
{
/* Try the free list reserved for interrupt handlers */
mqmsg = list_remove_head(&g_msgfreeirq);
}
/* We were not called from an interrupt handler. */
else
{
/* If we cannot a message from the free list, then we will have to
* allocate one.
*/
mqmsg = (FAR struct list_node *)
kmm_malloc((sizeof (struct mqueue_msg_s)));
/* Check if we allocated the message */
if (mqmsg != NULL)
{
/* Yes... remember that this message was dynamically
* allocated.
*/
((FAR struct mqueue_msg_s *)mqmsg)->type = MQ_ALLOC_DYN;
}
}
nxmq_wait_irq(wtcb, ETIMEDOUT);
}
return (FAR struct mqueue_msg_s *)mqmsg;
/* Interrupts may now be re-enabled. */
leave_critical_section(flags);
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_wait_send
*
@ -194,7 +106,7 @@ FAR struct mqueue_msg_s *nxmq_alloc_msg(void)
*
* Input Parameters:
* msgq - Message queue descriptor
* oflags - flags from user set
* abstime - The absolute time to wait until
*
* Returned Value:
* On success, nxmq_wait_send() returns 0 (OK); a negated errno value is
@ -212,9 +124,11 @@ FAR struct mqueue_msg_s *nxmq_alloc_msg(void)
*
****************************************************************************/
int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
int nxmq_wait_send(FAR struct mqueue_inode_s *msgq,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct tcb_s *rtcb;
FAR struct tcb_s *rtcb = this_task();
#ifdef CONFIG_CANCELLATION_POINTS
/* nxmq_wait_send() is not a cancellation point, but may be called via
@ -231,6 +145,17 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
}
#endif
if (abstime)
{
wd_start_realtime(&rtcb->waitdog, abstime,
nxmq_sndtimeout, (wdparm_t)rtcb);
}
else if (ticks >= 0)
{
wd_start(&rtcb->waitdog, ticks,
nxmq_sndtimeout, (wdparm_t)rtcb);
}
/* Verify that the queue is indeed full as the caller thinks */
/* Loop until there are fewer than max allowable messages in the
@ -239,17 +164,6 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
while (msgq->nmsgs >= msgq->maxmsgs)
{
/* Should we block until there is sufficient space in the
* message queue?
*/
if ((oflags & O_NONBLOCK) != 0)
{
/* No... We will return an error to the caller. */
return -EAGAIN;
}
/* Block until the message queue is no longer full.
* When we are unblocked, we will try again
*/
@ -291,86 +205,41 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
if (rtcb->errcode != OK)
{
return -rtcb->errcode;
break;
}
}
return OK;
if (abstime || ticks >= 0)
{
wd_cancel(&rtcb->waitdog);
}
return -rtcb->errcode;
}
/****************************************************************************
* Name: nxmq_do_send
* Name: nxmq_notify_send
*
* Description:
* This is internal, common logic shared by both [nx]mq_send and
* [nx]mq_timesend. This function adds the specified message (msg) to the
* message queue (msgq). Then it notifies any tasks that were waiting
* for message queue notifications setup by mq_notify. And, finally, it
* awakens any tasks that were waiting for the message not empty event.
* This function is called when a message is sent to a message queue.
* It will notify any tasks that are waiting for the message queue to be
* non-full.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
*
* Returned Value:
* This function always returns OK.
* None
*
* Assumptions/restrictions:
* - Executes within a critical section established by the caller.
*
****************************************************************************/
int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg,
FAR const char *msg, size_t msglen, unsigned int prio)
void nxmq_notify_send(FAR struct mqueue_inode_s *msgq)
{
FAR struct mqueue_msg_s *prev = NULL;
FAR struct mqueue_msg_s *next;
FAR struct tcb_s *btcb;
/* Construct the message header info */
mqmsg->priority = prio;
mqmsg->msglen = msglen;
/* Copy the message data into the message */
memcpy((FAR void *)mqmsg->mail, (FAR const void *)msg, msglen);
/* Insert the new message in the message queue
* Search the message list to find the location to insert the new
* message. Each is list is maintained in ascending priority order.
*/
list_for_every_entry(&msgq->msglist, next, struct mqueue_msg_s, node)
{
if (prio > next->priority)
{
break;
}
else
{
prev = next;
}
}
/* Add the message at the right place */
if (prev)
{
list_add_after(&prev->node, &mqmsg->node);
}
else
{
list_add_head(&msgq->msglist, &mqmsg->node);
}
/* Increment the count of messages in the queue */
if (msgq->nmsgs++ == 0)
{
nxmq_pollnotify(msgq, POLLIN);
}
/* Check if we need to notify any tasks that are attached to the
* message queue
*/
@ -437,6 +306,4 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
up_switch_context(btcb, rtcb);
}
}
return OK;
}

View file

@ -1,435 +0,0 @@
/****************************************************************************
* sched/mqueue/mq_timedreceive.c
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/config.h>
#include <sys/types.h>
#include <stdint.h>
#include <stdbool.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <mqueue.h>
#include <debug.h>
#include <nuttx/irq.h>
#include <nuttx/arch.h>
#include <nuttx/wdog.h>
#include <nuttx/mqueue.h>
#include <nuttx/cancelpt.h>
#include "sched/sched.h"
#include "clock/clock.h"
#include "mqueue/mqueue.h"
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_rcvtimeout
*
* Description:
* This function is called if the timeout elapses before the message queue
* becomes non-empty.
*
* Input Parameters:
* arg - the argument provided when the timeout was configured.
*
* Returned Value:
* None
*
* Assumptions:
*
****************************************************************************/
static void nxmq_rcvtimeout(wdparm_t arg)
{
FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg;
irqstate_t flags;
/* Disable interrupts. This is necessary because an interrupt handler may
* attempt to send a message while we are doing this.
*/
flags = enter_critical_section();
/* It is also possible that an interrupt/context switch beat us to the
* punch and already changed the task's state.
*/
if (wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY)
{
/* Restart with task with a timeout error */
nxmq_wait_irq(wtcb, ETIMEDOUT);
}
/* Interrupts may now be re-enabled. */
leave_critical_section(flags);
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: file_mq_timedreceive_internal
*
* Description:
* This is an internal function of file_mq_timedreceive()/
* file_mq_tickreceive(), please refer to the detailed description for
* more information.
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
static ssize_t
file_mq_timedreceive_internal(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
int ret;
DEBUGASSERT(up_interrupt_context() == false);
/* Verify the input parameters and, in case of an error, set
* errno appropriately.
*/
ret = nxmq_verify_receive(mq, msg, msglen);
if (ret < 0)
{
return ret;
}
msgq = mq->f_inode->i_private;
/* Furthermore, nxmq_wait_receive() expects to have interrupts disabled
* because messages can be sent from interrupt level.
*/
flags = enter_critical_section();
/* Check if the message queue is empty. If it is NOT empty, then we
* will not need to start timer.
*/
if (list_is_empty(&msgq->msglist))
{
if (abstime != NULL)
{
if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{
ret = -EINVAL;
}
else
{
/* Convert the timespec to clock ticks.
* We must have interrupts disabled here so that
* this time stays valid until the wait begins.
*/
clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
}
/* Handle any time-related errors */
if (ret != OK)
{
goto errout_in_critical_section;
}
}
/* If the time has already expired and the message queue is empty,
* return immediately.
*/
if (ticks <= 0)
{
ret = -ETIMEDOUT;
goto errout_in_critical_section;
}
/* Start the watchdog */
wd_start(&rtcb->waitdog, ticks, nxmq_rcvtimeout, (wdparm_t)rtcb);
}
/* Get the message from the message queue */
ret = nxmq_wait_receive(msgq, mq->f_oflags, &mqmsg);
/* Stop the watchdog timer (this is not harmful in the case where
* it was never started)
*/
wd_cancel(&rtcb->waitdog);
/* Check if we got a message from the message queue. We might
* not have a message if:
*
* - The message queue is empty and O_NONBLOCK is set in the mqdes
* - The wait was interrupted by a signal
* - The watchdog timeout expired
*/
if (ret == OK)
{
DEBUGASSERT(mqmsg != NULL);
ret = nxmq_do_receive(msgq, mqmsg, msg, prio);
}
/* We can now restore interrupts */
errout_in_critical_section:
leave_critical_section(flags);
return ret;
}
/****************************************************************************
* Name: file_mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
return file_mq_timedreceive_internal(mq, msg, msglen, prio, abstime, 0);
}
/****************************************************************************
* Name: file_mq_tickreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_tickreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_tickreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
sclock_t ticks)
{
return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, ticks);
}
/****************************************************************************
* Name: nxmq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the message queue is empty
* and O_NONBLOCK was not set, nxmq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* nxmq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
FAR struct file *filep;
ssize_t ret;
ret = fs_getfilep(mqdes, &filep);
if (ret < 0)
{
return ret;
}
ret = file_mq_timedreceive_internal(filep, msg, msglen, prio, abstime, 0);
fs_putfilep(filep);
return ret;
}
/****************************************************************************
* Name: mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the size of the buffer in
* bytes (msglen) is less than the "mq_msgsize" attribute of the message
* queue, mq_timedreceive will return an error. Otherwise, the selected
* message is removed from the queue and copied to "msg."
*
* If the message queue is empty and O_NONBLOCK was not set,
* mq_timedreceive() will block until a message is added to the message
* queue (or until a timeout occurs). If more than one task is waiting
* to receive a message, only the task with the highest priority that has
* waited the longest will be unblocked.
*
* mq_timedreceive() behaves just like mq_receive(), except that if the
* queue is empty and the O_NONBLOCK flag is not enabled for the message
* queue description, then abstime points to a structure which specifies a
* ceiling on the time for which the call will block. This ceiling is an
* absolute timeout in seconds and nanoseconds since the Epoch (midnight
* on the morning of 1 January 1970).
*
* If no message is available, and the timeout has already expired by the
* time of the call, mq_timedreceive() returns immediately.
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* On success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
****************************************************************************/
ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
int ret;
/* mq_timedreceive() is a cancellation point */
enter_cancellation_point();
/* Let nxmq_timedreceive do all of the work */
ret = nxmq_timedreceive(mqdes, msg, msglen, prio, abstime);
if (ret < 0)
{
set_errno(-ret);
ret = ERROR;
}
leave_cancellation_point();
return ret;
}

View file

@ -1,510 +0,0 @@
/****************************************************************************
* sched/mqueue/mq_timedsend.c
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/config.h>
#include <sys/types.h>
#include <stdint.h>
#include <stdbool.h>
#include <unistd.h>
#include <mqueue.h>
#include <assert.h>
#include <errno.h>
#include <debug.h>
#include <nuttx/irq.h>
#include <nuttx/arch.h>
#include <nuttx/wdog.h>
#include <nuttx/cancelpt.h>
#include "clock/clock.h"
#include "sched/sched.h"
#include "mqueue/mqueue.h"
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmq_sndtimeout
*
* Description:
* This function is called if the timeout elapses before the message queue
* becomes non-full.
*
* Input Parameters:
* arg - The argument that was provided when the timeout was configured.
*
* Returned Value:
* None
*
* Assumptions:
*
****************************************************************************/
static void nxmq_sndtimeout(wdparm_t arg)
{
FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg;
irqstate_t flags;
/* Disable interrupts. This is necessary because an interrupt handler may
* attempt to send a message while we are doing this.
*/
flags = enter_critical_section();
/* It is also possible that an interrupt/context switch beat us to the
* punch and already changed the task's state.
*/
if (wtcb->task_state == TSTATE_WAIT_MQNOTFULL)
{
/* Restart with task with a timeout error */
nxmq_wait_irq(wtcb, ETIMEDOUT);
}
/* Interrupts may now be re-enabled. */
leave_critical_section(flags);
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: file_mq_timedsend_internal
*
* Description:
* This is an internal function of file_mq_timedsend()/file_mq_ticksend(),
* please refer to the detailed description for more information.
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
static int
file_mq_timedsend_internal(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
int ret;
/* Verify the input parameters on any failures to verify. */
ret = nxmq_verify_send(mq, msg, msglen, prio);
if (ret < 0)
{
return ret;
}
msgq = mq->f_inode->i_private;
/* Disable interruption */
flags = enter_critical_section();
/* Pre-allocate a message structure */
mqmsg = nxmq_alloc_msg();
if (mqmsg == NULL)
{
/* Failed to allocate the message. nxmq_alloc_msg() does not set the
* errno value.
*/
ret = -ENOMEM;
goto errout_in_critical_section;
}
/* OpenGroup.org: "Under no circumstance shall the operation fail with a
* timeout if there is sufficient room in the queue to add the message
* immediately. The validity of the abstime parameter need not be checked
* when there is sufficient room in the queue."
*
* Also ignore the time value if for some crazy reason we were called from
* an interrupt handler. This probably really should be an assertion.
*
* NOTE: There is a race condition here: What if a message is added by
* interrupt related logic so that queue again becomes non-empty. That
* is handled because nxmq_do_send() will permit the maxmsgs limit to be
* exceeded in that case.
*/
if (msgq->nmsgs < msgq->maxmsgs || up_interrupt_context())
{
/* Do the send with no further checks (possibly exceeding maxmsgs)
* Currently nxmq_do_send() always returns OK.
*/
goto out_send_message;
}
/* The message queue is full... We are going to wait.
* Now we must have a valid time value.
*/
if (abstime != NULL)
{
if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{
ret = -EINVAL;
}
else
{
/* We are not in an interrupt handler and the message queue
* is full. Set up a timed wait for the message queue to
* become non-full.
*
* Convert the timespec to clock ticks. We must have interrupts
* disabled here so that this time stays valid until the wait
* begins.
*/
clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
}
/* Handle any time-related errors */
if (ret != OK)
{
nxmq_free_msg(mqmsg);
goto errout_in_critical_section;
}
}
/* If the time has already expired and the message queue is empty,
* return immediately.
*/
if (ticks <= 0)
{
ret = -ETIMEDOUT;
nxmq_free_msg(mqmsg);
goto errout_in_critical_section;
}
/* Start the watchdog and begin the wait for MQ not full */
wd_start(&rtcb->waitdog, ticks, nxmq_sndtimeout, (wdparm_t)rtcb);
/* And wait for the message queue to be non-empty */
ret = nxmq_wait_send(msgq, mq->f_oflags);
/* This may return with an error and errno set to either EINTR
* or ETIMEDOUT. Cancel the watchdog timer in any event.
*/
wd_cancel(&rtcb->waitdog);
/* Check if nxmq_wait_send() failed */
if (ret == OK)
{
/* If any of the above failed, set the errno. Otherwise, there should
* be space for another message in the message queue. NOW we can
* allocate the message structure.
*
* Currently nxmq_do_send() always returns OK.
*/
out_send_message:
ret = nxmq_do_send(msgq, mqmsg, msg, msglen, prio);
}
else
{
/* free the message as it can't be sent */
nxmq_free_msg(mqmsg);
}
/* Exit here with (1) the scheduler locked, (2) a message allocated, (3) a
* wdog allocated, and (4) interrupts disabled.
*/
errout_in_critical_section:
leave_critical_section(flags);
return ret;
}
/****************************************************************************
* Name: file_mq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_timedsend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_timedsend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime)
{
return file_mq_timedsend_internal(mq, msg, msglen, prio, abstime, 0);
}
/****************************************************************************
* Name: file_mq_ticksend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_ticksend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_ticksend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_ticksend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio, sclock_t ticks)
{
return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, ticks);
}
/****************************************************************************
* Name: nxmq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mqdes). nxmq_timedsend() behaves just like mq_send(), except
* that if the queue is full and the O_NONBLOCK flag is not enabled for
* the message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* nxmq_timedsend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mqdes - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen,
unsigned int prio, FAR const struct timespec *abstime)
{
FAR struct file *filep;
int ret;
ret = fs_getfilep(mqdes, &filep);
if (ret < 0)
{
return ret;
}
ret = file_mq_timedsend_internal(filep, msg, msglen, prio, abstime, 0);
fs_putfilep(filep);
return ret;
}
/****************************************************************************
* Name: mq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mqdes). The "msglen" parameter specifies the length of the message
* in bytes pointed to by "msg." This length must not exceed the maximum
* message length from the mq_getattr().
*
* If the message queue is not full, mq_timedsend() place the message in
* the message queue at the position indicated by the "prio" argrument.
* Messages with higher priority will be inserted before lower priority
* messages. The value of "prio" must not exceed MQ_PRIO_MAX.
*
* If the specified message queue is full and O_NONBLOCK is not set in the
* message queue, then mq_timedsend() will block until space becomes
* available to the queue the message or a timeout occurs.
*
* mq_timedsend() behaves just like mq_send(), except that if the queue
* is full and the O_NONBLOCK flag is not enabled for the message queue
* description, then abstime points to a structure which specifies a
* ceiling on the time for which the call will block. This ceiling is an
* absolute timeout in seconds and nanoseconds since the Epoch (midnight
* on the morning of 1 January 1970).
*
* If the message queue is full, and the timeout has already expired by
* the time of the call, mq_timedsend() returns immediately.
*
* Input Parameters:
* mqdes - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* On success, mq_send() returns 0 (OK); on error, -1 (ERROR)
* is returned, with errno set to indicate the error:
*
* EAGAIN The queue was full, and the O_NONBLOCK flag was set for the
* message queue description referred to by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
* Assumptions/restrictions:
*
****************************************************************************/
int mq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen,
unsigned int prio, FAR const struct timespec *abstime)
{
int ret;
/* mq_timedsend() is a cancellation point */
enter_cancellation_point();
/* Let nxmq_send() do all of the work */
ret = nxmq_timedsend(mqdes, msg, msglen, prio, abstime);
if (ret < 0)
{
set_errno(-ret);
ret = ERROR;
}
leave_cancellation_point();
return ret;
}

View file

@ -109,6 +109,9 @@ struct task_group_s; /* Forward reference */
/* Functions defined in mq_initialize.c *************************************/
void nxmq_initialize(void);
/* mq_msgfree.c *************************************************************/
void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg);
/* mq_waitirq.c *************************************************************/
@ -117,30 +120,18 @@ void nxmq_wait_irq(FAR struct tcb_s *wtcb, int errcode);
/* mq_rcvinternal.c *********************************************************/
#ifdef CONFIG_DEBUG_FEATURES
int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen);
#else
# define nxmq_verify_receive(msgq, msg, msglen) OK
#endif
int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
int oflags, FAR struct mqueue_msg_s **rcvmsg);
ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg,
FAR char *ubuffer, FAR unsigned int *prio);
FAR struct mqueue_msg_s **rcvmsg,
FAR const struct timespec *abstime,
sclock_t ticks);
void nxmq_notify_receive(FAR struct mqueue_inode_s *msgq);
/* mq_sndinternal.c *********************************************************/
#ifdef CONFIG_DEBUG_FEATURES
int nxmq_verify_send(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio);
#else
# define nxmq_verify_send(mq, msg, msglen, prio) OK
#endif
FAR struct mqueue_msg_s *nxmq_alloc_msg(void);
int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags);
int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg,
FAR const char *msg, size_t msglen, unsigned int prio);
int nxmq_wait_send(FAR struct mqueue_inode_s *msgq,
FAR const struct timespec *abstime,
sclock_t ticks);
void nxmq_notify_send(FAR struct mqueue_inode_s *msgq);
/* mq_recover.c *************************************************************/