博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
生产者消费者模式
阅读量:6713 次
发布时间:2019-06-25

本文共 6729 字,大约阅读时间需要 22 分钟。

hot3.png

生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据,如果缓冲区已经满了,则生产者线程阻塞;如果缓冲区为空,那么消费者线程阻塞。

【实现方法一:synchronized、wait和notify】

public class ProducterConsumer{	public static void main(String [] args){		Storage storage = new Storage();					Thread pT = new Thread(new Producter(storage));		Thread cT = new Thread(new Consumer(storage));		pT.start();		cT.start();	}}class Product{	int id;		Product(int id){		this.id = id;	}		public String toString(){		return "P" + id;	}}class Storage {	int index = 0;	Product [] ProBox = new Product[10];		public synchronized void push(Product p){		try{			while(index == ProBox.length){				this.wait();			}				}catch (Exception ex){			ex.printStackTrace();		}		ProBox[index] = p;		index ++;					this.notifyAll();	}		public synchronized Product pop(){		try{			while(index == 0){				this.wait();			}				}catch (Exception ex){			ex.printStackTrace();		}				this.notifyAll();		index --;		return ProBox[index];	}}class Producter implements Runnable{	Storage s = null;		Producter(Storage s){		this.s = s;	}		public void run(){		for(int i = 0; i < 10; i ++){			try{				Product p = new Product(i);				s.push(p);				System.out.println("生产了:" + p);				// 根据生产速度和消费速度的关系确定休眠时间				Thread.sleep(300);			}catch (Exception ex){				ex.printStackTrace();			}		}	}}class Consumer implements Runnable{	Storage s = null;		Consumer(Storage s){		this.s = s;	}		public void run(){		for(int i = 0; i < 10; i ++){			try{				System.out.println("消费了:" + s.pop());				// 根据生产速度和消费速度的关系确定休眠时间				Thread.sleep(1000);			}catch (Exception ex){				ex.printStackTrace();			}		}	}	}

【实现方法二:lock和condition的await、signalAll】

import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 使用Lock 和 Condition解决生产者消费者问题 * @author tangzhijing * */public class LockCondition {        public static void main(String[] args) {            Lock lock = new ReentrantLock();            Condition producerCondition = lock.newCondition();            Condition consumerCondition = lock.newCondition();            Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);                        //生产者线程            ProducerThread2 producer1 = new ProducerThread2(resource);                        //消费者线程            ConsumerThread2 consumer1 = new ConsumerThread2(resource);            ConsumerThread2 consumer2 = new ConsumerThread2(resource);            ConsumerThread2 consumer3 = new ConsumerThread2(resource);                        producer1.start();            consumer1.start();            consumer2.start();            consumer3.start();        }}/** * 消费者线程 */class ConsumerThread2 extends Thread{    private Resource2 resource;    public ConsumerThread2(Resource2 resource){        this.resource = resource;        //setName("消费者");    }    public void run(){        while(true){            try {                Thread.sleep((long) (1000 * Math.random()));            } catch (InterruptedException e) {                e.printStackTrace();            }            resource.remove();        }    }}/** * 生产者线程 * @author tangzhijing * */class ProducerThread2 extends Thread{    private Resource2 resource;    public ProducerThread2(Resource2 resource){        this.resource = resource;        setName("生产者");    }    public void run(){        while(true){                try {                    Thread.sleep((long) (1000 * Math.random()));                } catch (InterruptedException e) {                    e.printStackTrace();                }                resource.add();        }    }}/** * 公共资源类 * @author tangzhijing * */class Resource2{    private int num = 0;//当前资源数量    private int size = 10;//资源池中允许存放的资源数目    private Lock lock;    private Condition producerCondition;    private Condition consumerCondition;    public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {        this.lock = lock;        this.producerCondition = producerCondition;        this.consumerCondition = consumerCondition;     }    /**     * 向资源池中添加资源     */    public void add(){        lock.lock();        try{            if(num < size){                num++;                System.out.println(Thread.currentThread().getName() +                         "生产一件资源,当前资源池有" + num + "个");                //唤醒等待的消费者                consumerCondition.signalAll();            }else{                //让生产者线程等待                try {                    producerCondition.await();                    System.out.println(Thread.currentThread().getName() + "线程进入等待");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }finally{            lock.unlock();        }    }    /**     * 从资源池中取走资源     */    public void remove(){        lock.lock();        try{            if(num > 0){                num--;                System.out.println("消费者" + Thread.currentThread().getName()                         + "消耗一件资源," + "当前资源池有" + num + "个");                producerCondition.signalAll();//唤醒等待的生产者            }else{                try {                    consumerCondition.await();                    System.out.println(Thread.currentThread().getName() + "线程进入等待");                } catch (InterruptedException e) {                    e.printStackTrace();                }//让消费者等待            }        }finally{            lock.unlock();        }    }    }

【实现方法三:BlockingQueue】

import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class ProducterConsumer{	public static void main(String [] args){		BlockingQueue
storage = new LinkedBlockingQueue
(10); Thread pT = new Thread(new Producer(storage)); Thread cT = new Thread(new Consumer(storage)); pT.start(); cT.start(); }}class Product{ int id; Product(int id){ this.id = id; } public String toString(){ return "P" + id; }}class Producer implements Runnable { BlockingQueue
s = null; Producer(BlockingQueue
s){ this.s = s; } public void run(){ for(int i = 0; i < 10; i ++){ try { Product p = new Product(i); s.put(p); System.out.println("生产了:" + p); // 根据生产速度和消费速度的关系确定休眠时间 Thread.sleep(300); } catch (Exception ex) { ex.printStackTrace(); } } }}class Consumer implements Runnable { BlockingQueue
s = null; Consumer(BlockingQueue
s){ this.s = s; } public void run() { for(int i = 0; i < 10; i ++){ try { System.out.println("消费了:" + s.take()); // 根据生产速度和消费速度的关系确定休眠时间 Thread.sleep(1000); } catch (Exception ex) { ex.printStackTrace(); } } } }

 

转载于:https://my.oschina.net/MasterLi161307040026/blog/1926651

你可能感兴趣的文章
Redis在新浪的大规模运维经验
查看>>
nginx 源码安装openssl修复Heartbleed漏洞
查看>>
Oracle IO问题解析(四)
查看>>
OSPF与MTU
查看>>
如何处理人际关系
查看>>
oracle小应用
查看>>
centos-5 yum安装nginx-mysql5.1-php5.2-fastcgi构建LNMP服务器
查看>>
监控linux系统cpu硬盘网络io等资源脚本
查看>>
如何解决Windows 7英文版操作系统显示简体中文程序乱码的问题
查看>>
当“大系统”遇上“小末梢”
查看>>
go 交换两个变量的值 忒带劲了
查看>>
China官方NTP server
查看>>
JVM调优总结
查看>>
Linux 6下yum方式安装配置LAMP平台
查看>>
OpenCASCADE Coordinate Transforms
查看>>
loadrunner安装
查看>>
pt-query-digest查询日志分析工具
查看>>
张明贵-Linux基础命令学习-5
查看>>
模拟Linux开机故障与解决方案
查看>>
三大范式和五大约束
查看>>