文章系列

1.linux工作队列 - workqueue总览
2.linux工作队列 - workqueue_struct创建
3.linux工作队列 - 把work_struct加入工作队列
4.linux工作队列 - work_struct被调用过程

work_struct被调用过程

work_struct的执行(即其func回调被调用)发生在函数worker_thread中,代码如下:

static int worker_thread(void *__worker)
{
    struct worker *worker = __worker;
    struct worker_pool *pool = worker->pool;

    /* tell the scheduler that this is a workqueue worker */
    worker->task->flags |= PF_WQ_WORKER;
woke_up:
    spin_lock_irq(&pool->lock);

    /* am I supposed to die? */
    if (unlikely(worker->flags & WORKER_DIE)) {----------判断worker是否die,是的话就从worker-pool中删除并返回,不是的话往下继续执行
        spin_unlock_irq(&pool->lock);
        WARN_ON_ONCE(!list_empty(&worker->entry));
        worker->task->flags &= ~PF_WQ_WORKER;

        set_task_comm(worker->task, "kworker/dying");
        ida_simple_remove(&pool->worker_ida, worker->id);
        worker_detach_from_pool(worker, pool);
        kfree(worker);
        return 0;
    }

    worker_leave_idle(worker);
recheck:
    /* no more worker necessary? */
    if (!need_more_worker(pool))----------判断是否有work有待执行,没有的话sleep
        goto sleep;

    /* do we need to manage? */
    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

    /*
     * ->scheduled list can only be filled while a worker is
     * preparing to process a work or actually processing it.
     * Make sure nobody diddled with it while I was sleeping.
     */
    WARN_ON_ONCE(!list_empty(&worker->scheduled));

    /*
     * Finish PREP stage.  We're guaranteed to have at least one idle
     * worker or that someone else has already assumed the manager
     * role.  This is where @worker starts participating in concurrency
     * management if applicable and concurrency management is restored
     * after being rebound.  See rebind_workers() for details.
     */
    worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

    do {---------------------------------------循环执行每一个work
        struct work_struct *work =
            list_first_entry(&pool->worklist,
                     struct work_struct, entry);

        pool->watchdog_ts = jiffies;

        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* optimization path, not strictly necessary */
            process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            move_linked_works(work, &worker->scheduled, NULL);-----把work加入到worker的scheduled中
            process_scheduled_works(worker);----------在此执行scheduled中的work
        }
    } while (keep_working(pool));

    worker_set_flags(worker, WORKER_PREP);
sleep:
    /*
     * pool->lock is held and there's no work to process and no need to
     * manage, sleep.  Workers are woken up only while holding
     * pool->lock or from local cpu, so setting the current state
     * before releasing pool->lock is enough to prevent losing any
     * event.
     */
    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&pool->lock);
    schedule();----------------------------------执行调度
    goto woke_up;
}

worker_thread既会直接调用process_one_work,也会通过process_scheduled_works间接调用它;work中的函数正是在process_one_work里被真正执行的:

/*
 * process_one_work - execute a single work item
 * @worker: the worker running the item
 * @work: the item to run
 *
 * Called with pool->lock held.  The lock is dropped around the call to
 * the work's callback and taken again before returning, as the
 * __releases/__acquires annotations state.  If another worker of the
 * same pool is already executing @work, the item is handed to that
 * worker's ->scheduled list and nothing is executed here.
 *
 * @work must not be dereferenced after its callback has run; the
 * callback, pwq and color needed afterwards are saved beforehand.
 */
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
    struct pool_workqueue *pwq = get_work_pwq(work);
    struct worker_pool *pool = worker->pool;
    bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
    int work_color;
    struct worker *collision;
#ifdef CONFIG_LOCKDEP
    /*
     * It is permissible to free the struct work_struct from
     * inside the function that is called from it, this we need to
     * take into account for lockdep too.  To avoid bogus "held
     * lock freed" warnings as well as problems when looking into
     * work->lockdep_map, make a copy and use that here.
     */
    struct lockdep_map lockdep_map;

    lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
    /* ensure we're on the correct CPU */
    WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
             raw_smp_processor_id() != pool->cpu);

    /*
     * A single work shouldn't be executed concurrently by
     * multiple workers on a single cpu.  Check whether anyone is
     * already processing the work.  If so, defer the work to the
     * currently executing one.
     */
    collision = find_worker_executing_work(pool, work);
    if (unlikely(collision)) {
        move_linked_works(work, &collision->scheduled, NULL);
        return;
    }

    /* claim and dequeue */
    debug_work_deactivate(work);
    hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
    worker->current_work = work;
    /* copy the callback into the worker; it is called through this copy */
    worker->current_func = work->func;
    worker->current_pwq = pwq;
    work_color = get_work_color(work);

    /* unlink the work from the list it is currently queued on */
    list_del_init(&work->entry);

    /*
     * CPU intensive works don't participate in concurrency management.
     * They're the scheduler's responsibility.  This takes @worker out
     * of concurrency management and the next code block will chain
     * execution of the pending work items.
     */
    if (unlikely(cpu_intensive))
        worker_set_flags(worker, WORKER_CPU_INTENSIVE);

    /*
     * Wake up another worker if necessary.  The condition is always
     * false for normal per-cpu workers since nr_running would always
     * be >= 1 at this point.  This is used to chain execution of the
     * pending work items for WORKER_NOT_RUNNING workers such as the
     * UNBOUND and CPU_INTENSIVE ones.
     */
    if (need_more_worker(pool))
        wake_up_worker(pool);

    /*
     * Record the last pool and clear PENDING which should be the last
     * update to @work.  Also, do this inside @pool->lock so that
     * PENDING and queued state changes happen together while IRQ is
     * disabled.
     */
    set_work_pool_and_clear_pending(work, pool->id);

    spin_unlock_irq(&pool->lock);

    lock_map_acquire_read(&pwq->wq->lockdep_map);
    lock_map_acquire(&lockdep_map);
    trace_workqueue_execute_start(work);
    /* run the work's callback (the saved copy of work->func) */
    worker->current_func(work);
    /*
     * While we must be careful to not use "work" after this, the trace
     * point will only record its address.
     */
    trace_workqueue_execute_end(work);
    lock_map_release(&lockdep_map);
    lock_map_release(&pwq->wq->lockdep_map);

    if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
        pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
               "     last function: %pf\n",
               current->comm, preempt_count(), task_pid_nr(current),
               worker->current_func);
        debug_show_held_locks(current);
        dump_stack();
    }

    /*
     * The following prevents a kworker from hogging CPU on !PREEMPT
     * kernels, where a requeueing work item waiting for something to
     * happen could deadlock with stop_machine as such work item could
     * indefinitely requeue itself while all other CPUs are trapped in
     * stop_machine. At the same time, report a quiescent RCU state so
     * the same condition doesn't freeze RCU.
     */
    cond_resched_rcu_qs();

    spin_lock_irq(&pool->lock);

    /* clear cpu intensive status */
    if (unlikely(cpu_intensive))
        worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

    /* we're done with it, release */
    hash_del(&worker->hentry);
    worker->current_work = NULL;
    worker->current_func = NULL;
    worker->current_pwq = NULL;
    worker->desc_valid = false;
    pwq_dec_nr_in_flight(pwq, work_color);
}
Logo

更多推荐