通过动态Resize扩展Java信号量

在创建并发系统的时,信号量是一种非常有用的机制或者说是工具。如果你不熟悉信号量的使用,基本的想法或者简单的理解信号量,就是允许确定数量的许可被释放,任何想要申请获得一个许可,如果没有可用的许可时,会被阻塞直到有许可被释放为止。

申请和释放信号量的许可

Java的并发库中包含有一个名为Semaphore的实现类。一个信号量需要被初始化为允许的许可数目。举个例子,使用new Semaphore(2)实例化了一个信号量,前两次acquire()方法会获得许可立即返回,但是第三次acquire()会阻塞直到有其他线程调用release()方法。

典型的运用就是线程在访问共享资源前调用acquire()申请许可,访问完成后再调用release()方法释放许可。这就意味着使用信号量作并发控制的时候释放或者申请Lock完全由程序员决定,与只使synchronized的方法或者block形成明显的对比。

如果你的调用代码段在申请了许可后会抛出异常,记得把release()方法放进finally代码段中,保证即使异常发生也能够正确释放申请的许可。如果每次申请了许可acquire()后不调用release()释放的话,这样最终许可会被耗尽,运用程序也会进入死锁状态。如果在某些情况下你调用了release()不止一次的话,你会面临另外一个问题(很难调试发现的问题):信号量的许可数比你预先设计的会多。

// 不正确的方法 -- semaphore not released if the exception is thrown

try {

    sem.acquire();

    sharedResource.doStuff();

    sem.release();

} catch (IOException e) {

    logger.warn("The resource is broken", e);

}

// 正确的方法

try {

    sem.acquire();

    sharedResource.doStuff();

} catch (IOException e) {

    logger.warn("The resource is broken", e);

} finally {

    sem.release();

}

只要这是你想要的这已经很好了,你也应该很满意了(除非你还想要一个或者)

 

动态的增加许可数

但是,如果想要在信号量已经初始化后,改变信号量的许可数该怎么办?这通常在长时间运行的程序,需要改变信号量而又不需要重新启动程序时会很有用。问题的一半其实很简单:注意到方法acquire()release()。调用acquire()方法没有返回值,因此release()方法不用使用任何参数,这就意味着如果想增加可使用的许可数,我们可以简单的调用release()我们想要的次就可以达到目的,而事实上并没有什么具体的释放。

Semaphore sem = new Semaphore(3);

sem.release();

sem.release();

for (int i = 0; i < 5; i++) {

    sem.acquire();

    System.out.println("Acquired a permit");

}

因此,增加许可数是很简单的,但是减少许可数就有点棘手了。假设你有一个Semaphore实例,拥有10许可数(不管是通过初始化,还是通过调用release()获得的现在我们知道它们效果是一样的),现在我们想只允许更少的线程同时访问共享资源。

动态减少许可数

具体而言,如果有10个线程拥有许可,并且新的许可数是7个,我们不会去告诉每一个线程它们现在已经不符合新的规范。我们所能做就只能是保证下一个acquire()会阻塞直到有足够的线程(准确的是4)已经调用了release()方法,使得许可数达到新的许可数限制。去打断已经拥有许可的线程,但又不在新限制约束范围内是不可取的,那已经超出了简单信号量的讨论范围了。

如果你想从新配置它只允许7个许可数,你可以有几种选择:

  • 调用acquire()三次,这可能会有效,也可能会永久阻塞。如果有超过7个的许可数在使用当中,其他的acquire()调用会阻塞任意长的时间。
  • 使用availablePermits()来检查和调试也不会有什么帮助的。使用availablePermits()来确定一个acquire()调用会不会阻塞是一种经典的check-then-act race conditon,但是这也不会解决问题,虽然不会阻塞,但是availablePermits()返回的结果也只会是0;不断的调drainPermits()直到获有3个许可被获取,然后释其他的许可来满足你的需要,这样调用也会造成任意长时间的等待;
  • 也可以不断调用tryAcquire()直到获得3个许可,这样同样会噪声任意长时间的等待。

以上方法都有两个缺点:

  • 最重要的是,他们需要阻塞,直到他们想要的许可数都可用。
  • 间接的实现目的,都没有直接实现目标

确切的说,目标是信号量应释放较少的许可证给正在使用信号量来控制访问共享资源的线程。获取(大概没有释放)许可证是实现目标的方法之一,但不一定是唯一的方法。换句话说,正在执行的重新配置许可数量的线程并不需要在技术上需要获得许可(无论是通过acquire()drainPermits()tryAcquire()等),来满足这个要求,因为它不需要访问信号量正在控制的共享资源的。

