if (this.value == A) {
this.value = B
return true;
} else {
return false;
}
// AtomicInteger.java
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
// AtomicInteger.java
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) delta;
}
// Unsafe.java
// compareAndSwapInt(var1, var2, var5, var5 var4)其实换成 compareAndSwapInt(obj, offset, expect, update)比较清楚,意思就是如果 obj 内的 value 和 expect 相等,就证明没有其他线程改变过这个变量,那么就更新它为 update,如果这一步的 CAS 没有成功,那就采用自旋的方式继续进行 CAS 操作,取出乍一看这也是两个步骤了啊,其实在 JNI 里是借助于一个 CPU 指令完成的。所以还是原子操作。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 var4));
return var5;
}
// 该方法为本地方法,有四个参数,分别代表:对象、对象的地址、预期值、修改值
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public class ProducerConsumer {
public static void main(String[] args) {
ProducerConsumer main = new ProducerConsumer();
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 5;
new Thread(main.new Producer(buffer, maxSize), "Producer1").start();
new Thread(main.new Consumer(buffer, maxSize), "Comsumer1").start();
new Thread(main.new Consumer(buffer, maxSize), "Comsumer2").start();
}
class Producer implements Runnable {
private Queue<Integer> queue;
private int maxSize;
Producer(Queue<Integer> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out.println("Queue is full");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Random random = new Random();
int i = random.nextInt();
System.out.println(Thread.currentThread().getName() " Producing value : " i);
queue.add(i);
queue.notifyAll();
}
}
}
}
class Consumer implements Runnable {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize) {
super();
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("Queue is empty");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() " Consuming value : " queue.remove());
queue.notifyAll();
}
}
}
}
}