疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

生产者消费者问题理解与Java实现


 

  生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程--即所谓的"生产者"和"消费者"--在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

  要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

  Java代码模拟生产者-消费者

  产品类

  package org.dennist.thread.demo;

  /**

  *

  * Product.java

  *

  * @version : 1.1

  *

  * @author :

  *

  * @since : 1.0 创建时间: 2013-2-25 上午09:03:38

  *

  * TODO : class Product.java is used for …

  *

  */

  public class Product { //产品类

  private int productId = 0;

  public Product(int productId){

  this.productId = productId;

  }

  public int getProductId() {

  return productId;

  }

  public void setProductId(int productId) {

  this.productId = productId;

  }

  @Override

  public String toString() {

  return ""+productId;

  }

  }

  仓库类

  package org.dennist.thread.demo;

  /**

  *

  * StoreHouse.java

  *

  * @version : 1.1

  *

  * @author : 

  *

  * @since : 1.0 创建时间: 2013-2-25 上午08:55:33

  *

  * TODO : 仓库

  *

  */

  public class StoreHouse {

  private int base = 0;

  private int top = 0;

  //仓库大小

  private Product[] products = new Product[10];

  /**

  * 生产产品

  * @param product

  */

  public synchronized void push(Product product){

  if(top==products.length){ //如果仓库已满,等待消费

  try {

  System.out.println("仓库已满,正在等待消费");

  wait();

  }catch (InterruptedException e) {

  System.out.println("stop push product because other reasons");

  }

  }

  //仓库未满,将生产的产品入库

  products[top] = product;

  //库中产品数量+1

  top++;

  }

  /**

  * 消费产品

  * @return

  */

  public synchronized Product pop() {

  Product product = null;

  while (top == base) { //仓库未空,不能消费

  notify();

  try {

  System.out.println("仓库已空,正等待生产…");

  wait();

  } catch (InterruptedException e) {

  System.out.println("stop push product because other reasons");

  }

  }

  //仓库未空,等待消费

  top--;

  product = products[top];

  products[top] = null;

  return product;

  }

  }

  生产者类

  package org.dennist.thread.demo;

  /**

  *

  * Producer.java

  *

  * @version : 1.1

  *

  * @author : 

  *

  * @since : 1.0 创建时间: 2013-2-25 上午08:53:16

  *

  * TODO : 生产者

  *

  */

  public class Producer implements Runnable{

  private String producerName ;

  private StoreHouse storeHouse ;

  public Producer(String producerName, StoreHouse storeHouse) {

  this.producerName = producerName;

  this.storeHouse = storeHouse;

  }

  public void setProducerName(String producerName) {

  this.producerName = producerName;

  }

  public String getProducerName() {

  return producerName;

  }

  @Override

  public void run() {

  execProcuct();

  }

  private void execProcuct() {

  int i = 0;

  while(true){

  i++;

  Product pro = new Product(i);

  storeHouse.push(pro);

  System.out.println(getProducerName() + " 生产了 " + pro);

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  return;

  }

  }

  }

  }

  消费者类

  package org.dennist.thread.demo;

  /**

  *

  * Consumer.java

  *

  * @version : 1.1

  *

  * @author : 

  *

  * @since : 1.0 创建时间: 2013-2-25 上午08:53:47

  *

  * TODO : 消费者

  public class Consumer implements Runnable{

  private String consumerName = null;

  private StoreHouse storeHouse = null;

  public Consumer(String consumerName, StoreHouse storeHouse) {

  this.consumerName = consumerName;

  this.storeHouse = storeHouse;

  }

  public void setConsumerName(String consumerName) {

  this.consumerName = consumerName;

  }

  public String getConsumerName() {

  return consumerName;

  }

  public void execConsume() {

  while (true) {

  System.out.println(getConsumerName() + " 消费了 " + storeHouse.pop());

  try {

  Thread.sleep(5000);

  } catch (InterruptedException e) {

  return;

  }

  }

  }

  @Override

  public void run() {

  execConsume();

  }

  }

  测试主类

  package org.dennist.thread.demo;

  /**

  *

  * TestPC.java

  *

  * @version : 1.1

  *

  * @author 

  *

  * @since : 1.0 创建时间: 2013-2-25 上午09:18:52

  *

  * TODO : 生产者消费者模拟

  *

  */

  public class TestPC {

  public static void main(String[] args) {

  StoreHouse storeHouse = new StoreHouse();

  Producer producer = new Producer("生产者", storeHouse);

  Consumer comsumer = new Consumer("消费者", storeHouse);

  Thread t1 = new Thread(producer);

  Thread t2 = new Thread(comsumer);

  t1.start();

  t2.start();

  }

  }

  关于JAVA多线程同步

  JAVA多线程同步主要依赖于若干方法和关键字

  1 wait方法:

  该方法属于Object的方法,wait方法的作用是使得当前调用wait方法所在部分(代码块)的线程停止执行,并释放当前获得的调用wait所在的代码块的锁,并在其他线程调用notify或者notifyAll方法时恢复到竞争锁状态(一旦获得锁就恢复执行)。

  调用wait方法需要注意几点:

  第一点:wait被调用的时候必须在拥有锁(即synchronized修饰的)的代码块中。

  第二点:恢复执行后,从wait的下一条语句开始执行,因而wait方法总是应当在while循环中调用,以免出现恢复执行后继续执行的条件不满足却继续执行的情况。

  第三点:若wait方法参数中带时间,则除了notify和notifyAll被调用能激活处于wait状态(等待状态)的线程进入锁竞争外,在其他线程中interrupt它或者参数时间到了之后,该线程也将被激活到竞争状态。

  第四点:wait方法被调用的线程必须获得之前执行到wait时释放掉的锁重新获得才能够恢复执行。

  2 notify方法和notifyAll方法:

  notify方法通知调用了wait方法,但是尚未激活的一个线程进入线程调度队列(即进入锁竞争),注意不是立即执行。并且具体是哪一个线程不能保证。另外一点就是被唤醒的这个线程一定是在等待wait所释放的锁。

  notifyAll方法则唤醒所有调用了wait方法,尚未激活的进程进入竞争队列。

  3 synchronized关键字:

  第一点:synchronized用来标识一个普通方法时,表示一个线程要执行该方法,必须取得该方法所在的对象的锁。

  第二点:synchronized用来标识一个静态方法时,表示一个线程要执行该方法,必须获得该方法所在的类的类锁。

  第三点:synchronized修饰一个代码块。类似这样:synchronized(obj) { //code… }.表示一个线程要执行该代码块,必须获得obj的锁。这样做的目的是减小锁的粒度,保证当不同块所需的锁不冲突时不用对整个对象加锁。利用零长度的byte数组对象做obj非常经济。

  4 atomic action(原子操作):

  在JAVA中,以下两点操作是原子操作。但是c和c++中并不如此。

  第一点:对引用变量和除了long和double之外的原始数据类型变量进行读写。

  第二点:对所有声明为volatile的变量(包括long和double)的读写。

  另外:在java.util.concurrent和java.util.concurrent.atomic包中提供了一些不依赖于同步机制的线程安全的类和方法。

  附录:

  进程间通信

  进程间通信(IPC,Inter-Process Communication),指至少两个进程或线程间传送数据或信号的一些技术或方法。线程是计算机系统分配资源的最小单位。每个进程都有自己的一部分独立的系统资源,彼此是隔离的。为了能使不同的进程互相访问资源并进行协调工作,才有了进程间通信。这些进程可以运行在同一计算机上或网络连接的不同计算机上。

  进程间通信技术包括消息传递、同步、共享内存和远程过程调用。IPC是一种标准的Unix通信机制。

  主要的IPC方法有

  (1)管道(Pipe):管道可用于具有亲缘关系进程间的通信,允许一个进程和另一个与它有共同祖先的进程之间进行通信。

  (2)命名管道(named pipe):命名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。命名管道在文件系统中有对应的文件名。命名管道通过命令mkfifo或系统调用mkfifo来创建。

  (3)信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数)。

  (4)消息(Message)队列:消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺

  (5)共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。

  (6)内存映射(mapped memory):内存映射允许任何多个进程间通信,每一个使用该机制的进程通过把一个共享的文件映射到自己的进程地址空间来实现它。

  (7)信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。

  (8)套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。