生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据,如果缓冲区已经满了,则生产者线程阻塞;如果缓冲区为空,那么消费者线程阻塞。
【实现方法一:synchronized、wait和notifyAll】
/**
 * Demo entry point: starts one producer thread and one consumer thread that
 * share a single {@link Storage} guarded by {@code synchronized},
 * {@code wait} and {@code notifyAll}.
 */
public class ProducterConsumer {
    public static void main(String[] args) {
        Storage storage = new Storage();
        Thread pT = new Thread(new Producter(storage));
        Thread cT = new Thread(new Consumer(storage));
        pT.start();
        cT.start();
    }
}

/** A produced item identified by an integer id; prints as {@code "P" + id}. */
class Product {
    int id;

    Product(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "P" + id;
    }
}

/**
 * Bounded buffer with 10 slots, protected by this object's monitor.
 *
 * <p>Items are stored in an array used as a stack, so {@link #pop()} returns the
 * most recently pushed item. {@code index} is the number of stored items and
 * also the position of the next free slot.
 */
class Storage {
    int index = 0;
    Product[] ProBox = new Product[10];

    /**
     * Stores {@code p}, blocking while the buffer is full.
     *
     * @param p the item to store
     * @throws InterruptedException if the calling thread is interrupted while
     *     waiting for free space; the buffer is left unchanged in that case
     */
    public synchronized void push(Product p) throws InterruptedException {
        // The guard must be re-checked after every wake-up, and an interrupt must
        // abort the call: falling through on a full buffer would write past the
        // end of the array.
        while (index == ProBox.length) {
            this.wait();
        }
        ProBox[index] = p;
        index++;
        this.notifyAll();
    }

    /**
     * Removes and returns the most recently pushed item, blocking while the
     * buffer is empty.
     *
     * @return the removed item
     * @throws InterruptedException if the calling thread is interrupted while
     *     waiting for an item; the buffer is left unchanged in that case
     */
    public synchronized Product pop() throws InterruptedException {
        // An interrupt must abort the call: falling through on an empty buffer
        // would decrement index to -1 and read outside the array.
        while (index == 0) {
            this.wait();
        }
        index--;
        Product taken = ProBox[index];
        // Drop the reference so the buffer does not keep consumed items alive.
        ProBox[index] = null;
        this.notifyAll();
        return taken;
    }
}

/** Pushes ten products (ids 0..9) into the shared storage, pausing between items. */
class Producter implements Runnable {
    Storage s = null;

    Producter(Storage s) {
        this.s = s;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Product p = new Product(i);
                s.push(p);
                System.out.println("生产了:" + p);
                // Choose the sleep time according to the ratio of production
                // speed to consumption speed.
                Thread.sleep(300);
            } catch (InterruptedException ex) {
                // Restore the flag for whoever owns this thread and stop working.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

/** Pops ten products from the shared storage, pausing between items. */
class Consumer implements Runnable {
    Storage s = null;

    Consumer(Storage s) {
        this.s = s;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("消费了:" + s.pop());
                // Choose the sleep time according to the ratio of production
                // speed to consumption speed.
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                // Restore the flag for whoever owns this thread and stop working.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
【实现方法二:Lock和Condition的await、signalAll】
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Solves the producer-consumer problem with {@link Lock} and {@link Condition}.
 *
 * <p>Starts one producer thread and three consumer thread instances that share
 * a single {@link Resource2}.
 *
 * @author tangzhijing
 */
public class LockCondition {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition producerCondition = lock.newCondition();
        Condition consumerCondition = lock.newCondition();
        Resource2 resource = new Resource2(lock, producerCondition, consumerCondition);

        // Producer thread
        ProducerThread2 producer1 = new ProducerThread2(resource);

        // Consumer threads
        ConsumerThread2 consumer1 = new ConsumerThread2(resource);
        ConsumerThread2 consumer2 = new ConsumerThread2(resource);
        ConsumerThread2 consumer3 = new ConsumerThread2(resource);

        producer1.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }
}

/**
 * Consumer thread: repeatedly sleeps for a random time below one second and
 * then removes one resource. Stops when interrupted.
 */
class ConsumerThread2 extends Thread {
    private final Resource2 resource;

    public ConsumerThread2(Resource2 resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                Thread.sleep((long) (1000 * Math.random()));
            } catch (InterruptedException e) {
                // Restore the flag and leave the loop instead of spinning forever.
                Thread.currentThread().interrupt();
                return;
            }
            resource.remove();
        }
    }
}

/**
 * Producer thread: repeatedly sleeps for a random time below one second and
 * then adds one resource. Stops when interrupted.
 *
 * @author tangzhijing
 */
class ProducerThread2 extends Thread {
    private final Resource2 resource;

