HICSC
009 进程管理:用管程解决经典同步问题

管程可用于解决进程(或线程)的同步互斥问题,解决互斥问题的思路很简单,就是将对共享变量的操作封装起来,任何时候,只允许一个进程(或线程)进入即可。而解决同步问题,则是使用条件变量。因此,我们可以得到使用管程解决并发问题的代码模板:

整个模板中,最重要的问题就是如何识别出条件变量?由于条件变量是用于管理共享数据的并发访问的,因此,条件变量跟定义的共享数据息息相关。比如,使用管程实现阻塞队列,管程中的共享数据就是一个队列,那么自然而然的就推导出两个条件变量:队列已满队列为空

使用管程解决实际问题时,共享数据并不是固定。比如,哲学家进餐问题,使用管程解决时,可以把筷子作为共享数据,也可以把哲学家作为共享数据。而选择何种数据作为共享数据也没有什么标准,只要方便分析解决问题就好。

接下来,用几个示例,为你演示如何使用管程解决经典的同步问题。

管程可用于解决进程或线程的同步问题,但大多数实际场景下,都是处理线程间的同步互斥问题,因此,本文的所有实例均基于线程编写。另外,所有代码均使用Java语言编写。

哲学家就餐问题

有五个哲学家共用一张圆桌,分别坐在周围的五张椅子上,在圆桌上五只筷子,他们的生活方式是交替地进行思考和进餐。平时,一个哲学家进行思考,饥饿时便试图取用其左右两边的筷子,只有在他拿到两只筷子时才能进餐。 进餐完毕,放下筷子继续思考。

前文已经提到,使用管程解题时,可以把筷子作为共享数据,也可以把哲学家作为共享数据。大家可以暂停一下,自己在纸上画画,使用这两种数据作为共享数据时,该如何解决?

哲学家作为共享数据

首先来看第一种解法,哲学家作为共享数据。先给出管程的定义:

