Positive example 1:
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
Positive example 2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown
Positive example 3:
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) corePoolSize 核心线程数 会一直存在,除非allowCoreThreadTimeOut设置为true maximumPoolSize 线程池最大线程数 keepAliveTime:除了核心线程数外的线程 如果没有任务多久释放。 unit:超时时间的单位 workQueue:工作队列,保存未执行的Runnable 任务 threadFactory:创建线程的工厂类 handler:当线程已满,工作队列也满了的时候,会被调用。被用来实现各种拒绝策略。
3.5.1.默认直接拒绝抛出ThreadPoolExecutor.AbortPolicy RejectedExecutionException
3.5.2.直接不处理ThreadPoolExecutor.DiscardPolicy()
3.5.3.把加入队列最早的任务删除。ThreadPoolExecutor.DiscardOldestPolicy()
3.5.4.让调用线程池的任务去处理。ThreadPoolExecutor.CallerRunsPolicy()
自定义拒绝策略 实现RejectedExecutionHandler接口,实现抽象方法rejectedExecution方法。 当引用自定义拒绝策略时会初始化自定义拒绝策略类的构造方法。 当线程堵塞触发拒绝策略时会执行rejectedExecution方法。 这几种拒绝策略都是静态内部类实现RejectedExecutionHandler接口。当我们要向队列中添加一个元素时,我们需要调用put()方法。该方法将阻塞,直到其他某个线程调用take()方法,表明它已准备好获取一个元素。
SynchronousQueue并不是真正的队列,而是一种管理直接在线程之间移交信息的机制,但我们应该将其视为两个线程之间单个元素的交换点,其中一个线程正在传递一个元素,另一个线程正在获取该元素。
ThreadLocal<Integer> threadLocalValue = new ThreadLocal<>(); threadLocalValue.set(1); Integer result = threadLocalValue.get();
Lock是一个接口,接口的实现类ReentrantLock, ReentrantReadWriteLock.ReadLock,ReentrantReadWriteLock.WriteLock
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。
使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回,在拿不到锁时也不会一直在那等待。
JUC包中的原子操作类可以分为4类。 1、基本类型: AtomicInteger, AtomicLong, AtomicBoolean ; 2、数组类型: AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray ; 3、引用类型: AtomicReference, AtomicStampedRerence, AtomicMarkableReference ; 4、对象的属性修改类型: AtomicIntegerFieldUpdater,AtomicLongFieldUpdater, AtomicReferenceFieldUpdater 。
import java.util.concurrent.CountDownLatch;
public class ThreadRunnableDemo implements Runnable{
private CountDownLatch downLatch;
public ThreadRunnableDemo(CountDownLatch downLatch) {
this.downLatch = downLatch;
}
@Override
public void run() {
System.out.printf("Thread %s start\t",Thread.currentThread().getName());
try {
Thread.sleep(300);
System.out.printf("Thread %s stop\n",Thread.currentThread().getName());
downLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}package com.jy.lejutaobao.testDemo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch downLatch=new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
for(int i=0; i < 3; i ) {
executor.submit(new ThreadRunnableDemo(downLatch));
}
try {
downLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
System.out.println("都执行完了.");
}
}
CountDownLatch: 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。 CyclicBrrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrierExample1 {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5);
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i ) {
final int threadNum = i;
executor.execute(() -> {
try {
System.out.print("线程 = " threadNum " 开始 \t");
if((threadNum 1) % 5==0){
System.out.println("\n");
}
Thread.sleep(1000 threadNum);
System.out.print("线程 = " threadNum " 已完成\t");
if((threadNum 1) % 5==0){
System.out.println("\n");
}
barrier.await();
} catch (Exception e) {
}
});
}
executor.shutdown();
}
}
线程池分批执行,一批一批执行. 五个一批 Semaphore semaphore = new Semaphore(5); 获取信号量 semaphore.acquire(); 释放信号量 semaphore.release();
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class TestSemaphoreDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(5);
for(int i = 0;i<20;i ){
int finalI = i;
executorService.execute(new Runnable() {
@Override
public void run() {
try {
/*获取信号量*/
semaphore.acquire();
System.out.println("Thread = " finalI " 获取acquire");
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
System.out.println("Thread = " finalI " 释放release");
}
});
}
}
}Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}