zhuzilin's Blog

about

6.828 笔记10

date: 2019-03-25
tags: OS  6.828  

这里会记录阅读6.828课程lecture note的我的个人笔记。可能会中英混杂,不是很适合外人阅读,也请见谅。

Lecture 11: Coordination (sleep&wakeup)

sequence coordination

有的时候线程需要等待某些事件,比如

  • disk的读写
  • pipe reader to make space in pipe
  • child exit(wait)。

对于这些事件,如果就是用一个spinlock显然太浪费了。更好的解决方法是用一些可以原生的coordination primitives来把CPU yield出来,这些primitives包括:

  • sleep & wakeup (xv6)
  • condition variables (homework)
  • barriers (homework)

等等(注意sleep & wake的condition variable实际上有异曲同工之妙)

sleep & wakeup

sleep

// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk) {
  struct proc *p = myproc();
  
  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");

  // Must acquire ptable.lock in order to
  // change p->state and then call sched.
  // Once we hold ptable.lock, we can be
  // guaranteed that we won't miss any wakeup
  // (wakeup runs with ptable.lock locked),
  // so it's okay to release lk.
  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }
  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }
}

sleep的思路很简单,就是把p->state设置为SLEEPING然后释放掉当前的锁,把p->chan设置为传入的channel,然后调用sched也就是进行context switch。在返回的时候重新acquire那个锁。

需要注意的是,对于sleep来acquire的锁必须要只能用sleep进行acquire,也就是不能有一个函数直接就acquire了,不然的话这边锁刚一释放,那边就直接acquire肯定是不行的。如果前后运行两个sleep,第二个会因为第一个已经acquire了ptable而卡在那个if的地方。而进行wake之后,需要等第一个进程把ptable.lock释放才行。而且还要注意最后的acquire(lk)会在实际应用函数中被抵消,如下面iderw的最后。

注意,在返回的时候应当是在ptable.lock是被aquire的状态的,所以直到锁重新被获取前是不会有中断的。这也是实现sleep的非常重要的一点。

然后是wakeup

// Wake up all processes sleeping on chan.
void
wakeup(void *chan)
{
  acquire(&ptable.lock);
  wakeup1(chan);
  release(&ptable.lock);
}

// Wake up all processes sleeping on chan.
// The ptable lock must be held.
static void
wakeup1(void *chan)
{
  struct proc *p;

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}

wakeup就更简单了,把所有的在某个channel上的sleeping的channel的状态都改为RUNNABLE

注意有可能一次会有多个进程被唤醒了。比如说pipe对应的好几个进程都被唤醒了,但是只有1个能继续运行,对于其他的进行来说,最好继续sleep,所以sleep往往是用一个循环来进行的,来防止这种spurious wakeup。

sleep和wakeup的两个问题是

  • lost wakeup
  • termination while sleeping

xv6 usage example

我们来看一个xv6里面的例子,iderwideintr。这里IDE指Integrated Drive Electronics,可以理解成和硬盘的接口。

在看这两个函数之前,需要清楚ide.c中最重要的一个变量是idequeue,其保存了当前在处理的buffer queue,就是说会先处理idequeue对应的buffer,之后是idequeue->qnext的,一次类推。在在使用这个队列的时候不许要hold idelock

下面我们来看这两个函数,iderw表示对buffer进行读写处理。

// Sync buf with disk.
// If B_DIRTY is set, write buf to disk, clear B_DIRTY, set B_VALID.
// Else if B_VALID is not set, read buf from disk, set B_VALID.
void
iderw(struct buf *b) {
  struct buf **pp;

  if(!holdingsleep(&b->lock))
    panic("iderw: buf not locked");
  if((b->flags & (B_VALID|B_DIRTY)) == B_VALID)
    panic("iderw: nothing to do");
  if(b->dev != 0 && !havedisk1)
    panic("iderw: ide disk 1 not present");

  acquire(&idelock);  //DOC:acquire-lock

  // Append b to idequeue.
  b->qnext = 0;
  for(pp=&idequeue; *pp; pp=&(*pp)->qnext)  //DOC:insert-queue
    ;
  *pp = b;

  // Start disk if necessary.
  if(idequeue == b)
    idestart(b);

  // Wait for request to finish.
  while((b->flags & (B_VALID|B_DIRTY)) != B_VALID){
    sleep(b, &idelock);
  }

  release(&idelock);
}

就是把buf *b放在idequeue的最后面,如果不能马上处理这个buffer,就sleep,用的锁是idelock,channel是b。注意这里就用了前面说到的循环。

当disk read完成了的时候就会调用ideintr

// Interrupt handler.
void
ideintr(void)
{
  struct buf *b;

  // First queued buffer is the active request.
  acquire(&idelock);

  if((b = idequeue) == 0){
    release(&idelock);
    return;
  }
  idequeue = b->qnext;

  // Read data if needed.
  if(!(b->flags & B_DIRTY) && idewait(1) >= 0)
    insl(0x1f0, b->data, BSIZE/4);

  // Wake process waiting for this buf.
  b->flags |= B_VALID;
  b->flags &= ~B_DIRTY;
  wakeup(b);

  // Start disk on next buf in queue.
  if(idequeue != 0)
    idestart(idequeue);

  release(&idelock);
}

