layout: post title: "java并发包" subtitle: " "1.解决了什么问题? 2.怎么解决的?"" date: 2018-10-07 07:00:00 author: "青乡" header-img: "img/post-bg-2015.jpg" catalog: true tags: - DataStructure&Algorithm - multi thread
解决了什么问题
普通同步的性能问题。
如何解决
可重入锁。
为什么
可重入锁为什么可以提高性能?
类
并发List
写时复制ArrayList。
有没有LinkedList?为什么没有?
并发Set
底层实现 和并发List一样。
怎么保证数据不重复? 1.非并发Set,即TreeSet、HashSet,都是使用hash。来确保数据不重复。 怎么确保?就是插入的时候,key的hash值一样,就不插入了。
2.并发Set没有使用hash,而是遍历比较,值一样,就不插入了。见下面源码。
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot); //不存在,才插入数据
}
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
if (o == null) {
for (int i = index; i < fence; i++)
if (elements[i] == null)
return i;
} else {
for (int i = index; i < fence; i++) //遍历比较数据
if (o.equals(elements[i]))
return i;
}
return -1;
}
复制代码
并发Map
底层原理 1.映射包含段Segment数组 2.每个段Segment包含了映射数据键值对数组
这样的优点是,每个Segment都有锁,锁的粒度更细,所以并发性能更高。
源码-jdk7-ConcurrentHashMap
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j); //段数组:没有数据,就插入
return s.put(key, hash, value, false); //键值对数组:插入数据
}
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) //插入Segment到Segment数组
break;
}
}
}
return seg;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node); //插入键值对数据到键值对数组Segment
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
复制代码
并发包的作用
1.简单同步
下面2种情况,可以通过简单同步解决。
1)1个操作,包含多个指令
解决的是一个操作,但是这个操作包含了多条计算机指令。
这些指令执行时间不确定,导致多个线程会出现问题。
2)多个操作,但是都在1个方法里
2.并发包
还有一种情况,就是有的代码,包含了多个同步方法。
虽然这些方法每个都是同步的,但是可能在执行1个方法的时候改了数据(比如,数组删除一个元素,数组大小10改为9),另外一个方法读了数据,直接导致数组越界(数组元素个数还是以前的10)。
并发包就是要解决这个问题。
具体怎么解决?实现原理和底层实现?
队列
1.普通队列
数组
链表
2.并发包
只有链表实现ConcurrentLinkedQueue。为什么没有数组实现ConcurrentArrayQueue?
为什么没有数组实现ConcurrentArrayQueue?
普通队列的实现原理和底层实现?
1.数组
1)数组
存放数据。
2)数组索引指针,不是真的索引
用于指向队列的头和尾。
2.链表
节点类,包含
1)数据
2)对象指针
指向下一个节点。
并发包-链表队列的底层实现和实现原理?
阻塞队列
什么是阻塞队列?
1.队列
先进先出。
2.阻塞
一直卡在那里。
3.阻塞队列
有个线程,一直读阻塞队列里的数据,
1)来一个数据,就读一个数据
2)不来,就阻塞在那里,等待下一个订单数据的到来
上面是消费者的角度看。
如果从生产者的角度看,
1)一直往队列里写数据
2)没写满,就一直写;
写满了,就阻塞,直到有数据被消费,腾出了一个新的空间
注:非阻塞队列,即普通队列Queue。
1.没有数据,就返回null。
2.数据满了,就插不进去了。
什么是生产者-消费者模式?
只是一个设计模式而已。具体包括以下几个部分,
1.数据从哪里读?
有个定时任务去扫库,作用是读数据。
读数据的代码,就叫做生产者。
2.数据放在哪里?
阻塞队列。
3.数据写到哪里?
写数据的代码,就叫做消费者。
阻塞队列底层原理?怎么才能实现阻塞?
1.阻塞当前线程,比如生产者
如果集合已满,当前数据就不添加到集合,并且当前线程(即生产者)的代码就阻止在这里了。直到被其他线程(即消费者线程)唤醒,才继续接着执行接下来的代码,具体代码就是添加数据到集合,因为现在集合不是满的,刚刚被消费者线程消费了一个数据。
//jdk7-ArrayBlockingQueue
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) //如果集合已满,那么当前线程(即生产者线程)等待
notFull.await();
insert(e); //直到消费者线程消费了一个数据,并且唤醒生产者线程,才接着执行这行代码,即插入数据。 //注:另外一个需要注意的地方是,只要是让锁持有的线程等待的话,它都是循环调用,而不是只调用一次。jdk7-Thread.join()方法也是循环调用wait,让锁持有线程(即父线程)等待。
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal(); //生产者线程生产一个数据之后,唤醒消费者线程,如果确实有消费者线程正在等待,那么消费者线程被唤醒之后继续工作即消费数据。
}
复制代码
2.阻塞当前线程,比如消费者
同理。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) //消费者线程发现集合里没有数据,就一直等等
notEmpty.await();
return extract(); //直到生产者线程插入数据,并且唤醒了消费者线程
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal(); //消费者线程消费一个数据之后,唤醒生产者线程,如果确实有生产者线程正在等待,那么生产者线程被唤醒之后继续工作即插入数据到集合。
return x;
}
复制代码
总结 所以阻塞是怎么实现的?就是通过wait/notify机制实现的!
参考 blog.csdn.net/u010412719/… webcache.googleusercontent.com/search?q=ca…
Condition?
作用
和Object.wait/notify一样,
1.让当前线程等待
2.唤醒其他等待线程
那与Object.wait/notify的区别?
优点,可以定向通知。而Object要么随机通知要么通知所有。
注:显式锁/可重入锁RetrantLock,有可以控制锁的粒度,等等优点。
怎么使用?
Object
1.锁对象 //普通的对象 2.线程调用锁对象的wait/notify,从而让当前线程等待,或唤醒其他线程
Condition 1.锁对象 //可重入锁RetrantLock
Condition必须与RetrantLock结合使用,是一对。 2.线程调用Condition.wait/notify,从而让当前线程等待,或唤醒其他线程 具体是在显式锁之间的代码块里调用Condition.wait/notify。例子,见jdk7-ArrayBlockingQueue.put()。
使用哪一种阻塞队列?
有2种,
1.数组
2.链表 JDK线程池ThreadPoolExecutor使用的是LinkedBlockingQueue,用来放任务队列。为什么要用链表队列?因为线程池大小不固定。哪怕一开始设置了线程池大小,后面也可以改变大小。数组的优点是读数据快,但是大小固定,插入/删除慢,而链表的优缺点刚好相反,所以当实际应用场景选择哪一种数据结构更好的时候,只需要根据优缺点以及应用场景需要哪一种优缺点,就可以决定使用哪一种数据结构了。
二者的关系?
应用场景?
1.支付系统
订单的生产者-消费者。
使用的是哪一个实现类?数组还是链表?
因为不需要从中间插入和删除数据,所以是数组。
上面说的1的这种情况是错误的,支付系统不是使用阻塞队列,消费者不是一直等待处理数据,服务器客户端程序的服务器才是使用的阻塞队列,但是支付系统的订单不是,因为订单是使用定时任务去扫描生产者放入队列的数据,使用的队列只是普通队列,而不是阻塞队列,没必要阻塞,也不是阻塞的应用场景。
具体是使用的并发链表队列ConcurrentLinkedQueue。
并发包为什么没有并发数组队列?但是阻塞队列是有数组和链表2种实现。
2.服务器客户端程序
服务器是消费者,处理请求。
客户端是生产者,生产请求。
3.其他的任何处理数据和生产数据,都可以看作是生产者-消费者模式
底层原理和底层实现?
数组阻塞队列和链表阻塞队列的区别?
1.数组
读数据。
2.链表
从中间插入和删除。
阻塞队列与生产者-消费者设计模式的关系?
应用场景
1.支付系统-订单 并发链表队列 ConcurrentLinkedQueue
2.订单 //以下可能用到并发map ConcurrentHashMap 3.通道 4.订单和通道
线程池
原子操作
总结
一、同步集合 集合 1.使用显式锁,粒度更细
jdk源码-CopyOnWriteArrayList
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
复制代码
为什么更快?
1.读更快,因为读没有同步,可以多个线程同时读数据。
2.写使用显示锁进行同步,速度和隐式锁一样。
那为什么还要这么弄呢?因为CopyOnWrite的应用场景是读多写少,在实际应用场景当中也有很多这种需求——允许读的数据有少量误差,对数据的实时性要求没有那么高。
3.那数据会不会错误?会。但是旧的同步集合和新的并发集合有一点区别,
1)旧的同步集合
并发读和写,会报错-并发读写错误。
2)新的并发集合
每次当写数据的时候,数据复制了一份出来,写的时候是往新的数组里写,读线程读的数据是旧的数组,所以两者互不影响,代码也不会报错-并发读写错误。
www.cnblogs.com/chengxiao/p…
juejin.im/post/5aaa2b…
应用场景? 工作中用的比较少。
映射 底层实现 1.Segment每个都有锁对象 2.每个版本的jdk实现都有很大的改变
jdk源码-ConcurrentHashMap
//怎么插入数据?比较和交换CAS
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
复制代码
二、线程池
应用场景 定时器扫库
三、显式锁 ReentrantLock
作用 粒度更细