    public ProducerThread2(Resource2 resource) {
        this.resource = resource;
        setName("生产者");
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                Thread.sleep((long) (1000 * Math.random()));
            } catch (InterruptedException e) {
                // Restore the flag and leave the loop instead of spinning forever.
                Thread.currentThread().interrupt();
                return;
            }
            resource.add();
        }
    }
}

/**
 * Shared resource pool: a counter bounded by {@code size}, guarded by the
 * supplied lock. Producers wait on {@code producerCondition} while the pool is
 * full; consumers wait on {@code consumerCondition} while it is empty.
 *
 * @author tangzhijing
 */
class Resource2 {
    private int num = 0; // current number of resources
    private final int size = 10; // maximum number of resources the pool may hold
    private final Lock lock;
    private final Condition producerCondition;
    private final Condition consumerCondition;

    /**
     * @param lock lock guarding the pool
     * @param producerCondition condition producers wait on; expected to have been
     *     created from {@code lock}
     * @param consumerCondition condition consumers wait on; expected to have been
     *     created from {@code lock}
     */
    public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }

    /**
     * Adds one resource to the pool, blocking while the pool is full.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is added and
     * the thread's interrupt flag is set again before returning.
     */
    public void add() {
        lock.lock();
        try {
            // Loop, not "if": the guard must be re-checked after every wake-up,
            // and the resource is produced once there is room.
            while (num >= size) {
                System.out.println(Thread.currentThread().getName() + "线程进入等待");
                producerCondition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个");
            // Wake up waiting consumers.
            consumerCondition.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Takes one resource from the pool, blocking while the pool is empty.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is removed
     * and the thread's interrupt flag is set again before returning.
     */
    public void remove() {
        lock.lock();
        try {
            // Loop, not "if": with several consumers another one may already have
            // taken the resource by the time this thread re-acquires the lock.
            while (num <= 0) {
                System.out.println(Thread.currentThread().getName() + "线程进入等待");
                consumerCondition.await();
            }
            num--;
            System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + num + "个");
            // Wake up waiting producers.
            producerCondition.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}
【实现方法三:BlockingQueue】
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Demo entry point: starts one producer thread and one consumer thread that
 * exchange {@link Product}s through a {@link BlockingQueue} with capacity 10.
 */
public class ProducterConsumer {
    public static void main(String[] args) {
        BlockingQueue<Product> storage = new LinkedBlockingQueue<>(10);
        Thread pT = new Thread(new Producer(storage));
        Thread cT = new Thread(new Consumer(storage));
        pT.start();
        cT.start();
    }
}

/** A produced item identified by an integer id; prints as {@code "P" + id}. */
class Product {
    int id;

    Product(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "P" + id;
    }
}

/** Puts ten products (ids 0..9) into the queue, pausing between items. */
class Producer implements Runnable {
    BlockingQueue<Product> s = null;

    Producer(BlockingQueue<Product> s) {
        this.s = s;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Product p = new Product(i);
                s.put(p);
                System.out.println("生产了:" + p);
                // Choose the sleep time according to the ratio of production
                // speed to consumption speed.
                Thread.sleep(300);
            } catch (InterruptedException ex) {
                // Restore the flag for whoever owns this thread and stop working.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

/** Takes ten products from the queue, pausing between items. */
class Consumer implements Runnable {
    BlockingQueue<Product> s = null;

    Consumer(BlockingQueue<Product> s) {
        this.s = s;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("消费了:" + s.take());
                // Choose the sleep time according to the ratio of production
                // speed to consumption speed.
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                // Restore the flag for whoever owns this thread and stop working.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}