使用reducePermits()扩展信号量

幸运的是,有一个方法可以帮助我们得到我们想要的:reducePermits()。但是这个方法是受保护的,所以你需要些一个子类来暴露这个方法。一旦创建了这样的一个子类,我们就可以通过调用release()方法来增加许可数,调用reducePermits() 来减少许可数,而不会造成线程的阻塞。你如果你想做的只是随意的增加或者减少许可数量,你应该已经可以得到你想要的了,听起来是不是很棒啊!有时,我们可 能更想直接调整新的许可数,而不用关注调整之前的许可数。可以通过一个额外的变量来记录许可数,与新设置的许可数相比,来决定是增加还是减少许可。

下面提供了一个简单的继承Semaphore类来暴露reducePermits()方法的实现。里面包含了一个AdjustableSemaphore来暴露两个最常用的方法release()acquire()包含了@ThreadSafe线程安全的标记(在jcip.net中定义的,jcipJava Concurrency In Practice的缩写)。如果你不想在你的项目中使用jcip标记,可以移除@ThreadSafe和相关的引用,但还是希望能够尽量使用它。它们可以提供很严格的注释信息,却不会修改编译和运行环境,但是却非常有用在你想知道一个类的作者提供什么样的线程安全的时候。这在修改别人的代码的时候是很需要的,你不想把别人的线程安全代码无知的改为不安全吧?

package com.genius.concurrency;

import java.util.concurrent.Semaphore;

import net.jcip.annotations.ThreadSafe;

/**

 * A simple implementation of an adjustable semaphore.

 *

 * Copyright © 2008, Genius.com

 *

 * @author Marshall Pierce <marshall@genius.com>

 */

@ThreadSafe

final public class AdjustableSemaphore {

    /**

     * semaphore starts at 0 capacity; must be set by setMaxPermits before use

     */

    private final ResizeableSemaphore semaphore = new ResizeableSemaphore();

    /**

     * how many permits are allowed as governed by this semaphore.

     * Access must be synchronized on this object.

     */

    private int maxPermits = 0;

    /**

     * New instances should be configured with setMaxPermits().

     */

    public AdjustableSemaphore() {

        // no op

    }

    /*

     * Must be synchronized because the underlying int is not thread safe

     */

    /**

     * Set the max number of permits. Must be greater than zero.

     *

     * Note that if there are more than the new max number of permits currently

     * outstanding, any currently blocking threads or any new threads that start

     * to block after the call will wait until enough permits have been released to

     * have the number of outstanding permits fall below the new maximum. In

     * other words, it does what you probably think it should.

     *

     * @param newMax

     */

    synchronized void setMaxPermits(int newMax) {

        if (newMax < 1) {

            throw new IllegalArgumentException("Semaphore size must be at least 1,"

                + " was " + newMax);

        }

        int delta = newMax - this.maxPermits;

        if (delta == 0) {

            return;

        } else if (delta > 0) {

            // new max is higher, so release that many permits

            this.semaphore.release(delta);

        } else {

            delta *= -1;

            // delta < 0.

            // reducePermits needs a positive #, though.

            this.semaphore.reducePermits(delta);

        }

        this.maxPermits = newMax;

    }

    /**

     * Release a permit back to the semaphore. Make sure not to double-release.

     *

     */

    void release() {

        this.semaphore.release();

    }

    /**

     * Get a permit, blocking if necessary.

     *

     * @throws InterruptedException

     *             if interrupted while waiting for a permit

     */

    void acquire() throws InterruptedException {

        this.semaphore.acquire();

    }

    /**

     * A trivial subclass of <code>Semaphore</code> that exposes the reducePermits

     * call to the parent class. Doug Lea says it's ok...

     * http://osdir.com/ml/java.jsr.166-concurrency/2003-10/msg00042.html

     *

     * Copyright © 2009, Genius.com

     *

     * @author Marshall Pierce <marshall@genius.com>

     *

     */

    private static final class ResizeableSemaphore extends Semaphore {

        /**

         *

         */

        private static final long serialVersionUID = 1L;

 

        /**

         * Create a new semaphore with 0 permits.

         */

        ResizeableSemaphore() {

            super(0);

        }

 

        /*

         * (non-Javadoc)

         *

         * @see java.util.concurrent.Semaphore#reducePermits(int)

         */

        @Override

        protected void reducePermits(int reduction) {

            super.reducePermits(reduction);

        }

    }

}

这只是一个基本的实现,你可以按照你的需要,限制许可数在一定的范围内,或者是提供一个构造函数来接受参数确定初始化的许可数,或者允许负数的许可数,或者暴露更多的信号量方法… …

csharper -