Thread,Runable

Thread源码

1
2
3
4
5
public class Thread implements Runnable {
	/* What will be run. */
  private Runnable target;
	...
}
  • 当Thread通过Runable作为构造参数创建时,会调用Runable具体实现类的run方法。如果线程是通过继承Thread构造的,则需要复写Thread的run方法。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    /**
     * If this thread was constructed using a separate
     * <code>Runnable</code> run object, then that
     * <code>Runnable</code> object's <code>run</code> method is called;
     * otherwise, this method does nothing and returns.
     * <p>
     * Subclasses of <code>Thread</code> should override this method.
     *
     * @see     #start()
     * @see     #stop()
     * @see     #Thread(ThreadGroup, Runnable, String)
     */
    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
  • Thread接受Runable作为构造参数,内部通过init方法初始化构造参数

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    /**
    * Initializes a Thread.
    *
    * @param g the Thread group
    * @param target the object whose run() method gets called
    * @param name the name of the new Thread
    * @param stackSize the desired stack size for the new thread, or
    *        zero to indicate that this parameter is to be ignored.
    * @param acc the AccessControlContext to inherit, or
    *            AccessController.getContext() if null
    * @param inheritThreadLocals if {@code true}, inherit initial values for
    *            inheritable thread-locals from the constructing thread
    */
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) 

Runable源码

1
2
3
public interface Runnable {
    public abstract void run();
}

多线程实现方式

继承Thread,并复写(override) run方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class PrimeThread extends Thread {
      long minPrime;
      PrimeThread(long minPrime) {
          this.minPrime = minPrime;
      }

      public void run() {
          // compute primes larger than minPrime
          &nbsp;.&nbsp;.&nbsp;.
      }
 }

创建线程并启动

1
2
PrimeThread p = new PrimeThread(143);
p.start();

通过Runable接口作为构造参数创建线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class PrimeRun implements Runnable {
     long minPrime;
     PrimeRun(long minPrime) {
         this.minPrime = minPrime;
     }

     public void run() {
         // compute primes larger than minPrime
         &nbsp;.&nbsp;.&nbsp;.
     }
 }

创建线程并启动

1
2
PrimeRun p = new PrimeRun(143);
new Thread(p).start();

Callable,Future,RunableFuture,FutureTask

Callable

是可返回一个结果或抛出异常的任务

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Future

代表一个异步计算的结果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

RunableFuture

一个可被取消的异步计算

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask

RunableFuture接口实现。当Runable作为构造参数时,会通过适配器模式将Runable转换为Callable,需要另外一个参数作为结果的异步计算结果的返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class FutureTask<V> implements RunnableFuture<V> {
	/** The underlying callable; nulled out after running */
  private Callable<V> callable;
  /** The result to return or exception to throw from get() */
  private Object outcome; // non-volatile, protected by state reads/writes
  /** The thread running the callable; CASed during run() */
  private volatile Thread runner;
  /** Treiber stack of waiting threads */
  private volatile WaitNode waiters;
  ...
  
  public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
  }
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
  }
  
  public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
  }    
}

Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor

Executor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService

1
2
3
4
5
6
public interface ExecutorService extends Executor {
	<T> Future<T> submit(Callable<T> task);
	<T> Future<T> submit(Runnable task, T result);
	Future<?> submit(Runnable task);
	...
}

AbstractExecutorService

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class AbstractExecutorService implements ExecutorService {
	protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
  }
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
  }
  public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
  }

  public <T> Future<T> submit(Runnable task, T result) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task, result);
      execute(ftask);
      return ftask;
  }

  public <T> Future<T> submit(Callable<T> task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
  }
}

ThreadPoolExecutor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ThreadPoolExecutor extends AbstractExecutorService {
	private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        ...
        ...
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
    }

	}
}