/*
 * Sets up an aio uring context, and returns the fd. The application asks for a
* ring size, we return the actual sq/cq ring sizes (among other things) in the
* params structure passed in.
 */
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
	struct io_uring_params p;
	long ret;
	int i;

	// copy the params from user space into kernel space
	if (copy_from_user(&p, params, sizeof(p)))
		return -EFAULT;
	// make sure none of the reserved fields have been set
	for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
		if (p.resv[i])
			return -EINVAL;
	}
	// validate the flags
	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
			IORING_SETUP_SQ_AFF))
		return -EINVAL;

	// allocate the rings, create the workqueues, install the fd, etc.
	ret = io_uring_create(entries, &p);
	if (ret < 0)
		return ret;

	// copy the resulting params back to user space
	if (copy_to_user(params, &p, sizeof(p)))
		return -EFAULT;
	return ret;
}

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
		struct io_uring_params __user *, params)
{
	return io_uring_setup(entries, params);
}
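/*
 * The userspace side of the syscall above (a minimal sketch, assuming kernel
 * headers new enough to define __NR_io_uring_setup and struct
 * io_uring_params): the application passes a zeroed params struct and gets
 * back the ring fd plus the actual ring geometry and mmap offsets.
 */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

static int ring_setup_example(unsigned entries, struct io_uring_params *p)
{
	memset(p, 0, sizeof(*p));	/* p->resv[] must be zero, per the check above */
	return syscall(__NR_io_uring_setup, entries, p);
	/* on success p->sq_entries is entries rounded up to a power of two
	 * and p->cq_entries is twice that, as io_uring_create() below shows */
}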
static int io_uring_create(unsigned entries, struct io_uring_params *p)
{
	struct user_struct *user = NULL;
	struct io_ring_ctx *ctx;
	bool account_mem;
	int ret;

	if (!entries || entries > IORING_MAX_ENTRIES)
		return -EINVAL;

	/*
	 * Use twice as many entries for the CQ ring. It's possible for the
	 * application to drive a higher depth than the size of the SQ ring,
	 * since the sqes are only used at submission time. This allows for
	 * some flexibility in overcommitting a bit.
	 */
	p->sq_entries = roundup_pow_of_two(entries);
	p->cq_entries = 2 * p->sq_entries;

	user = get_uid(current_user());
	// CAP_IPC_LOCK means the user may lock shared memory without limit
	account_mem = !capable(CAP_IPC_LOCK);

	if (account_mem) {
		// no CAP_IPC_LOCK: charge the ring pages against the user's
		// locked-memory limit
		ret = io_account_mem(user,
				ring_pages(p->sq_entries, p->cq_entries));
		if (ret) {
			free_uid(user);
			return ret;
		}
	}

	ctx = io_ring_ctx_alloc(p);
	if (!ctx) {
		if (account_mem)
			io_unaccount_mem(user, ring_pages(p->sq_entries,
								p->cq_entries));
		free_uid(user);
		return -ENOMEM;
	}
	ctx->compat = in_compat_syscall();
	ctx->account_mem = account_mem;
	ctx->user = user;

	ctx->creds = get_current_cred();
	if (!ctx->creds) {
		ret = -ENOMEM;
		goto err;
	}

	// allocate the io_rings and the SQE array
	ret = io_allocate_scq_urings(ctx, p);
	if (ret)
		goto err;

	// set up the workqueues (and, with SQPOLL, the kernel thread that
	// polls the SQ)
	ret = io_sq_offload_start(ctx, p);
	if (ret)
		goto err;

	memset(&p->sq_off, 0, sizeof(p->sq_off));
	p->sq_off.head = offsetof(struct io_rings, sq.head);
	p->sq_off.tail = offsetof(struct io_rings, sq.tail);
	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
	p->sq_off.flags = offsetof(struct io_rings, sq_flags);
	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
	p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;

	memset(&p->cq_off, 0, sizeof(p->cq_off));
	p->cq_off.head = offsetof(struct io_rings, cq.head);
	p->cq_off.tail = offsetof(struct io_rings, cq.tail);
	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
	p->cq_off.cqes = offsetof(struct io_rings, cqes);

	/*
	 * Install ring fd as the very last thing, so we don't risk someone
	 * having closed it before we finish setup
	 */
	// create the fd through which user space accesses the ctx
	ret = io_uring_get_fd(ctx);
	if (ret < 0)
		goto err;

	p->features = IORING_FEAT_SINGLE_MMAP;
	return ret;
err:
	io_ring_ctx_wait_and_kill(ctx);
	return ret;
}
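/*
 * A minimal userspace sketch (struct ring_maps and map_rings_example are
 * this write-up's illustrations, not a kernel or liburing API) of how the
 * sq_off/cq_off values filled in above are consumed: the application mmaps
 * the ring fd at the fixed IORING_OFF_* offsets and adds the reported
 * offsets to the mapping base to find head, tail, mask, the SQ index array
 * and the CQE array.
 */
#include <linux/io_uring.h>
#include <sys/mman.h>

struct ring_maps {
	void *sq_ring, *cq_ring;
	struct io_uring_sqe *sqes;
	unsigned *sq_head, *sq_tail, *sq_mask, *sq_flags, *sq_array;
	unsigned *cq_head, *cq_tail, *cq_mask;
	struct io_uring_cqe *cqes;
};

static int map_rings_example(int ring_fd, struct io_uring_params *p,
			     struct ring_maps *r)
{
	size_t sq_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
	size_t cq_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);

	r->sq_ring = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
			  MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING);
	if (r->sq_ring == MAP_FAILED)
		return -1;
	/* IORING_FEAT_SINGLE_MMAP (set above) would let SQ and CQ share one
	 * mapping; for clarity this sketch maps the CQ ring separately. */
	r->cq_ring = mmap(NULL, cq_sz, PROT_READ | PROT_WRITE,
			  MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_CQ_RING);
	if (r->cq_ring == MAP_FAILED)
		return -1;
	r->sqes = mmap(NULL, p->sq_entries * sizeof(struct io_uring_sqe),
		       PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
		       ring_fd, IORING_OFF_SQES);
	if (r->sqes == MAP_FAILED)
		return -1;

	r->sq_head  = (unsigned *)((char *)r->sq_ring + p->sq_off.head);
	r->sq_tail  = (unsigned *)((char *)r->sq_ring + p->sq_off.tail);
	r->sq_mask  = (unsigned *)((char *)r->sq_ring + p->sq_off.ring_mask);
	r->sq_flags = (unsigned *)((char *)r->sq_ring + p->sq_off.flags);
	r->sq_array = (unsigned *)((char *)r->sq_ring + p->sq_off.array);
	r->cq_head  = (unsigned *)((char *)r->cq_ring + p->cq_off.head);
	r->cq_tail  = (unsigned *)((char *)r->cq_ring + p->cq_off.tail);
	r->cq_mask  = (unsigned *)((char *)r->cq_ring + p->cq_off.ring_mask);
	r->cqes     = (struct io_uring_cqe *)((char *)r->cq_ring + p->cq_off.cqes);
	return 0;
}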
static int io_sq_offload_start(struct io_ring_ctx *ctx,
			       struct io_uring_params *p)
{
	int ret;

	mmgrab(current->mm);
	ctx->sqo_mm = current->mm;

	if (ctx->flags & IORING_SETUP_SQPOLL) {
		// IORING_SETUP_SQPOLL creates a kernel thread that polls the SQ
		ret = -EPERM;
		if (!capable(CAP_SYS_ADMIN))
			goto err;

		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids)
				goto err;
			if (!cpu_online(cpu))
				goto err;

			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
							ctx, cpu,
							"io_uring-sq");
		} else {
			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
							"io_uring-sq");
		}
		if (IS_ERR(ctx->sqo_thread)) {
			ret = PTR_ERR(ctx->sqo_thread);
			ctx->sqo_thread = NULL;
			goto err;
		}
		wake_up_process(ctx->sqo_thread);
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	/* Do QD, or 2 * CPUS, whatever is smallest */
	ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
			WQ_UNBOUND | WQ_FREEZABLE,
			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
	if (!ctx->sqo_wq[0]) {
		ret = -ENOMEM;
		goto err;
	}

	/*
	 * This is for buffered writes, where we want to limit the parallelism
	 * due to file locking in file systems. As "normal" buffered writes
	 * should parallelize on writeout quite nicely, limit us to having 2
	 * pending. This avoids massive contention on the inode when doing
	 * buffered async writes.
	 */
	// cap the buffered-write workqueue depth at 2 to reduce inode lock
	// contention
	ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
			WQ_UNBOUND | WQ_FREEZABLE, 2);
	if (!ctx->sqo_wq[1]) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	io_finish_async(ctx);
	mmdrop(ctx->sqo_mm);
	ctx->sqo_mm = NULL;
	return ret;
}
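/*
 * How an application opts into this SQPOLL path (a sketch; the values are
 * illustrative). The fields below are what the p->sq_thread_idle and
 * p->sq_thread_cpu reads above consume, and they feed the same
 * io_uring_setup() call shown earlier. At this kernel version SQPOLL also
 * requires CAP_SYS_ADMIN, matching the capable() check above.
 */
struct io_uring_params sqpoll_params_example = {
	.flags		= IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF,
	.sq_thread_idle	= 2000,	/* ms before io_sq_thread goes idle; 0 means HZ (1s) */
	.sq_thread_cpu	= 0,	/* with SQ_AFF: pin the kernel thread to CPU 0 */
};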
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
		u32, min_complete, u32, flags, const sigset_t __user *, sig,
		size_t, sigsz)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	int submitted = 0;
	struct fd f;

	if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
		return -EINVAL;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (f.file->f_op != &io_uring_fops)
		goto out_fput;

	ret = -ENXIO;
	ctx = f.file->private_data;
	if (!percpu_ref_tryget(&ctx->refs))
		goto out_fput;

	/*
	 * For SQ polling, the thread will do all submissions and completions.
	 * Just return the requested submit count, and wake the thread if
	 * we were asked to.
	 */
	ret = 0;
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		// wake up the in-kernel sq_thread if asked to
		if (flags & IORING_ENTER_SQ_WAKEUP)
			wake_up(&ctx->sqo_wait);
		submitted = to_submit;
	} else if (to_submit) {
		// submit the requests in the context of the calling task
		to_submit = min(to_submit, ctx->sq_entries);

		mutex_lock(&ctx->uring_lock);
		submitted = io_ring_submit(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (submitted != to_submit)
			goto out;
	}
	if (flags & IORING_ENTER_GETEVENTS) {
		// wait until min_complete requests have completed
		unsigned nr_events = 0;

		min_complete = min(min_complete, ctx->cq_entries);

		if (ctx->flags & IORING_SETUP_IOPOLL) {
			ret = io_iopoll_check(ctx, &nr_events, min_complete);
		} else {
			ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
		}
	}

out:
	percpu_ref_put(&ctx->refs);
out_fput:
	fdput(f);
	return submitted ? submitted : ret;
}
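/*
 * The userspace side of the call above (a minimal sketch, assuming kernel
 * headers that define __NR_io_uring_enter): in the default mode one call
 * both submits `to_submit` SQEs and, with IORING_ENTER_GETEVENTS, blocks
 * until at least `min_complete` CQEs are available. Under
 * IORING_SETUP_SQPOLL the same call submits nothing itself; as the code
 * above shows, it just reports to_submit and optionally wakes the sq_thread.
 */
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>

static int submit_and_wait_example(int ring_fd, unsigned to_submit,
				   unsigned min_complete)
{
	return syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
		       IORING_ENTER_GETEVENTS, NULL, 0);
}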
SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
		void __user *, arg, unsigned int, nr_args)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	struct fd f;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (f.file->f_op != &io_uring_fops)
		goto out_fput;

	ctx = f.file->private_data;

	mutex_lock(&ctx->uring_lock);
	// the core function
	ret = __io_uring_register(ctx, opcode, arg, nr_args);
	mutex_unlock(&ctx->uring_lock);
out_fput:
	fdput(f);
	return ret;
}

static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
			       void __user *arg, unsigned nr_args)
	__releases(ctx->uring_lock)
	__acquires(ctx->uring_lock)
{
	int ret;

	/*
	 * We're inside the ring mutex, if the ref is already dying, then
	 * someone else killed the ctx or is already going through
	 * io_uring_register().
	 */
	if (percpu_ref_is_dying(&ctx->refs))
		return -ENXIO;

	percpu_ref_kill(&ctx->refs);

	/*
	 * Drop uring mutex before waiting for references to exit. If another
	 * thread is currently inside io_uring_enter() it might need to grab
	 * the uring_lock to make progress. If we hold it here across the drain
	 * wait, then we can deadlock. It's safe to drop the mutex here, since
	 * no new references will come in after we've killed the percpu ref.
	 */
	mutex_unlock(&ctx->uring_lock);
	wait_for_completion(&ctx->ctx_done);
	mutex_lock(&ctx->uring_lock);

	// register or unregister the corresponding resources based on opcode
	switch (opcode) {
	case IORING_REGISTER_BUFFERS:
		ret = io_sqe_buffer_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_BUFFERS:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_buffer_unregister(ctx);
		break;
	case IORING_REGISTER_FILES:
		ret = io_sqe_files_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_FILES:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_files_unregister(ctx);
		break;
	case IORING_REGISTER_EVENTFD:
		ret = -EINVAL;
		if (nr_args != 1)
			break;
		ret = io_eventfd_register(ctx, arg);
		break;
	case IORING_UNREGISTER_EVENTFD:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_eventfd_unregister(ctx);
		break;
	default:
		ret = -EINVAL;
		break;
	}

	/* bring the ctx back to life */
	reinit_completion(&ctx->ctx_done);
	percpu_ref_reinit(&ctx->refs);
	return ret;
}
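/*
 * Userspace side of the register path (a minimal sketch, assuming headers
 * that define __NR_io_uring_register): IORING_REGISTER_BUFFERS hands the
 * kernel an array of iovecs that io_sqe_buffer_register() pins for
 * fixed-buffer reads/writes; unregistering passes no argument, matching the
 * arg/nr_args checks in the switch above.
 */
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <unistd.h>

static int register_buffers_example(int ring_fd, struct iovec *iovs, unsigned nr)
{
	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_REGISTER_BUFFERS, iovs, nr);
}

static int unregister_buffers_example(int ring_fd)
{
	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_UNREGISTER_BUFFERS, NULL, 0);
}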
static int io_sq_thread(void *data)
{
	struct io_ring_ctx *ctx = data;
	struct mm_struct *cur_mm = NULL;
	const struct cred *old_cred;
	mm_segment_t old_fs;
	DEFINE_WAIT(wait);
	unsigned inflight;
	unsigned long timeout;

	// tell the creating task that the sqo thread has started
	complete(&ctx->sqo_thread_started);

	old_fs = get_fs();
	set_fs(USER_DS);
	old_cred = override_creds(ctx->creds);

	// the thread's main loop
	timeout = inflight = 0;
	while (!kthread_should_park()) {
		bool mm_fault = false;
		unsigned int to_submit;

		// a non-zero inflight means requests are still being processed
		if (inflight) {
			unsigned nr_events = 0;

			if (ctx->flags & IORING_SETUP_IOPOLL) {
				/*
				 * inflight is the count of the maximum possible
				 * entries we submitted, but it can be smaller
				 * if we dropped some of them. If we don't have
				 * poll entries available, then we know that we
				 * have nothing left to poll for. Reset the
				 * inflight count to zero in that case.
				 */
				mutex_lock(&ctx->uring_lock);
				// in IOPOLL mode the sqo thread is also
				// responsible for reaping completions
				if (!list_empty(&ctx->poll_list))
					io_iopoll_getevents(ctx, &nr_events, 0);
				else
					inflight = 0;
				mutex_unlock(&ctx->uring_lock);
			} else {
				/*
				 * Normal IO, just pretend everything completed.
				 * We don't have to poll completions for that.
				 */
				// non-IOPOLL mode: simply drop inflight back to zero
				nr_events = inflight;
			}

			inflight -= nr_events;
			if (!inflight)
				timeout = jiffies + ctx->sq_thread_idle;
		}

		// how many SQEs are pending in the SQ ring
		to_submit = io_sqring_entries(ctx);
		if (!to_submit) {
			/*
			 * Drop cur_mm before scheduling, we can't hold it for
			 * long periods (or over schedule()). Do this before
			 * adding ourselves to the waitqueue, as the unuse/drop
			 * may sleep.
			 */
			if (cur_mm) {
				unuse_mm(cur_mm);
				mmput(cur_mm);
				cur_mm = NULL;
			}

			/*
			 * We're polling. If we're within the defined idle
			 * period, then let us spin without work before going
			 * to sleep.
			 */
			// keep spinning while requests are still in flight or
			// the idle period has not yet expired, instead of
			// going to sleep
			if (inflight || !time_after(jiffies, timeout)) {
				cond_resched();
				continue;
			}

			prepare_to_wait(&ctx->sqo_wait, &wait,
						TASK_INTERRUPTIBLE);

			/* Tell userspace we may need a wakeup call */
			ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
			/* make sure to read SQ tail after writing flags */
			smp_mb();

			to_submit = io_sqring_entries(ctx);
			if (!to_submit) {
				if (kthread_should_park()) {
					finish_wait(&ctx->sqo_wait, &wait);
					break;
				}
				if (signal_pending(current))
					flush_signals(current);
				schedule();
				finish_wait(&ctx->sqo_wait, &wait);

				ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
				continue;
			}
			finish_wait(&ctx->sqo_wait, &wait);

			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
		}

		/* Unless all new commands are FIXED regions, grab mm */
		if (!cur_mm) {
			mm_fault = !mmget_not_zero(ctx->sqo_mm);
			if (!mm_fault) {
				use_mm(ctx->sqo_mm);
				cur_mm = ctx->sqo_mm;
			}
		}

		to_submit = min(to_submit, ctx->sq_entries);
		// submit the pending SQEs
		inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
					   mm_fault);

		/* Commit SQ ring head once we've consumed all SQEs */
		// advance the head of the SQ ring
		io_commit_sqring(ctx);
	}

	set_fs(old_fs);
	if (cur_mm) {
		unuse_mm(cur_mm);
		mmput(cur_mm);
	}
	revert_creds(old_cred);

	kthread_parkme();

	return 0;
}
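/*
 * The userspace half of the handshake io_sq_thread() implements above (a
 * sketch reusing the ring_maps layout from the mapping example earlier;
 * both are this write-up's illustrations, not a kernel or liburing API).
 * The application fills an SQE, publishes its index in the SQ array, and
 * advances the tail with release semantics so the thread's tail read (after
 * its smp_mb()) observes the entry. io_uring_enter(IORING_ENTER_SQ_WAKEUP)
 * is only needed once the thread has gone idle and set IORING_SQ_NEED_WAKEUP.
 */
#include <linux/io_uring.h>
#include <stdatomic.h>
#include <string.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

static void queue_readv_example(struct ring_maps *r, int ring_fd, int data_fd,
				struct iovec *iov, off_t off)
{
	unsigned tail = *r->sq_tail;
	unsigned idx = tail & *r->sq_mask;
	struct io_uring_sqe *sqe = &r->sqes[idx];

	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_READV;
	sqe->fd = data_fd;
	sqe->off = off;
	sqe->addr = (unsigned long)iov;	/* iovec array for the readv */
	sqe->len = 1;			/* number of iovecs */

	r->sq_array[idx] = idx;		/* publish the SQE index */
	/* make the SQE and array slot visible before the new tail */
	atomic_thread_fence(memory_order_release);
	*r->sq_tail = tail + 1;

	/* wake the sq_thread only if it advertised that it went to sleep */
	if (*(volatile unsigned *)r->sq_flags & IORING_SQ_NEED_WAKEUP)
		syscall(__NR_io_uring_enter, ring_fd, 0, 0,
			IORING_ENTER_SQ_WAKEUP, NULL, 0);
}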