public class PhilosopherMonitor {
    // 1.一把锁
    Lock mutex = new ReentrantLock();
    // 2.共享数据
    State[] states = new State[5];
    enum State {THINKING, HUNGRY, EATING}
    // 3.条件变量
    Semaphore[] conditions = new Semaphore[5];
    // 4.初始化代码
    public PhilosopherMonitor() {
        for (int i = 0; i < 5; i++) {
            states[i] = State.THINKING;
            conditions[i] = new Semaphore(1);
        }
    }
    // 5.对外方法
    void pickup(int i);
    void putdown(int i);

管程定义的详细介绍:

解析来看看两个对外的方法代码:

/**
 * 第i位哲学家拿起筷子进餐
 */
void pickup(int i) throws InterruptedException {
    mutex.lock();
    // 哲学家拿起筷子,说明他饿了,想要进食,所以先更新其状态
    states[i] = State.HUNGRY;
    // 测试,看看哲学家是否可以拿到筷子
    test(i);
    mutex.unlock();
    // 如果没能拿到筷子,则进入等待状态
    if (states[i] != State.EATING) {
        conditions[i].acquire();
    }
}

/**
 * 第i位哲学家放下筷子
 */
void putdown(int i) throws InterruptedException {
    mutex.lock();
    // 哲学家完成进食,继续进入思考状态
    states[i] = State.THINKING;
    // 测试一下左右两边的哲学家,看看其能否进食
    test(left(i));
    test(right(i));
    mutex.unlock();
}

/**
 * 就餐测试:判断哲学家i能否拿到筷子,如果能拿到,直接进入进食状态,
 * 否则直接返回,进入等待状态
 */
void test(int i) {
    // 哲学家本身处理饥饿状态,且邻居都没有进食
    if (states[i] == State.HUNGRY && 
            states[left(i)] != State.EATING && 
            states[right(i)] != State.EATING) {
        System.out.println(“哲学家” + i + “开始进餐”);
        states[i] = State.EATING;
        conditions[i].release();
    }
}

private int left(int i) { return (i - 1 + 5) % 5;}
private int right(int i) { return (i + 1) % 5;}

代码中的left和right方法表示获取该哲学家左右邻居。整个代码还是比较好理解的,就说一点,为什么条件变量要使用信号量,答案就是在test()方法中,由于test方法会在多个地方调用,在调用test时,并不能保证condition[i]已经调用acquire方法。我们知道在调用Condition的signal方法之前,一定要先调用起await方法,否则就会出错,而使用信号量则不存在这个问题。所以,这里完全就是为了讨巧而已。当然,如果把test方法的逻辑放回到两个方法中,你完全可以使用lock + condition来处理。再强调一下,这里仅是方便展示逻辑而已,你也可以尝试自己写一下。

最后看下测试用例:

class Philosopher implements Runnable {
    private int index;
  public Philosopher(int index) {
          this.index = index;
     }
  @Override
    public void run() {
          while (true) {
            thinking(); // 随机暂停几秒
          monitor.pickup(index);
          eating(); // 随机暂停几秒
          monitor.putdown(index);
     }
    }
}

public class PhilosopherTest {
    public static void main(String[] args) {
        for (int I = 0; I < 5; I++) {
            new Thread(new Philosopher(i)).start();
        }
    }
}
筷子作为共享变量

再来看第2种解法,筷子作为共享数据,主要思路是有5根筷子,用一个boolean数组来存放筷子的状态,哲学家每次拿起和放下的都是两根筷子,比如哲学家拿到第1根筷子,但没有拿到第二根,那么就等待第二根筷子。就直接贴代码,其它看看注释就好。

public class PhilosopherMonitor {
    // 2.共享数据
    boolean[] fork = new boolean[5];
    // 3.条件变量
    Semaphore[] conditions = new Semaphore[5];
    // 4.初始化代码
    public PhilosopherMonitor() {
        for (int i = 0; i < 5; i++) {
            conditions[i] = new Semaphore(1);
        }
    }
    // 5.对外方法
    /**
     * 拿起第i和i+1根筷子
     */
    void pickup(int i) throws InterruptedException {
        int left = i;
        int right = (i + 1) % 5;
        if (!fork[left]) {
            conditions[left].acquire();
        }
        fork[left] = false;
        if (!fork[right]) {
            conditions[right].acquire();
        }
        fork[right] = false;
    }

