写这篇文章源于我经历过的一次生产事故,在某家公司的时候,有个服务会收集业务系统的日志,此服务的开发人员在给业务系统的sdk中就因为使用了LinkedList,又没有做并发控制,就造成了此服务经常不能正常收集到业务系统的日志(丢日志以及日志上报的线程停止运行)。看一下add()方法的源码,我们就可以知道原因了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public boolean add(E e) { linkLast(e);//调用linkLast,在队列尾部添加元素 return true; } void linkLast(E e) { final Node<E> l = last; final Node<E> newNode = new Node<>(l, e, null); last = newNode; if (l == null) first = newNode; else l.next = newNode; size++;//多线程情况下,如果业务系统没做并发控制,size的数量会远远大于实际元素的数量 modCount++; } |
demo Lesson2LinkedListThreads 展示了在多线程且没有做并发控制的环境下,size的值远远大于了队列的实际值,100个线程,每个添加1000个元素,最后实际只加进去2030个元素: List的变量size值为:88371 第2031个元素取出为null 解决方案,使用锁或者使用ConcurrentLinkedQueue、LinkedBlockingQueue等支持添加元素为原子操作的队列。 上一节我们已经分析过LinkedBlockingQueue的put等方法的源码,是使用ReentrantLock来实现的添加元素原子操作。我们再简单看一下高并发queue的add和offer()方法,方法中使用了CAS来实现的无锁的原子操作: public boolean add(E e) { return offer(e); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } } |
接下来,我们再利用高并发queue对上面的demo进行改造,大家只要改变demo中的内容,讲下面两行的注释内容颠倒,即可发现没有丢失任何的元素: public static LinkedList list = new LinkedList(); //public static ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); 再看一下高性能queue的poll()方法,才觉得NB,取元素的方法也用CAS实现了原子操作,因此在实际使用的过程中,当我们在不那么在意元素处理顺序的情况下,队列元素的消费者,完全可以是多个,不会丢任何数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } } |
关于ConcurrentLinkedQueue和LinkedBlockingQueue: 1.LinkedBlockingQueue是使用锁机制,ConcurrentLinkedQueue是使用CAS算法,虽然LinkedBlockingQueue的底层获取锁也是使用的CAS算法 2.关于取元素,ConcurrentLinkedQueue不支持阻塞去取元素,LinkedBlockingQueue支持阻塞的take()方法,如若大家需要ConcurrentLinkedQueue的消费者产生阻塞效果,需要自行实现 3.关于插入元素的性能,从字面上和代码简单的分析来看ConcurrentLinkedQueue肯定是最快的,但是这个也要看具体的测试场景,我做了两个简单的demo做测试,测试的结果如下,两个的性能差不多,但在实际的使用过程中,尤其在多cpu的服务器上,有锁和无锁的差距便体现出来了,ConcurrentLinkedQueue会比LinkedBlockingQueue快很多: demo Lesson2ConcurrentLinkedQueuePerform:在使用ConcurrentLinkedQueue的情况下100个线程循环增加的元素数为:33828193 demo Lesson2LinkedBlockingQueuePerform:在使用LinkedBlockingQueue的情况下100个线程循环增加的元素数为:33827382 from:https://www.cnblogs.com/mantu/p/5802393.html
View Details参考资料:http://blog.csdn.net/chenchaofuck1/article/details/51660521 实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,阻塞队列就是通过使用加锁的阻塞算法实现的;另一种非阻塞的实现方式则可以使用循环CAS(比较并交换)的方式来实现。 ConcurrentLinkedQueue是一个基于链表实现的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。默认情况下head节点存储的元素为空,tair节点等于head节点。 一:入队 入队主要做两件事情, 第一是将入队节点设置成当前队列的最后一个节点。 第二是更新tail节点,如果原来的tail节点的next节点不为空,则将tail更新为刚入队的节点(即队尾结点),如果原来的tail节点(插入前的tail)的next节点为空,则将入队节点设置成tail的next节点(而tial不移动,成为倒数第二个节点),所以tail节点不总是尾节点!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
public boolean offer(E e) { if (e == null) throw new NullPointerException(); //入队前,创建一个入队节点 Node</e><e> n = new Node</e><e>(e); retry: //死循环,入队不成功反复入队。 for (;;) { //创建一个指向tail节点的引用 Node</e><e> t = tail; //p用来表示队列的尾节点,默认情况下等于tail节点。 Node</e><e> p = t; //获得p节点的下一个节点。 Node</e><e> next = succ(p); //next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点 if (next != null) { //循环了两次及其以上,并且当前节点还是不等于尾节点 if (hops > HOPS && t != tail) continue retry; p = next; } //如果p是尾节点,则设置p节点的next节点为入队节点。 else if (p.casNext(null, n)) { //如果tail节点有大于等于1个next节点,则将入队节点设置成tair节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tair节点。 if (hops >= HOPS) casTail(t, n); // 更新tail节点,允许失败 return true; } // p有next节点,表示p的next节点是尾节点,则重新设置p节点 else { p = succ(p); } } } } |
二:出队 不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,则弹出head的next结点并更新head结点为原来head的next结点的next结点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
public E poll() { Node</e><e> h = head; // p表示头节点,需要出队的节点 Node</e><e> p = h; for (int hops = 0;; hops++) { // 获取p节点的元素 E item = p.getItem(); // 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,如果成功则返回p节点的元素。 if (item != null && p.casItem(item, null)) { if (hops >= HOPS) { //将p节点下一个节点设置成head节点 Node</e><e> q = p.getNext(); updateHead(h, (q != null) ? q : p); } return item; } // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。那么获取p节点的下一个节点 Node</e><e> next = succ(p); // 如果p的下一个节点也为空,说明这个队列已经空了 if (next == null) { // 更新头节点。 updateHead(h, p); break; } // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 p = next; } return null; } |
三:非阻塞却线程安全的原因 观察入队和出队的源码可以发现,无论入队还是出队,都是在死循环中进行的,也就是说,当一个线程调用了入队、出队操作时,会尝试获取链表的tail、head结点进行插入和删除操作,而插入和删除是通过CAS操作实现的,而CAS具有原子性。故此,如果有其他任何一个线程成功执行了插入、删除都会改变tail/head结点,那么当前线程的插入和删除操作就会失败,则通过循环再次定位tail、head结点位置进行插入、删除,直到成功为止。也就是说,ConcurrentLinkedQueue的线程安全是通过其插入、删除时采取CAS操作来保证的。不会出现同一个tail结点的next指针被多个同时插入的结点所抢夺的情况出现。 from:https://www.cnblogs.com/ygj0930/p/6544543.html
View Details阻 塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列. 1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 8.PriorityBlockingQueue, (带优先级的无界阻塞队列) 9.SynchronousQueue (并发同步阻塞队列) 阻塞队列和生产者-消费者模式 阻塞队列(Blocking queue)提供了可阻塞的put和take方法,它们与可定时的offer和poll是等价的。如果Queue已经满了,put方法会被阻塞直到有空间可用;如果Queue是空的,那么take方法会被阻塞,直到有元素可用。Queue的长度可以有限,也可以无限;无限的Queue永远不会充满,所以它的put方法永远不会阻塞。 阻塞队列支持生产者-消费者设计模式。一个生产者-消费者设计分离了“生产产品”和“消费产品”。该模式不会发现一个工作便立即处理,而是把工作置于一个任务(“to do”)清单中,以备后期处理。生产者-消费者模式简化了开发,因为它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不同的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。 生产者-消费者设计是围绕阻塞队列展开的,生产者把数据放入队列,并使数据可用,当消费者为适当的行为做准备时会从队列中获取数据。生产者不需要知道消费者的省份或者数量,甚至根本没有消费者—它们只负责把数据放入队列。类似地,消费者也不需要知道生产者是谁,以及是谁给它们安排的工作。BlockingQueue可以使用任意数量的生产者和消费者,从而简化了生产者-消费者设计的实现。最常见的生产者-消费者设计是将线程池与工作队列相结合。 阻塞队列简化了消费者的编码,因为take会保持阻塞直到可用数据出现。如果生产者不能足够快地产生工作,让消费者忙碌起来,那么消费者只能一直等待,直到有工作可做。同时,put方法的阻塞特性也大大地简化了生产者的编码;如果使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工作,从而给消费者时间来赶进进度。 有界队列是强大的资源管理工具,用来建立可靠的应用程序:它们遏制那些可以产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。 虽然生产者-消费者模式可以把生产者和消费者的代码相互解耦合,但是它们的行为还是间接地通过共享队列耦合在一起了 类库中包含一些BlockingQueue的实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,与 LinkedList和ArrayList相似,但是却拥有比同步List更好的并发性能。PriorityBlockingQueue是一个按优先级顺序排序的队列,当你不希望按照FIFO的属性处理元素时,这个PriorityBolckingQueue是非常有用的。正如其他排序的容器一样,PriorityBlockingQueue可以比较元素本身的自然顺序(如果它们实现了Comparable),也可以使用一个 Comparator进行排序。 最后一个BlockingQueue的实现是SynchronousQueue,它根本上不是一个真正的队列,因为它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。因为SynchronousQueue没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻止。SynchronousQueue这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务作好准备。 非阻塞算法 基于锁的算法会带来一些活跃度失败的风险。如果线程在持有锁的时候因为阻塞I/O,页面错误,或其他原因发生延迟,很可能所有的线程都不能前进了。 一个线程的失败或挂起不应该影响其他线程的失败或挂起,这样的算法成为非阻塞(nonblocking)算法;如果算法的每一个步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由(lock-free)算法。在线程间使用CAS进行协调,这样的算法如果能构建正确的话,它既是非阻塞的,又是锁自由的。非竞争的CAS总是能够成功,如果多个线程以一个CAS竞争,总会有一个胜出并前进。非阻塞算法堆死锁和优先级倒置有“免疫性”(但它们可能会出现饥饿和活锁,因为它们允许重进入)。 非阻塞算法通过使用低层次的并发原语,比如比较交换,取代了锁。原子变量类向用户提供了这些底层级原语,也能够当做“更佳的volatile变量”使用,同时提供了整数类和对象引用的原子化更新操作。 from:https://blog.csdn.net/u012240455/article/details/81844055
View DetailsQueue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构 Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。 Queue的实现 1、没有实现的阻塞接口的LinkedList: 实现了java.util.Queue接口和java.util.AbstractQueue接口 内置的不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue PriorityQueue 和 ConcurrentLinkedQueue 类在 Collection Framework 中加入两个具体集合实现。 PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。 ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小, ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。 2)实现阻塞接口的: java.util.concurrent 中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。 五个队列所提供的各有不同: * ArrayBlockingQueue :一个由数组支持的有界队列。 * LinkedBlockingQueue :一个由链接节点支持的可选有界队列。 * PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。 * DelayQueue :一个由优先级堆支持的、基于时间的调度队列。 * SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。 下表显示了jdk1.5中的阻塞队列的操作: add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常 remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常 element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常 offer 添加一个元素并返回true 如果队列已满,则返回false poll 移除并返问队列头部的元素 如果队列为空,则返回null peek 返回队列头部的元素 如果队列为空,则返回null put 添加一个元素 如果队列满,则阻塞 take 移除并返回队列头部的元素 如果队列为空,则阻塞 remove、element、offer 、poll、peek 其实是属于Queue接口。 阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。 注意:poll和peek方法出错进返回null。因此,向队列中插入null值是不合法的 最后,我们有阻塞操作put和take。put方法在队列满时阻塞,take方法在队列空时阻塞。 LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。 ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。 PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。 DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed […]
View Details