「java生产消费模型实现」Java实现生产者消费者
本篇文章给大家谈谈java生产消费模型实现,以及Java实现生产者消费者对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、JAVA模拟生产者与消费者实例
- 2、java使用哪些类实现生产消费者模型
- 3、什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?
- 4、如何在 Java 中正确使用 wait,notify 和 notifyAll
JAVA模拟生产者与消费者实例
使用的生产者和消费者模型具有如下特点:
(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。
(2)消费者只消费指定生产者的产品。
(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。
(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。
Windows
用来实现同步和互斥的实体。在Windows
中,常见的同步对象有:信号量(Semaphore)、
互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分
为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的
上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程
中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。
1.用锁操作原语实现互斥
为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:
①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。
②执行临界区程序。
③开锁。将锁打开,退出临界区。
2.信号量及WAIT,SIGNAL操作原语
信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。
WAIT(S):顺序执行以下两个动作:
①信号量的值减1,即S=S-1;
②如果S≥0,则该进程继续执行;
如果
S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。
SIGNAL(S):顺序执行以下两个动作
①S值加
1,即
S=S+1;
②如果S)0,则该进程继续运行;
如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。
在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。
从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。
---------------
/**
*
生产者
*
*/
public
class
Producer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Producer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
product
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
---------------
/**
*
消费者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+"
gets
a
product
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
-------------------------
/**
*
缓冲区
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
-----------------
/**
*
此类用来模拟信号量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue--;
if(semValue0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue=0){
this.notify();
}
}
}
------------------------
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Producer("p001",mutex,full,empty,bf)).start();
Producer
p=new
Producer("p001",mutex,full,empty,bf);
new
Thread(new
Producer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------
java使用哪些类实现生产消费者模型
span style="font-size:18px;"span style="font-size:18px;"package com.ft.com.nextxteam.test;
import java.util.LinkedList;
/**
* 使用wait/notify处理消费者问题
* Created by drjr on 16-9-19.
*/
public class TestProductConm {
public static class Storage {
// 仓库最大存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedListObject list = new LinkedListObject();
// 生产num个产品
public void produce(int num) {
// 同步代码段
synchronized (list) {
// 如果仓库剩余容量不足
while (list.size() + num MAX_SIZE) {
System.out.println("【要生产的产品数量】:" + num + "/t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try {
// 由于条件不满足,生产阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产条件满足情况下,生产num个产品
for (int i = 1; i = num; ++i) {
list.add(new Object());
}
System.out.println("【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// 消费num个产品
public void consume(int num) {
// 同步代码段
synchronized (list) {
// 如果仓库存储量不足
while (list.size() num) {
System.out.println("【要消费的产品数量】:" + num + "/t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try {
// 由于条件不满足,消费阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费条件满足情况下,消费num个产品
for (int i = 1; i = num; ++i) {
list.remove();
}
System.out.println("【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// get/set方法
public LinkedListObject getList() {
return list;
}
public void setList(LinkedListObject list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
public static class Producer extends Thread {
// 每次生产的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Producer(Storage storage) {
this.storage = storage;
}
// 线程run函数
public void run() {
produce(num);
}
// 调用仓库Storage的生产函数
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
public static class Consumer extends Thread {
// 每次消费的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Consumer(Storage storage) {
this.storage = storage;
}
// 线程run函数
public void run() {
consume(num);
}
// 调用仓库Storage的生产函数
public void consume(int num) {
storage.consume(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
public static void main(String[] args) {
// 仓库对象
Storage storage = new Storage();
// 生产者对象
Producer p1 = new Producer(storage);
Producer p2 = new Producer(storage);
Producer p3 = new Producer(storage);
Producer p4 = new Producer(storage);
Producer p5 = new Producer(storage);
Producer p6 = new Producer(storage);
Producer p7 = new Producer(storage);
// 消费者对象
Consumer c1 = new Consumer(storage);
Consumer c2 = new Consumer(storage);
Consumer c3 = new Consumer(storage);
// 设置生产者产品生产数量
p1.setNum(10);
p2.setNum(10);
p3.setNum(10);
p4.setNum(10);
p5.setNum(10);
p6.setNum(10);
p7.setNum(80);
// 设置消费者产品消费数量
c1.setNum(50);
c2.setNum(20);
c3.setNum(30);
// 线程开始执行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
}/span/span
什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?
解释:
在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
如何在 Java 中正确使用 wait,notify 和 notifyAll
wait, notify 和 notifyAll,这些在多线程中被经常用到的保留关键字,在实际开发的时候很多时候却并没有被大家重视。本文对这些关键字的使用进行了描述。
在 Java 中可以用 wait、notify 和 notifyAll
来实现线程间的通信。。举个例子,如果你的Java程序中有两个线程——即生产者和消费者,那么生产者可以通知消费者,让消费者开始消耗数据,因为队列缓
冲区中有内容待消费(不为空)。相应的,消费者可以通知生产者可以开始生成更多的数据,因为当它消耗掉某些数据后缓冲区不再为满。
我们可以利用wait()来让一个线程在某些条件下暂停运行。例如,在生产者消费者模型中,生产者线程在缓冲区为满的时候,消费者在缓冲区为空的时
候,都应该暂停运行。如果某些线程在等待某些条件触发,那当那些条件为真时,你可以用 notify 和 notifyAll
来通知那些等待中的线程重新开始运行。不同之处在于,notify 仅仅通知一个线程,并且我们不知道哪个线程会收到通知,然而 notifyAll
会通知所有等待中的线程。换言之,如果只有一个线程在等待一个信号灯,notify和notifyAll都会通知到这个线程。但如果多个线程在等待这个信
号灯,那么notify只会通知到其中一个,而其它线程并不会收到任何通知,而notifyAll会唤醒所有等待中的线程。
在这篇文章中你将会学到如何使用 wait、notify 和 notifyAll 来实现线程间的通信,从而解决生产者消费者问题。如果你想要更深入地学习Java中的多线程同步问题,我强烈推荐阅读Brian Goetz所著的《Java Concurrency in Practice | Java 并发实践》,不读这本书你的 Java 多线程征程就不完整哦!这是我最向Java开发者推荐的书之一。
如何使用Wait
尽管关于wait和notify的概念很基础,它们也都是Object类的函数,但用它们来写代码却并不简单。如果你在面试中让应聘者来手写代码,
用wait和notify解决生产者消费者问题,我几乎可以肯定他们中的大多数都会无所适从或者犯下一些错误,例如在错误的地方使用
synchronized 关键词,没有对正确的对象使用wait,或者没有遵循规范的代码方法。说实话,这个问题对于不常使用它们的程序员来说确实令人感觉比较头疼。
第一个问题就是,我们怎么在代码里使用wait()呢?因为wait()并不是Thread类下的函数,我们并不能使用
Thread.call()。事实上很多Java程序员都喜欢这么写,因为它们习惯了使用Thread.sleep(),所以他们会试图使用wait()
来达成相同的目的,但很快他们就会发现这并不能顺利解决问题。正确的方法是对在多线程间共享的那个Object来使用wait。在生产者消费者问题中,这
个共享的Object就是那个缓冲区队列。
第二个问题是,既然我们应该在synchronized的函数或是对象里调用wait,那哪个对象应该被synchronized呢?答案是,那个
你希望上锁的对象就应该被synchronized,即那个在多个线程间被共享的对象。在生产者消费者问题中,应该被synchronized的就是那个
缓冲区队列。(我觉得这里是英文原文有问题……本来那个句末就不应该是问号不然不太通……)
永远在循环(loop)里调用 wait 和 notify,不是在 If 语句
现在你知道wait应该永远在被synchronized的背景下和那个被多线程共享的对象上调用,下一个一定要记住的问题就是,你应该永远在
while循环,而不是if语句中调用wait。因为线程是在某些条件下等待的——在我们的例子里,即“如果缓冲区队列是满的话,那么生产者线程应该等
待”,你可能直觉就会写一个if语句。但if语句存在一些微妙的小问题,导致即使条件没被满足,你的线程你也有可能被错误地唤醒。所以如果你不在线程被唤
醒后再次使用while循环检查唤醒条件是否被满足,你的程序就有可能会出错——例如在缓冲区为满的时候生产者继续生成数据,或者缓冲区为空的时候消费者
开始小号数据。所以记住,永远在while循环而不是if语句中使用wait!我会推荐阅读《Effective Java》,这是关于如何正确使用wait和notify的最好的参考资料。
基于以上认知,下面这个是使用wait和notify函数的规范代码模板:
// The standard idiom for calling the wait method in Java synchronized (sharedObject) { while (condition) { sharedObject.wait(); // (Releases lock, and reacquires on wakeup) } // do action based upon condition e.g. take or put into queue }
就像我之前说的一样,在while循环里使用wait的目的,是在线程被唤醒的前后都持续检查条件是否被满足。如果条件并未改变,wait被调用之前notify的唤醒通知就来了,那么这个线程并不能保证被唤醒,有可能会导致死锁问题。
Java wait(), notify(), notifyAll() 范例
下面我们提供一个使用wait和notify的范例程序。在这个程序里,我们使用了上文所述的一些代码规范。我们有两个线程,分别名为
PRODUCER(生产者)和CONSUMER(消费者),他们分别继承了了Producer和Consumer类,而Producer和
Consumer都继承了Thread类。Producer和Consumer想要实现的代码逻辑都在run()函数内。Main线程开始了生产者和消费
者线程,并声明了一个LinkedList作为缓冲区队列(在Java中,LinkedList实现了队列的接口)。生产者在无限循环中持续往
LinkedList里插入随机整数直到LinkedList满。我们在while(queue.size ==
maxSize)循环语句中检查这个条件。请注意到我们在做这个检查条件之前已经在队列对象上使用了synchronized关键词,因而其它线程不能在
我们检查条件时改变这个队列。如果队列满了,那么PRODUCER线程会在CONSUMER线程消耗掉队列里的任意一个整数,并用notify来通知
PRODUCER线程之前持续等待。在我们的例子中,wait和notify都是使用在同一个共享对象上的。
import java.util.LinkedList; import java.util.Queue; import java.util.Random; /** * Simple Java program to demonstrate How to use wait, notify and notifyAll() * method in Java by solving producer consumer problem. * * @author Javin Paul */ public class ProducerConsumerInJava { public static void main(String args[]) { System.out.println("How to use wait and notify method in Java"); System.out.println("Solving Producer Consumper Problem"); QueueInteger buffer = new LinkedList(); int maxSize = 10; Thread producer = new Producer(buffer, maxSize, "PRODUCER"); Thread consumer = new Consumer(buffer, maxSize, "CONSUMER"); producer.start(); consumer.start(); } } /** * Producer Thread will keep producing values for Consumer * to consumer. It will use wait() method when Queue is full * and use notify() method to send notification to Consumer * Thread. * * @author WINDOWS 8 * */ class Producer extends Thread { private QueueInteger queue; private int maxSize; public Producer(QueueInteger queue, int maxSize, String name){ super(name); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (queue) { while (queue.size() == maxSize) { try { System.out .println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue"); queue.wait(); } catch (Exception ex) { ex.printStackTrace(); } } Random random = new Random(); int i = random.nextInt(); System.out.println("Producing value : " + i); queue.add(i); queue.notifyAll(); } } } } /** * Consumer Thread will consumer values form shared queue. * It will also use wait() method to wait if queue is * empty. It will also use notify method to send * notification to producer thread after consuming values * from queue. * * @author WINDOWS 8 * */ class Consumer extends Thread { private QueueInteger queue; private int maxSize; public Consumer(QueueInteger queue, int maxSize, String name){ super(name); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (queue) { while (queue.isEmpty()) { System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue"); try { queue.wait(); } catch (Exception ex) { ex.printStackTrace(); } } System.out.println("Consuming value : " + queue.remove()); queue.notifyAll(); } } } }
为了更好地理解这个程序,我建议你在debug模式里跑这个程序。一旦你在debug模式下启动程序,它会停止在PRODUCER或者
CONSUMER线程上,取决于哪个线程占据了CPU。因为两个线程都有wait()的条件,它们一定会停止,然后你就可以跑这个程序然后看发生什么了
(很有可能它就会输出我们以上展示的内容)。你也可以使用Eclipse里的Step into和Step over按钮来更好地理解多线程间发生的事情。
本文重点:
1. 你可以使用wait和notify函数来实现线程间通信。你可以用它们来实现多线程(3)之间的通信。
2. 永远在synchronized的函数或对象里使用wait、notify和notifyAll,不然Java虚拟机会生成 IllegalMonitorStateException。
3. 永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。
4. 永远在多线程间共享的对象(在生产者消费者模型里即缓冲区队列)上使用wait。
5. 基于前文提及的理由,更倾向用 notifyAll(),而不是 notify()。
这是关于Java里如何使用wait,
notify和notifyAll的所有重点啦。你应该只在你知道自己要做什么的情况下使用这些函数,不然Java里还有很多其它的用来解决同步问题的方
案。例如,如果你想使用生产者消费者模型的话,你也可以使用BlockingQueue,它会帮你处理所有的线程安全问题和流程控制。如果你想要某一个线
程等待另一个线程做出反馈再继续运行,你也可以使用CycliBarrier或者CountDownLatch。如果你只是想保护某一个资源的话,你也可
以使用Semaphore。
关于java生产消费模型实现和Java实现生产者消费者的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
发布于:2022-12-09,除非注明,否则均为
原创文章,转载请注明出处。