    /**
     * 放下第i和i+1根筷子
     */
    void putdown(int i) {
        int left = i;
        int right = (i + 1) % 5;
        if (conditions[left].tryAcquire()) {
            // 如果没有人等待这只筷子,则直接改变状态
            fork[left] = true;
        } else {
            // 有人等待这跟筷子,唤醒等待
            conditions[left].release();
        }
        if (conditions[right].tryAcquire()) {
            // 如果没有人等待这只筷子,则直接改变状态
            fork[right] = true;
        } else {
            // 有人等待这跟筷子,唤醒等待
            conditions[right].release();
        }
    }
}

需要注意的是,这里是不需要锁来保证互斥的,因为信号量本来就可以保证每根筷子同一时刻只能有一个线程获取到。那为什么前一种解法需要锁来保证互斥呢,还是因为test方法,因为在放下筷子时,会同时检查左右邻居的状态,而在拿起筷子时,会检查自己的状态,这就有可能导致,同一时刻,有多个线程改变同一个哲学家的状态,所以需要加入锁来保证互斥。

生产者-消费者问题

生产者消费者问题中包含三个角色:生产者(Producer),消费者(Consumer)以及一个固定大小的缓冲区(Buffer)。生产者干两件事:制造数据并且在缓冲区未满时写入,否则等待;在缓冲区为空的情况下写入数据并唤醒消费者读取数据;消费者在缓冲区不为空时读出数据,否则等待;在缓冲区满的情况下消费数据并唤醒生产者写入数据。

生产者消费者问题,可以使用信号量解决,也可以使用管程解决。其实解题方法在上文已经介绍过,这里再重复一下。还是先看管程定义:

public class ProducerConsumerMonitor {
    private final int capacity;
    // 1.一把锁
    Lock mutex = new ReentrantLock();
    // 2.共享数据
    List<Integer> data;
    // 3.条件变量
    // 条件变量:队列不满
    final Condition notFull = mutex.newCondition();
    // 条件变量:队列不空
    final Condition notEmpty = mutex.newCondition();
    // 4.初始化
    public ProducerConsumerMonitor(int capacity) {
        this.data = new ArrayList<>(capacity);
        this.capacity = capacity;
    }
    // 5.对外方法
    void procedure(int item) {}
    int comsumer() {}
}

生产者和消费者的具体实现如下:

// 5.对外方法
public void procedure(int item) throws InterruptedException {
    mutex.lock();
    try {
        while (data.size() == capacity) {
            notFull.await();
        }
        data.add(item);
        // 入队后,通知可出队
        notEmpty.signal();
    } finally {
        mutex.unlock();
    }
}

public int comsumer() throws InterruptedException {
    mutex.lock();
    try {
        while (data.isEmpty()) {
            notEmpty.await();
        }
        int result = data.get(0);
        data.remove(0);
        // 入队后,通知可出队
        notFull.signal();
        return result;
    } finally {
        mutex.unlock();
    }
}

测试用例:

public static void main(String[] args) {
    final ProducerConsumerMonitor monitor = new ProducerConsumerMonitor(2);

    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            int item = new Random().nextInt(100);
            monitor.procedure(item);
            System.out.println(“生产者生产:” + item);
        }).start();
    }

    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            System.out.println(“消费者消费:” + monitor.comsumer())
        }).start();
    }
}

读者-写者问题

首先看下问题描述:对于一个共享数据,有两类使用者:

读者-写者问题需要考虑的情况非常多,比如:

本文采用写者优先的策略,首先定义管程:

public class RwMonitor {
    // 1.锁
    Lock lock = new ReentrantLock();
    // 2.共享数据
    int activeReader = 0; // 正在读数据的读者数量
    int activeWriter = 0; // 正在写数据的写者数量
    int waitReader = 0; // 等待的读者数量
    int waitWriter = 0; // 等待的写者数量
    // 3.条件变量
    Condition canRead = lock.newCondition();
    Condition canWrite = lock.newCondition();
    // 4.初始化
    // 5.对外方法
    public void read();
    public void write();
}

然后看下具体实现,非常简单,具体可以看注释:

