sched/wqueue: Refactor delayed and periodical workqueue.

This commit refactors the logic of workqueue processing delayed and periodic work, and changes the timer to be set in `work_thread`. The improvements of this change are as follows:
- Fixed the memory reuse problem of the original periodic workqueue implementation.
- By removing the `wdog_s` structure in the `work_s` structure, the memory overhead of each `work_s` structure is reduced by about 30 bytes.
- Set the timer for each workqueue instead of each work, which improves system performance.
- Simplified the workqueue cancel logic.

Signed-off-by: ouyangxiangzhen <ouyangxiangzhen@xiaomi.com>
This commit is contained in:
ouyangxiangzhen 2025-04-17 11:41:22 +08:00 committed by Xiang Xiao
parent 9dbb9b49c6
commit 6f72f5481d
7 changed files with 297 additions and 190 deletions

View file

@ -249,16 +249,11 @@ typedef CODE void (*worker_t)(FAR void *arg);
struct work_s
{
struct list_node node; /* Implements a double linked list */
clock_t qtime; /* Time work queued */
union
{
struct wdog_s timer; /* Delay expiry timer */
struct wdog_period_s ptimer; /* Period expiry timer */
} u;
worker_t worker; /* Work callback */
FAR void *arg; /* Callback argument */
FAR struct kwork_wqueue_s *wq; /* Work queue */
struct list_node node; /* Implements a double linked list */
clock_t qtime; /* Time work queued */
clock_t period; /* Periodical delay ticks */
worker_t worker; /* Work callback */
FAR void *arg; /* Callback argument */
};
/* This is an enumeration of the various events that may be

View file

@ -89,6 +89,16 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
work->arg = arg; /* Callback argument */
work->qtime = clock() + delay; /* Delay until work performed */
/* delay+1 is to prevent the insufficient sleep time if we are
* currently near the boundary to the next tick.
* | current_tick | current_tick + 1 | current_tick + 2 | .... |
* | ^ Here we get the current tick
* In this case we delay 1 tick, timer will be triggered at
* current_tick + 1, which is not enough for at least 1 tick.
*/
work->qtime += 1;
/* Insert the work into the wait queue sorted by the expired time. */
list_for_every_entry(&wqueue->q, curr, struct work_s, node)

View file

