date: 2019-03-25
tags: OS 6.828
这里会记录阅读6.828课程lecture note的我的个人笔记。可能会中英混杂,不是很适合外人阅读,也请见谅。
有的时候线程需要等待某些事件,比如
wait
)。对于这些事件,如果就是用一个spinlock显然太浪费了。更好的解决方法是用一些可以原生的coordination primitives来把CPU yield出来,这些primitives包括:
等等(注意sleep & wake的condition variable实际上有异曲同工之妙)
sleep
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk) {
struct proc *p = myproc();
if(p == 0)
panic("sleep");
if(lk == 0)
panic("sleep without lk");
// Must acquire ptable.lock in order to
// change p->state and then call sched.
// Once we hold ptable.lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup runs with ptable.lock locked),
// so it's okay to release lk.
if(lk != &ptable.lock){ //DOC: sleeplock0
acquire(&ptable.lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &ptable.lock){ //DOC: sleeplock2
release(&ptable.lock);
acquire(lk);
}
}
sleep
的思路很简单,就是把p->state
设置为SLEEPING
然后释放掉当前的锁,把p->chan
设置为传入的channel,然后调用sched
也就是进行context switch。在返回的时候重新acquire
那个锁。
需要注意的是,对于sleep来acquire的锁必须要只能用sleep进行acquire,也就是不能有一个函数直接就acquire了,不然的话这边锁刚一释放,那边就直接acquire肯定是不行的。如果前后运行两个sleep,第二个会因为第一个已经acquire了ptable
而卡在那个if
的地方。而进行wake之后,需要等第一个进程把ptable.lock
释放才行。而且还要注意最后的acquire(lk)
会在实际应用函数中被抵消,如下面iderw
的最后。
注意,在返回的时候应当是在ptable.lock
是被aquire的状态的,所以直到锁重新被获取前是不会有中断的。这也是实现sleep的非常重要的一点。
然后是wakeup
// Wake up all processes sleeping on chan.
void
wakeup(void *chan)
{
acquire(&ptable.lock);
wakeup1(chan);
release(&ptable.lock);
}
// Wake up all processes sleeping on chan.
// The ptable lock must be held.
static void
wakeup1(void *chan)
{
struct proc *p;
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
if(p->state == SLEEPING && p->chan == chan)
p->state = RUNNABLE;
}
wakeup
就更简单了,把所有的在某个channel上的sleeping的channel的状态都改为RUNNABLE
。
注意有可能一次会有多个进程被唤醒了。比如说pipe对应的好几个进程都被唤醒了,但是只有1个能继续运行,对于其他的进行来说,最好继续sleep
,所以sleep
往往是用一个循环来进行的,来防止这种spurious wakeup。
sleep和wakeup的两个问题是
我们来看一个xv6里面的例子,iderw
和ideintr
。这里IDE指Integrated Drive Electronics,可以理解成和硬盘的接口。
在看这两个函数之前,需要清楚ide.c
中最重要的一个变量是idequeue
,其保存了当前在处理的buffer queue,就是说会先处理idequeue
对应的buffer,之后是idequeue->qnext
的,一次类推。在在使用这个队列的时候不许要hold idelock
。
下面我们来看这两个函数,iderw
表示对buffer进行读写处理。
// Sync buf with disk.
// If B_DIRTY is set, write buf to disk, clear B_DIRTY, set B_VALID.
// Else if B_VALID is not set, read buf from disk, set B_VALID.
void
iderw(struct buf *b) {
struct buf **pp;
if(!holdingsleep(&b->lock))
panic("iderw: buf not locked");
if((b->flags & (B_VALID|B_DIRTY)) == B_VALID)
panic("iderw: nothing to do");
if(b->dev != 0 && !havedisk1)
panic("iderw: ide disk 1 not present");
acquire(&idelock); //DOC:acquire-lock
// Append b to idequeue.
b->qnext = 0;
for(pp=&idequeue; *pp; pp=&(*pp)->qnext) //DOC:insert-queue
;
*pp = b;
// Start disk if necessary.
if(idequeue == b)
idestart(b);
// Wait for request to finish.
while((b->flags & (B_VALID|B_DIRTY)) != B_VALID){
sleep(b, &idelock);
}
release(&idelock);
}
就是把buf *b
放在idequeue
的最后面,如果不能马上处理这个buffer,就sleep
,用的锁是idelock
,channel是b
。注意这里就用了前面说到的循环。
当disk read完成了的时候就会调用ideintr
。
// Interrupt handler.
void
ideintr(void)
{
struct buf *b;
// First queued buffer is the active request.
acquire(&idelock);
if((b = idequeue) == 0){
release(&idelock);
return;
}
idequeue = b->qnext;
// Read data if needed.
if(!(b->flags & B_DIRTY) && idewait(1) >= 0)
insl(0x1f0, b->data, BSIZE/4);
// Wake process waiting for this buf.
b->flags |= B_VALID;
b->flags &= ~B_DIRTY;
wakeup(b);
// Start disk on next buf in queue.
if(idequeue != 0)
idestart(idequeue);
release(&idelock);
}
就是把读写完的东西处理完之后调用wakeup(b)
。
如果iderw
在调用sleep
之前私自释放了idelock
,那么在sleep
之前,就有可能调用了ideintr
,调用sleep
之后就不会有wakeup
了,这就导致了"lost wakeup"。
目标:
sleep
的循环,不要在释放条件锁和p->state = SLEEPING
之间调用wakeup
。SLEEPING
状态下释放condition lockxv6的策略:
wakeup
要hold ptable.lock
和条件锁。ptable.lock
或条件锁 diagram:
|----idelock----|
|---ptable.lock---|
|----idelock----|
|-ptable.lock-|
从而让这两个东西不能同时运行。这也是为什么需要sleep
有一个lock argument。
人们还发明了不少sequence coordination primitives,它们都需要解决wakeup problem。如:
除了上面IDE的例子,另外一个使用了sleep & wakeup的是pipe
。我们在shell
的那次作业中使用过其接口,现在来看看其内部是如何实现的。
一个pipe的数据结构是这样的:
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
我们直接来看piperead
和pipewrite
:
int
pipewrite(struct pipe *p, char *addr, int n){
int i;
acquire(&p->lock);
for(i = 0; i < n; i++){
while(p->nwrite == p->nread + PIPESIZE){ //DOC: pipewrite-full
if(p->readopen == 0 || myproc()->killed){
release(&p->lock);
return -1;
}
wakeup(&p->nread);
sleep(&p->nwrite, &p->lock); //DOC: pipewrite-sleep
}
p->data[p->nwrite++ % PIPESIZE] = addr[i];
}
wakeup(&p->nread); //DOC: pipewrite-wakeup1
release(&p->lock);
return n;
}
int
piperead(struct pipe *p, char *addr, int n){
int i;
acquire(&p->lock);
while(p->nread == p->nwrite && p->writeopen){ //DOC: pipe-empty
if(myproc()->killed){
release(&p->lock);
return -1;
}
sleep(&p->nread, &p->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(p->nread == p->nwrite)
break;
addr[i] = p->data[p->nread++ % PIPESIZE];
}
wakeup(&p->nwrite); //DOC: piperead-wakeup
release(&p->lock);
return i;
}
感觉就是一个1 writer 1 reader的writer reader problem。通过sleep
和wakeup
实现需要的blocking。
注意这里的sleep
对应的channel,以及在piperead
的最后仍然是要wakeup(&p->nwrite)
的。这是因为在读之前有可能p->data
满了,从而write运行了sleep
,读了一点之后就可以继续写入了,需要唤醒。
如果要kill
一个sleeping的thread,用上一讲提到的给这个进程加一个p->kill
的flag,等其唤醒之后进入中断来自己销毁是不合理的,因为进程可能要很久之后才能被唤醒。
在kill
的时候,把所有的SLEEPING
都标记成RUNNABLE
,也就是把他们都唤醒。
// Kill the process with the given pid.
// Process won't exit until it returns
// to user space (see trap in trap.c).
int
kill(int pid){
struct proc *p;
acquire(&ptable.lock);
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
if(p->pid == pid){
p->killed = 1;
// Wake process from sleep if necessary.
if(p->state == SLEEPING)
p->state = RUNNABLE;
release(&ptable.lock);
return 0;
}
}
release(&ptable.lock);
return -1;
}
但是注意sleep
往往是在一个循环里的,有的函数在循环里面就检查了p->kill
从而进行销毁,如piperead
这是方便的,但是有的则不检查,如iderw
,不检查的就会继续一段时间直到其进入中断了,如iderw
会至少执行到system call结束。
iderw
不检查p->kill
?if reading, calling FS code expects to see data in the disk buffer!
if writing (or reading), might be halfway through create()
quitting now leaves on-disk FS inconsistent. (这里没太明白...)
在xv6中,如果kill
的进程是user space,那么下一次有system call或者time interrupt的时候就会exit
。而如果target在kernel,就target就不会再执行user instruction但是可能会在kernel停留很久(kernel这部分没明白...)
lost wakeup
JOS在kernel中禁止了中断,所以在condition check和sleep之间不会有wakeup
terminate while sleeping
JOS没有像create这样的blocking multi-step system call,因为既没有file system,也没有disk driver。
唯一个blocking call是IPC的recv()
recv
再说吧...)