// 内部类 publicclassConditionObjectimplementsCondition, java.io.Serializable{ // 版本号 privatestaticfinallong serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ // condition队列的头结点 privatetransient Node firstWaiter; /** Last node of condition queue. */ // condition队列的尾结点 privatetransient Node lastWaiter;
/** * Creates a new {@code ConditionObject} instance. */ // 构造方法 publicConditionObject(){ }
// Internal methods
/** * Adds a new waiter to wait queue. * @return its new wait node */ // 添加新的waiter到wait队列 private Node addConditionWaiter(){ // 保存尾结点 Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION // 清除状态为CONDITION的结点 unlinkCancelledWaiters(); // 将最后一个结点重新赋值给t t = lastWaiter; } // 新建一个结点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 尾结点为空 // 设置condition队列的头结点 firstWaiter = node; else// 尾结点不为空 // 设置为节点的nextWaiter域为node结点 t.nextWaiter = node; // 更新condition队列的尾结点 lastWaiter = node; return node; }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first){ // 循环 do { if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空 // 设置尾结点为空 lastWaiter = null; // 设置first结点的nextWaiter域 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环 }
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ privatevoiddoSignalAll(Node first){ // condition队列的头结点尾结点都设置为空 lastWaiter = firstWaiter = null; // 循环 do { // 获取first结点的nextWaiter域结点 Node next = first.nextWaiter; // 设置first结点的nextWaiter域为空 first.nextWaiter = null; // 将first结点从condition队列转移到sync队列 transferForSignal(first); // 重新设置first first = next; } while (first != null); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ // 从condition队列中清除状态为CANCEL的结点 privatevoidunlinkCancelledWaiters(){ // 保存condition队列头结点 Node t = firstWaiter; Node trail = null; while (t != null) { // t不为空 // 下一个结点 Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态 // 设置t节点的额nextWaiter域为空 t.nextWaiter = null; if (trail == null) // trail为空 // 重新设置condition队列的头结点 firstWaiter = next; else// trail不为空 // 设置trail结点的nextWaiter域为next结点 trail.nextWaiter = next; if (next == null) // next结点为空 // 设置condition队列的尾结点 lastWaiter = trail; } else// t结点的状态为CONDTION状态 // 设置trail结点 trail = t; // 设置t结点 t = next; } }
// public methods
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。 publicfinalvoidsignal(){ if (!isHeldExclusively()) // 不被当前线程独占,抛出异常 thrownew IllegalMonitorStateException(); // 保存condition队列头结点 Node first = firstWaiter; if (first != null) // 头结点不为空 // 唤醒一个等待线程 doSignal(first); }
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。 publicfinalvoidsignalAll(){ if (!isHeldExclusively()) // 不被当前线程独占,抛出异常 thrownew IllegalMonitorStateException(); // 保存condition队列头结点 Node first = firstWaiter; if (first != null) // 头结点不为空 // 唤醒所有等待线程 doSignalAll(first); }
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断 publicfinalvoidawaitUninterruptibly(){ // 添加一个结点到等待队列 Node node = addConditionWaiter(); // 获取释放的状态 int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { // // 阻塞当前线程 LockSupport.park(this); if (Thread.interrupted()) // 当前线程被中断 // 设置interrupted状态 interrupted = true; } if (acquireQueued(node, savedState) || interrupted) // selfInterrupt(); }
/* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
/** Mode meaning to reinterrupt on exit from wait */ privatestaticfinalint REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ privatestaticfinalint THROW_IE = -1;
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node){ return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ privatevoidreportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) thrownew InterruptedException(); elseif (interruptMode == REINTERRUPT) selfInterrupt(); }
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ // // 等待,当前线程在接到信号或被中断之前一直处于等待状态 publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) // 当前线程被中断,抛出异常 thrownew InterruptedException(); // 在wait队列上添加一个结点 Node node = addConditionWaiter(); // int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 阻塞当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 publicfinallongawaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); finallong deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
/** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态 publicfinalbooleanawaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0 publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); finallong deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
// support for instrumentation
/** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ finalbooleanisOwnedBy(AbstractQueuedSynchronizer sync){ return sync == AbstractQueuedSynchronizer.this; }
/** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 查询是否有正在等待此条件的任何线程 protectedfinalbooleanhasWaiters(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) returntrue; } returnfalse; }
/** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 返回正在等待此条件的线程数估计值 protectedfinalintgetWaitQueueLength(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; }
/** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 返回包含那些可能正在等待此条件的线程集合 protectedfinal Collection<Thread> getWaitingThreads(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
// 当获取(资源)失败后,检查并且更新结点状态 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ // 获取前驱结点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1 /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 可以进行park操作 returntrue; if (ws > 0) { // 表示状态为CANCELLED,为1 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点 // 赋值pred结点的next域 pred.next = node; } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 比较并设置前驱结点的状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 不能进行park操作 returnfalse; }
// Skip cancelled predecessors // 保存node的前驱结点 Node pred = node.prev; while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点 node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 获取pred结点的下一个结点 Node predNext = pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 设置node结点的状态为CANCELLED node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点 // 比较并设置pred结点的next节点为null compareAndSetNext(pred, predNext, null); } else { // node结点不为尾结点,或者比较设置不成功 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者 // pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空 // 保存结点的后继 Node next = node.next; if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0 compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next; } else { unparkSuccessor(node); // 释放node的前一个结点 }
// 释放后继结点 privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 获取node结点的等待状态 int ws = node.waitStatus; if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3 // 比较并且设置结点等待状态,设置为0 compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取node节点的下一个结点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED // s赋值为空 s = null; // 从尾结点开始从后往前开始遍历 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点 // 保存结点 s = t; } if (s != null) // 该结点不为为空,释放许可 LockSupport.unpark(s.thread); }
classProducer{ private Depot depot; publicProducer(Depot depot){ this.depot = depot; } publicvoidproduce(int no){ new Thread(new Runnable() { @Override publicvoidrun(){ depot.produce(no); } }, no + " produce thread").start(); } }
publicclassReentrantLockDemo{ publicstaticvoidmain(String[] args)throws InterruptedException { Depot depot = new Depot(500); new Producer(depot).produce(500); new Producer(depot).produce(200); new Consumer(depot).consume(500); new Consumer(depot).consume(200); } }
运行结果(可能的一种):
1 2 3 4 5 6 7 8
produce = 500, size = 500 Thread[200 produce thread,5,main] before await consume = 500, size = 0 Thread[200 consume thread,5,main] before await Thread[200 produce thread,5,main] after await produce = 200, size = 200 Thread[200 consume thread,5,main] after await consume = 200, size = 0