@ -94,7 +94,7 @@ static void work_process(FAR struct usr_wqueue_s *wqueue)
FAR struct work_s *work;
worker_t worker;
FAR void *arg;
sclock_t elapsed;
clock_t tick;
clock_t next;
int ret;
@ -126,18 +126,18 @@ static void work_process(FAR struct usr_wqueue_s *wqueue)
* zero will always execute immediately.
*/
elapsed = clock() - work->qtime;
tick = clock();
/* Is this delay work ready? */
if (elapsed >= 0)
if (clock_compare(work->qtime, tick))
{
/* Remove the ready-to-execute work from the list */
list_delete(&work->node);
/* Extract the work description from the entry (in case the work
* instance by the re-used after it has been de-queued).
* instance by the reused after it has been de-queued).
*/
worker = work->worker;

View file

@ -46,7 +46,7 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
FAR struct work_s *work)
{
irqstate_t flags;
int ret = -ENOENT;
int ret = OK;
if (wqueue == NULL || work == NULL)
{
@ -59,17 +59,37 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
*/
flags = spin_lock_irqsave(&wqueue->lock);
if (work->worker != NULL)
/* Check whether we own the work structure. */
if (!work_available(work))
{
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/
bool is_head = list_is_head(&wqueue->pending, &work->node);
/* Seize the ownership from the work thread. */
work->worker = NULL;
wd_cancel(&work->u.timer);
list_delete(&work->node);
ret = OK;
/* If the head of the pending queue has changed, we should reset
* the wqueue timer.
*/
if (is_head)
{
if (!list_is_empty(&wqueue->pending))
{
work = list_first_entry(&wqueue->pending, struct work_s, node);
ret = wd_start_abstick(&wqueue->timer, work->qtime,
work_timer_expired, (wdparm_t)wqueue);
}
else
{
wd_cancel(&wqueue->timer);
}
}
}
else if (!up_interrupt_context() && !sched_idletask() && sync)
{

View file

@ -40,67 +40,6 @@
#ifdef CONFIG_SCHED_WORKQUEUE
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
#define queue_work(wqueue, work) \
do \
{ \
list_add_tail(&(wqueue)->q, &(work)->node); \
if ((wqueue)->wait_count > 0) /* There are threads waiting for sem. */ \
{ \
(wqueue)->wait_count--; \
nxsem_post(&(wqueue)->sem); \
} \
} \
while (0)
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: work_timer_expiry
****************************************************************************/
static void work_timer_expiry(wdparm_t arg)
{
FAR struct work_s *work = (FAR struct work_s *)arg;
irqstate_t flags = spin_lock_irqsave(&work->wq->lock);
sched_lock();
/* We have being canceled */
if (work->worker != NULL)
{
queue_work(work->wq, work);
}
spin_unlock_irqrestore(&work->wq->lock, flags);
sched_unlock();
}
static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads,
FAR struct work_s *work)
{
int wndx;
for (wndx = 0; wndx < nthreads; wndx++)
{
if (kworkers[wndx].work == work)
{
if (kworkers[wndx].wait_count > 0)
{
return true;
}
}
}
return false;
}
/****************************************************************************
* Public Functions
****************************************************************************/
@ -141,65 +80,67 @@ int work_queue_period_wq(FAR struct kwork_wqueue_s *wqueue,
FAR void *arg, clock_t delay, clock_t period)
{
irqstate_t flags;
int ret = OK;
clock_t expected;
bool wake = false;
int ret = OK;
if (wqueue == NULL || work == NULL || worker == NULL)
{
return -EINVAL;
}
/* Ensure the work has been canceled. */
work_cancel_wq(wqueue, work);
/* delay+1 is to prevent the insufficient sleep time if we are
* currently near the boundary to the next tick.
* | current_tick | current_tick + 1 | current_tick + 2 | .... |
* | ^ Here we get the current tick
* In this case we delay 1 tick, timer will be triggered at
* current_tick + 1, which is not enough for at least 1 tick.
*/
expected = clock_systime_ticks() + delay + 1;
/* Interrupts are disabled so that this logic can be called from with
* task logic or from interrupt handling logic.
*/
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
/* Remove the entry from the timer and work queue. */
if (work->worker != NULL)
{
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/
work->worker = NULL;
wd_cancel(&work->u.timer);
list_delete(&work->node);
}
if (work_is_canceling(wqueue->worker, wqueue->nthreads, work))
{
goto out;
}
/* Initialize the work structure. */
work->worker = worker; /* Work callback. non-NULL means queued */
work->arg = arg; /* Callback argument */
work->wq = wqueue; /* Work queue */
work->worker = worker; /* Work callback. non-NULL means queued */
work->arg = arg; /* Callback argument */
work->qtime = expected; /* Expected time */
work->period = period; /* Periodical delay */
/* Queue the new work */
/* Insert to the pending list of the wqueue. */
if (!delay)
if (delay)
{
queue_work(wqueue, work);
}
else if (period == 0)
{
ret = wd_start(&work->u.timer, delay,
work_timer_expiry, (wdparm_t)work);
if (work_insert_pending(wqueue, work))
{
/* Start the timer if the work is the earliest expired work. */
ret = wd_start_abstick(&wqueue->timer, work->qtime,
work_timer_expired, (wdparm_t)wqueue);
}
}
else
{
ret = wd_start_period(&work->u.ptimer, delay, period,
work_timer_expiry, (wdparm_t)work);
list_add_tail(&wqueue->expired, &work->node);
wake = true;
}
out:
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
if (wake)
{
nxsem_post(&wqueue->sem);
}
return ret;
}

View file

@ -83,11 +83,14 @@
struct hp_wqueue_s g_hpwork =
{
LIST_INITIAL_VALUE(g_hpwork.q),
SEM_INITIALIZER(0),
SEM_INITIALIZER(0),
SP_UNLOCKED,
CONFIG_SCHED_HPNTHREADS,
{
LIST_INITIAL_VALUE(g_hpwork.wq.expired),
LIST_INITIAL_VALUE(g_hpwork.wq.pending),
SEM_INITIALIZER(0),
SEM_INITIALIZER(0),
SP_UNLOCKED,
CONFIG_SCHED_HPNTHREADS,
}
};
#endif /* CONFIG_SCHED_HPWORK */
@ -97,11 +100,14 @@ struct hp_wqueue_s g_hpwork =
struct lp_wqueue_s g_lpwork =
{
LIST_INITIAL_VALUE(g_lpwork.q),
SEM_INITIALIZER(0),
SEM_INITIALIZER(0),
SP_UNLOCKED,
CONFIG_SCHED_LPNTHREADS,
{
LIST_INITIAL_VALUE(g_lpwork.wq.expired),
LIST_INITIAL_VALUE(g_lpwork.wq.pending),
SEM_INITIALIZER(0),
SEM_INITIALIZER(0),
SP_UNLOCKED,
CONFIG_SCHED_LPNTHREADS,
}
};
#endif /* CONFIG_SCHED_LPWORK */
@ -110,6 +116,51 @@ struct lp_wqueue_s g_lpwork =
* Private Functions
****************************************************************************/
static inline_function
void work_dispatch(FAR struct kwork_wqueue_s *wq)
{
FAR struct work_s *work;
FAR struct work_s *next;
unsigned int count = 0;
clock_t ticks = clock_systime_ticks();
/* Wake up the worker thread once there is expired work.
* If some worker threads are busy, here the callback will
* wake up another waiting work thread.
*
* Becareful of the special case that the pending work
* has been canceled but the timer is expired.
* In this case we should not wake up any worker thread.
*/
list_for_every_entry_safe(&wq->pending, work, next, struct work_s, node)
{
/* Check whether the work has expired. */
if (!clock_compare(work->qtime, ticks))
{
wd_start_abstick(&wq->timer, work->qtime,
work_timer_expired, (wdparm_t)wq);
break;
}
/* Expired work will be moved to tail of the expired queue. */
list_delete(&work->node);
list_add_tail(&wq->expired, &work->node);
/* Note that the thread execution this function is also
* a worker thread, which has already been woken up by the timer.
* So only `count - 1` semaphore will be posted.
*/
if (count++ > 0)
{
nxsem_post(&wq->sem);
}
}
}
/****************************************************************************
* Name: work_thread
*
@ -135,11 +186,11 @@ struct lp_wqueue_s g_lpwork =
static int work_thread(int argc, FAR char *argv[])
{
FAR struct kwork_wqueue_s *wqueue;
FAR struct kworker_s *kworker;
FAR struct work_s *work;
worker_t worker;
irqstate_t flags;
FAR void *arg;
FAR struct kworker_s *kworker;
FAR struct work_s *work;
worker_t worker;
irqstate_t flags;
FAR void *arg;
/* Get the handle from argv */
@ -148,33 +199,38 @@ static int work_thread(int argc, FAR char *argv[])
kworker = (FAR struct kworker_s *)
((uintptr_t)strtoul(argv[2], NULL, 16));
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
/* Loop forever */
/* Loop until wqueue->exit != 0.
* Since the only way to set wqueue->exit is to call work_queue_free(),
* there is no need for entering the critical section.
*/
while (!wqueue->exit)
{
/* And check each entry in the work queue. Since we have disabled
/* And check first entry in the work queue. Since we have disabled
* interrupts we know: (1) we will not be suspended unless we do
* so ourselves, and (2) there will be no changes to the work queue
*/
/* Remove the ready-to-execute work from the list */
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
while (!list_is_empty(&wqueue->q))
/* If the wqueue timer is expired and non-active, it indicates that
* there might be expired work in the pending queue.
*/
if (!WDOG_ISACTIVE(&wqueue->timer))
{
work = list_first_entry(&wqueue->q, struct work_s, node);
work_dispatch(wqueue);
}
if (!list_is_empty(&wqueue->expired))
{
work = list_first_entry(&wqueue->expired, struct work_s, node);
list_delete(&work->node);
if (work->worker == NULL)
{
continue;
}
/* Extract the work description from the entry (in case the work
* instance will be re-used after it has been de-queued).
/* Extract the work description from the entry (in case the
* work instance will be reused after it has been de-queued).
*/
worker = work->worker;
@ -183,21 +239,42 @@ static int work_thread(int argc, FAR char *argv[])
arg = work->arg;
/* Mark the work as no longer being queued */
/* Check whether the work is periodic. */
work->worker = NULL;
if (work->period != 0)
{
/* Calculate next expiration qtime. */
work->qtime += work->period;
/* Enqueue to the waiting queue */
if (work_insert_pending(wqueue, work))
{
/* We should reset timer if the work is the earliest. */
wd_start_abstick(&wqueue->timer, work->qtime,
work_timer_expired, (wdparm_t)wqueue);
}
}
else
{
/* Return the work structure ownership to the work owner. */
work->worker = NULL;
}
/* Mark the thread busy */
kworker->work = work;
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
/* Do the work. Re-enable interrupts while the work is being
* performed... we don't have any idea how long this will take!
*/
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
CALL_WORKER(worker, arg);
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
@ -215,22 +292,13 @@ static int work_thread(int argc, FAR char *argv[])
}
}
/* Then process queued work. work_process will not return until: (1)
* there is no further work in the work queue, and (2) semaphore is
* posted.
*/
wqueue->wait_count++;
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
nxsem_wait_uninterruptible(&wqueue->sem);
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
}
/* Wait for the semaphore to be posted by the wqueue timer. */
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
nxsem_wait_uninterruptible(&wqueue->sem);
}
nxsem_post(&wqueue->exsem);
return OK;
@ -303,6 +371,27 @@ static int work_thread_create(FAR const char *name, int priority,
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: work_timer_expired
*
* Description:
* The wqueue timer callback.
*
* Input Parameters:
* arg - The work queue.
*
****************************************************************************/
void work_timer_expired(wdparm_t arg)
{
/* The work time expired callback will wake up at least one worker thread
* to dispatch the expired work.
*/
FAR struct kwork_wqueue_s *wq = (FAR struct kwork_wqueue_s *)arg;
nxsem_post(&wq->sem);
}
/****************************************************************************
* Name: work_queue_create
*
@ -349,7 +438,9 @@ FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name,
/* Initialize the work queue structure */
list_initialize(&wqueue->q);
list_initialize(&wqueue->expired);
list_initialize(&wqueue->pending);
wqueue->timer.func = NULL;
nxsem_init(&wqueue->sem, 0, 0);
nxsem_init(&wqueue->exsem, 0, 0);
wqueue->nthreads = nthreads;
@ -392,6 +483,8 @@ int work_queue_free(FAR struct kwork_wqueue_s *wqueue)
return -EINVAL;
}
wd_cancel(&wqueue->timer);
/* Mark the work queue as exiting */
wqueue->exit = true;