public void read() {
    try {
        startRead();
        reading();
        doneRead();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public void write() {
    try {
        startWrite();
        writing();
        doneWrite();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private void startRead() throws InterruptedException {
    lock.lock();
    try {
        System.out.println(“[reader thread] waitWriter=“ + waitWriter);
        // 只要有等待的写者或进行中的写者,那么读者等待
        while (activeWriter + waitWriter > 0) {
            waitReader++;
            canRead.await();
            waitReader—;
        }
        // 这里的activeReader操作最好放到reading()方法里面
        // 遇到异常情况返回时,及时减下来
        activeReader++;
    } finally {
        lock.unlock();
    }
}

private void doneRead() {
    lock.lock();
    try {
        activeReader—;
        // 等待的读者数量 = 0,等待的写者数量 > 0
        if (activeReader == 0 && waitWriter > 0) {
            // 唤醒写者
            canWrite.signal();
        }
    } finally {
        lock.unlock();
    }
}

private void startWrite() throws InterruptedException {
    lock.lock();
    try {
        // 只要有正在读的读者或者正在写的写者,则等待
        while (activeWriter + activeReader > 0) {
            // 等待数量+1
            waitWriter++;
            System.out.println(“[writer thread] waitWriter=“ + waitWriter);
            canWrite.await();
            // 被唤醒后,等待数量-1
            waitWriter—;
        }
        activeWriter++;
    } finally {
        lock.unlock();
    }
}

private void doneWrite() {
    lock.lock();
    try {
        activeWriter—;
        // 写完后,如果还有等待写的写者,则唤醒
        if (waitWriter > 0) {
            canWrite.signal();
        } else if (waitReader > 0) {
            // 没有等待的写者了,唤醒读者
            canRead.signalAll();
        }
    } finally {
        lock.unlock();
    }
}


/**
 * 模拟读取数据
 */
private void reading() {
    try {
        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(“read data”);
}

/**
 * 模拟读取数据
 */
private void writing() {
    try {
        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(“write data”);
}

重点关注如下内容:

  1. 在Java中Condition和Lock是配合使用的,也就是说在使用condition.await()之前,需要lock.lock()。这点从condition变量的声明也可以看出来。其实Lock是基于AQS实现的,而lock.newCondition()会创建一个AQS的ConditionObject的内部类。AQS内部维护了一个同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列,同样地,condition内部也是使用同样的方式,内部维护了一个等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。
  2. 基于第一点,代码中调用lock.lock后,线程占有锁,而当调用canRead或者canWrite的await()方法后,会释放独占锁。
  3. 判断是写者优先还是读者优先,注意看startRead和startWrite方法中的while循环条件,在startRead中,只要有等待的写照,后来的读者都会被阻塞,因此,是写者优先。
  4. 注意doneWrite方法的最后是调用canRead.signalAll(),因为是写者优先,因此,所有的写者完成以后,可能已经有多个读者等待,所以,需要signalAll
  5. 为什么要把读写方法分为准备方法startXXX和收尾的方法doneXXX?这是因为,读写操作可能会很耗时,如果揉到一起,会导致锁的时间很长,影响系统性能。在实际开发中,也要注意这一点,锁的范围需要尽可能小。

接下来是测试用例:

public class RwMonitorTest {
    public static void main(String[] args) {
        final RwMonitor monitor = new RwMonitor();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> monitor.read()).start();
            new Thread(() -> monitor.write()).start();
        }
    }
}

运行后,你可以看到如下运行结果:

read data
write data
write data
write data
…
read data
read data

即:前面一两个读线程完成后,接下了会是10个写线程,然后再是剩下的读线程,与前面预测的一致。

需要注意一点是,测试用例不要分开创建读写线程,比如,先创建10个读者,再创建10个写者:

for (int i = 0; i < 10; i++) {
        new Thread(() -> monitor.read()).start();
}
for (int i = 0; i < 10; i++) {
        new Thread(() -> monitor.write()).start();
}

这样做会导致10个读者完成后,再进行10个写,因为startRead方法快速执行完成,所有的读线程均为等待,后面的10个写线程,只能等所有的读线程完成后再执行。

结语

在JDK中,并发工具包的基础是AQS,而AQS就是Java版管程的具体实现,因此,在理解管程原理的基础上,再想想如何利用管程来解决实际问题后,再去看AQS的源码会事半功倍。这也是为什么写AQS源码分析的文章很多,但基本上看不懂,因为没有从整体上理解AQS的实现思路,越看就越容易被绕进去。

就比如,在使用condition.await时,根据管程的原理,被阻塞的线程会被放到与之对应的阻塞列表中,我们的实现代码中好像并没有啊,其实你去看看AQS的源码就知道了,它在内部实现了一个双向链表Node,在存放阻塞线程时,会把线程包装成Node节点对象。AQS中的很多方法都是Node的实现、如何把线程包装成Node、如何把线程放到这个链表中、在释放时如何从链表中取节点等等。

当你理解这些以后,再去看AQS的源码,就有清晰的思路了。

还有一点需要注意的是,用管程解决实际问题的时候,不一定要想文中示例这样,把所有逻辑都扔到一个类文件实现,这里仅仅是简单的示例而已。

最后,希望这两篇文章可以更好的帮助你理解并发编程的基础,从下篇开始,会说说进程间的通信问题。

封面图:Marty Garcia on Unsplash

Comments

Post a Message

人生在世,错别字在所难免,无需纠正。

提交评论