Java并发包学习
本文为《分布式Java应用于实践》笔记。
一、集合包
1. ArrayList
- 默认构造为实例化一个Object类型的数组
- add时数组已满:Arrays.copyOf
- add插入指定位置:index后对象逐个往后复制
- remove删除:对象非null则使用equals,删除使用fastRemove方法,将index后对象逐个往前复制
- iterator遍历:先比较创建时的modCount与现有modCount,如不同抛出异常,相同则用get获取
- contains:通过遍历实现
2. LinkedArrayList
- 基于双向链表实现:先生成Entry赋值给header,然后header的pre与next指向自身,形成闭环
- add:创建Entry对象,next指向header,pre指向header的pre
- get:选择从头开始遍历还是从未到头遍历
- iterator遍历:可以调用hasPrevious
- contains:通过遍历实现
3. Vector
- 容量策略更可控:基于capacityIncrement
- 线程安全:add,remove,get等加上了synchronized关键字
4. Stack
- 基于Vector实现
- peek方法:先获取数组大小,再获取最后一个元素
5. HashSet
- 基于HashMap实现
- 不支持get方法获取指定位置元素
6. TreeSet
- 支持排序,传入Comparator
7. HashMap
- put:如果key为null,基于Entry的next方法遍历查找;如果不为null,先取得key的HashCode,然后做Hash操作,再与Entry数组-1的值按位与得到位置
- 哈希冲突:不同的key找到相同的存储位置——链地址法。
- get:寻找hash值相等,且key满足equals的Entry
8. TreeMap
- put:基于红黑树实现,必须要有key比较的方法
- 先判断root是否为null,如是则创建一个Entry赋值给root,然后根据Comparator基于红黑树遍历
二、并发包
1. ConcurrentHashMap
- put:根据Hash值计算并获取Segment对象,先进行lock操作,判断是否扩容,然后进行与HashMap基本相同的操作,完成之后释放锁
- 默认分为16段,各自都持有锁
- get:读数据时,仅在value为null时才加锁
- 实现读取不加锁:HashEntry对象数组对应的变量是volatile的,hash、key以及next属性都是final的
- size:不加锁情况下遍历所有段,读取volatile的count和modCount,统计完毕后再遍历,比较modCount是否有改变。如果执行三次还有问题,那么分段加锁计算
2. CopyOnWriteArrayList
- add:使用ReetrantLock保证线程安全,每次都创建一个新的Object数组,将新增加的对象放入数组末尾后,引用切换
- remove:创建一个比当前数组小1的数组,没有使用Arrays.copyOf
- get:没有加锁保护,可能读脏数据
- iterator:仅对快照数组进行遍历
- 小结:增删性能较差
3. ArrayBlockingQueue
- 构造器:传入参数即为初始化数组的大小,同时初始化锁和两个Condition,notEmpty和notFull
- offer:插入时加锁,如数组已满,进入等待。如等待超时,返回false,未超时则调用notFull条件的await进行等待,唤醒后继续判断数组是否已满。
- poll:调用notEMpty条件的await进行等待
- iterator:调用next方法时先进行加锁
4. LinkedBlockingQueue
- put/offer一把锁,take/poll一把锁,避免读写时相互竞争锁
5. Atomic原子类
- incrementAndGet:关键方法为compareAndSet,调用unsafe的compareAndSwapInt方法(native方法),基于CPU的CAS元语实现
6. ThreadPoolExecutor
execute:如当前线程数小于corePoolSize,调用addIfUnderCorePoolSize方法:
- 先对mainLock加锁
- 然后addThread增加线程
- addThread方法创建Worker对象,调用threadFactory创建线程,放入Workers中
- 增加线程数,释放mainLock,完成方法执行
启动新线程执行Runnable任务,Worker的Run方法先执行Runnable任务,执行完毕后Task后,通过WorkQueue的poll或者take方法获取新的Task
- 线程池配置:
- 高性能:SynchronousQueue作为任务缓冲队列,不在队列缓冲,直接开线程执行
- 缓冲执行:使用LinkedBlockingQueue,可以使Runnable任务尽量被corePoolSize范围的线程执行掉
7. Executors
- newFixedThreadPool(int):固定大小线程池
- newSingleThreadExecutor:单线程执行线程池
- newCachedThreadPool:缓存队列使用SynchronousQueue,直到启动线程数量达到整形最大值,报异常
- newScheduledThreadPool
8. FutureTask
- run:调用Sync的innerRun方法实现,基于CAS将state从0设置为RUNNING。获取当前线程,如果是RUNNING,调用innerSet,首先获取STATE,如果是RAN(已经执行)则返回,CANCELLED则调用releaseShared,最后将runner属性设置为null
- get:若STATE为RAN或者CANCELLED且runner为null,则进入后续步骤,否则等待。后续无异常,则返回result
- cancel:调用Sync的innerCancel,基于CAS将state设置为CANCELLED
9. 其他并发工具
- Semaphore信号量:通过CAS操作减,返回剩余个数大于0则完成tryAcquire
- CountDownLatch:减计数到0后才执行
- CyclicBarrier:与CountDownLatch相反,增加到一定计数后执行
- ReetrantLock:CAS操作,可重入。如果未获得锁,则用unsafe.park()方法挂起
- ReetrantReadWriteLock:升级降级机制:
- 持有读锁后,不能直接加写锁的lock,“读锁不可升级”
- 持有写锁,可调用读锁的lock,写锁unlock后,当前锁降级为读锁