就是把读写完的东西处理完之后调用wakeup(b)

lost wakeup

如果iderw在调用sleep之前私自释放了idelock,那么在sleep之前,就有可能调用了ideintr,调用sleep之后就不会有wakeup了,这就导致了"lost wakeup"。

xv6 lost wakeup solution

目标:

  • 对于sleep的循环,不要在释放条件锁和p->state = SLEEPING之间调用wakeup
  • SLEEPING状态下释放condition lock

xv6的策略:

  • wakeup要hold ptable.lock和条件锁。
  • sleeper任何时候都hold ptable.lock或条件锁
  diagram:
    |----idelock----|
                  |---ptable.lock---|
                                     |----idelock----|
                                      |-ptable.lock-|

从而让这两个东西不能同时运行。这也是为什么需要sleep有一个lock argument。

人们还发明了不少sequence coordination primitives,它们都需要解决wakeup problem。如:

  • condition variables (similar to sleep/wakeup)
  • counting semaphores
  • wait queue (linux kernel)

another xv6 usage example

除了上面IDE的例子,另外一个使用了sleep & wakeup的是pipe。我们在shell的那次作业中使用过其接口,现在来看看其内部是如何实现的。

一个pipe的数据结构是这样的:

struct pipe {
  struct spinlock lock;
  char data[PIPESIZE];
  uint nread;     // number of bytes read
  uint nwrite;    // number of bytes written
  int readopen;   // read fd is still open
  int writeopen;  // write fd is still open
};

我们直接来看pipereadpipewrite:

int
pipewrite(struct pipe *p, char *addr, int n){
  int i;

  acquire(&p->lock);
  for(i = 0; i < n; i++){
    while(p->nwrite == p->nread + PIPESIZE){  //DOC: pipewrite-full
      if(p->readopen == 0 || myproc()->killed){
        release(&p->lock);
        return -1;
      }
      wakeup(&p->nread);
      sleep(&p->nwrite, &p->lock);  //DOC: pipewrite-sleep
    }
    p->data[p->nwrite++ % PIPESIZE] = addr[i];
  }
  wakeup(&p->nread);  //DOC: pipewrite-wakeup1
  release(&p->lock);
  return n;
}

int
piperead(struct pipe *p, char *addr, int n){
  int i;

  acquire(&p->lock);
  while(p->nread == p->nwrite && p->writeopen){  //DOC: pipe-empty
    if(myproc()->killed){
      release(&p->lock);
      return -1;
    }
    sleep(&p->nread, &p->lock); //DOC: piperead-sleep
  }
  for(i = 0; i < n; i++){  //DOC: piperead-copy
    if(p->nread == p->nwrite)
      break;
    addr[i] = p->data[p->nread++ % PIPESIZE];
  }
  wakeup(&p->nwrite);  //DOC: piperead-wakeup
  release(&p->lock);
  return i;
}

感觉就是一个1 writer 1 reader的writer reader problem。通过sleepwakeup实现需要的blocking。

注意这里的sleep对应的channel,以及在piperead的最后仍然是要wakeup(&p->nwrite)的。这是因为在读之前有可能p->data满了,从而write运行了sleep,读了一点之后就可以继续写入了,需要唤醒。

terminate sleeping thread

如果要kill一个sleeping的thread,用上一讲提到的给这个进程加一个p->kill的flag,等其唤醒之后进入中断来自己销毁是不合理的,因为进程可能要很久之后才能被唤醒。

xv6 terminate sleeping thread solution

kill的时候,把所有的SLEEPING都标记成RUNNABLE,也就是把他们都唤醒。

// Kill the process with the given pid.
// Process won't exit until it returns
// to user space (see trap in trap.c).
int
kill(int pid){
  struct proc *p;

  acquire(&ptable.lock);
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
    if(p->pid == pid){
      p->killed = 1;
      // Wake process from sleep if necessary.
      if(p->state == SLEEPING)
        p->state = RUNNABLE;
      release(&ptable.lock);
      return 0;
    }
  }
  release(&ptable.lock);
  return -1;
}

但是注意sleep往往是在一个循环里的,有的函数在循环里面就检查了p->kill从而进行销毁,如piperead这是方便的,但是有的则不检查,如iderw,不检查的就会继续一段时间直到其进入中断了,如iderw会至少执行到system call结束。

  • 为什么iderw不检查p->kill
  • if reading, calling FS code expects to see data in the disk buffer!

    if writing (or reading), might be halfway through create()

    quitting now leaves on-disk FS inconsistent. (这里没太明白...)

在xv6中,如果kill的进程是user space,那么下一次有system call或者time interrupt的时候就会exit。而如果target在kernel,就target就不会再执行user instruction但是可能会在kernel停留很久(kernel这部分没明白...)

JOS Solution

  • lost wakeup

    JOS在kernel中禁止了中断,所以在condition check和sleep之间不会有wakeup

  • terminate while sleeping

    JOS没有像create这样的blocking multi-step system call,因为既没有file system,也没有disk driver。

    唯一个blocking call是IPC的recv()

    • if env_destroy() is running, the target thread is not running recv() leaves env in a state where it can be safely destroyed (不明白,应该要等用到recv再说吧...)