View file

@ -66,14 +66,15 @@ struct kworker_s
struct kwork_wqueue_s
{
struct list_node q; /* The queue of pending work */
sem_t sem; /* The counting semaphore of the wqueue */
sem_t exsem; /* Sync waiting for thread exit */
spinlock_t lock; /* Spinlock */
uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */
int16_t wait_count;
struct kworker_s worker[0]; /* Describes a worker thread */
struct list_node expired; /* The queue of expired work. */
struct list_node pending; /* The queue of pending work. */
sem_t sem; /* The counting semaphore of the wqueue */
sem_t exsem; /* Sync waiting for thread exit */
spinlock_t lock; /* Spinlock */
uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */
struct wdog_s timer; /* Timer to pending. */
struct kworker_s worker[0]; /* Describes a worker thread */
};
/* This structure defines the state of one high-priority work queue. This
@ -83,17 +84,11 @@ struct kwork_wqueue_s
#ifdef CONFIG_SCHED_HPWORK
struct hp_wqueue_s
{
struct list_node q; /* The queue of pending work */
sem_t sem; /* The counting semaphore of the wqueue */
sem_t exsem; /* Sync waiting for thread exit */
spinlock_t lock; /* Spinlock */
uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */
int16_t wait_count;
struct kwork_wqueue_s wq;
/* Describes each thread in the high priority queue's thread pool */
struct kworker_s worker[CONFIG_SCHED_HPNTHREADS];
struct kworker_s worker[CONFIG_SCHED_HPNTHREADS];
};
#endif
@ -104,17 +99,11 @@ struct hp_wqueue_s
#ifdef CONFIG_SCHED_LPWORK
struct lp_wqueue_s
{
struct list_node q; /* The queue of pending work */
sem_t sem; /* The counting semaphore of the wqueue */
sem_t exsem; /* Sync waiting for thread exit */
spinlock_t lock; /* Spinlock */
uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */
int16_t wait_count;
struct kwork_wqueue_s wq;
/* Describes each thread in the low priority queue's thread pool */
struct kworker_s worker[CONFIG_SCHED_LPNTHREADS];
struct kworker_s worker[CONFIG_SCHED_LPNTHREADS];
};
#endif
@ -159,6 +148,65 @@ static inline_function FAR struct kwork_wqueue_s *work_qid2wq(int qid)
}
}
/****************************************************************************
* Name: work_insert_pending
*
* Description:
* Internal public function to insert the work to the workqueue.
* Require wqueue != NULL and work != NULL.
*
* Input Parameters:
* wqueue - The work queue.
* work - The work to be inserted.
*
* Returned Value:
* Return whether the work is inserted at the head of the pending queue.
*
****************************************************************************/
static inline_function
bool work_insert_pending(FAR struct kwork_wqueue_s *wqueue,
FAR struct work_s *work)
{
struct work_s *curr;
DEBUGASSERT(wqueue != NULL && work != NULL);
/* Insert the work into the wait queue sorted by the expired time. */
list_for_every_entry(&wqueue->pending, curr, struct work_s, node)
{
if (!clock_compare(curr->qtime, work->qtime))
{
break;
}
}
/* After the insertion, we do not violate the invariant that
* the wait queue is sorted by the expired time. Because
* curr->qtime > work->qtime.
* In the case of the wqueue is empty, we insert
* the work at the head of the wait queue.
*/
list_add_before(&curr->node, &work->node);
return list_is_head(&wqueue->pending, &work->node);
}
/****************************************************************************
* Name: work_timer_expired
*
* Description:
* The wqueue timer callback.
*
* Input Parameters:
* arg - The work queue.
*
****************************************************************************/
void work_timer_expired(wdparm_t arg);
/****************************************************************************
* Name: work_start_highpri
*