io_uring/tw.c

Source file repositories/reference/linux-study-clean/io_uring/tw.c

File Facts

System
Linux kernel
Corpus path
io_uring/tw.c
Extension
.c
Size
10132 bytes
Lines
361
Domain
Kernel Services
Bucket
io_uring
Inferred role
Kernel Services: implementation source
Status
source implementation candidate

Why This File Exists

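Shared kernel service surface used by multiple subsystems, including helpers, cryptography, virtualization support, and async I/O infrastructure. Within that surface, this file implements io_uring task work ("tw"). It queues request callbacks onto a per-task list (`tctx->task_list`) or a per-ring local work list, and later runs those callbacks in task context, batching them per ring under `ctx->uring_lock`.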

Dependency Surface

Detected Declarations

Annotated Snippet

if (!node) {
			if (mpscq_empty(&tctx->task_list))
				break;
			/*
			 * A producer has published a node but hasn't
			 * linked it into the queue yet (see mpscq_pop()).
			 * Give it a chance to finish rather than spinning,
			 * and don't sit on the ctx lock while doing so.
			 */
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
			continue;
		}
		req = container_of(node, struct io_kiocb, io_task_work.node);
		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		(*count)++;
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	}
	ctx_flush_and_put(ctx, ts);

	/*
	 * Relaxed read is enough as only the task itself sets ->in_cancel.
	 * The tctx may also be drained by io_tctx_fallback_work(), in which
	 * case current is a kworker that has no tctx refs to drop.
	 */
	if (unlikely(atomic_read(&tctx->in_cancel)) &&
	    current->io_uring == tctx)
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
}

/*
 * task_work callback that drains the io_uring task-work list of the task
 * owning @cb.
 *
 * @cb: the callback_head embedded as ->task_work in struct io_uring_task.
 *
 * Bit 0 of ->tw_pending is cleared, and ordered by a full barrier, before
 * any queued work is run. The whole list is then processed with no item
 * limit (UINT_MAX). The number of items run is collected but not used here.
 */
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
						  task_work);
	unsigned int nr_run = 0;

	/*
	 * Drop the pending bit before looking at the list. The barrier keeps
	 * the clear from being reordered after the list reads done by the
	 * drain below. NOTE(review): presumably this pairs with a producer
	 * that sets ->tw_pending after pushing work, so that work queued
	 * during the drain re-arms the callback instead of being stranded.
	 * Confirm against the producer side.
	 */
	clear_bit(0, &tctx->tw_pending);
	smp_mb__after_atomic();

	tctx_task_work_run(tctx, UINT_MAX, &nr_run);
}

/*
 * Advertise pending task work to userspace by setting IORING_SQ_TASKRUN in
 * the shared sq_flags. This only happens for rings created with
 * IORING_SETUP_TASKRUN_FLAG; otherwise the function does nothing.
 *
 * The rings are reached through the RCU-protected ->rings_rcu pointer
 * inside a read-side critical section, so a concurrent ring resize cannot
 * free them underneath us.
 */
static void io_ctx_mark_taskrun(struct io_ring_ctx *ctx)
{
	struct io_rings *r;

	if (!(ctx->flags & IORING_SETUP_TASKRUN_FLAG))
		return;

	rcu_read_lock();
	r = rcu_dereference(ctx->rings_rcu);
	atomic_or(IORING_SQ_TASKRUN, &r->sq_flags);
	rcu_read_unlock();
}

void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	int nr_wait;

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	/*
	 * The xchg() in mpscq_push() implies a full barrier, which pairs with
	 * the barrier in set_current_state() on the io_cqring_wait() side. This
	 * ensures that either we see the updated ->cq_wait_nr, or waiters going
	 * to sleep will observe the work added to the list, which is similar to
	 * the wait/wake task state sync.

Annotation

Implementation Notes