疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

生产者消费者问题、Java实现


 

   

  我们先来温习下如下概念:

  1. 基础概念

  基本的

  程序 - Program

  程序是静态的源代码或目标程序,是一个没有生命的实体。

  进程 - Process

  当CPU赋予程序生命时也即操作系统执行它时,程序成为了一个活动的实体(但不是可执行的实体),称为进程 - 进行中的程序。

  进程是程序的一个实例;

  是计算机分配资源的基本单位;

  是线程的容器;

  线程 - Thread

  线程是进程中的一个可执行实体,是系统独立调度和基本单位,线程自己不拥有系统资源。

  线程是现代操作系统的重要指标。

  一个进程中的所有线程共享进程所有资源。

  进阶的

  1. 进程间通信 - IPC

  进程间通信 - Inter-Process Communication, IPC;

  IPC是标准的UNIX通信机制,是一组编程接口,让程序员能够协调不同的进程,使之能在一个OS里同时运行,并相互传递、交换信息。

  最初的IPC方法有两种:

  信号 - Signals

  是UNIX系统中使用最早的一种IPC方法。

  OS中预先规定好了一系列事件(信号源),比如一个键盘中断或者一个错误都是,这些事件能产生一个信号。OS通过信号来通知当前进程系统中发生了某种预先规定好的事件。

  当进程识别出信号的到来,就采取适当的动作来传送或处理信号。

  管道 - PIPE

  暂不表。

  后来,在System V UNIX(1983,正统UNIX)中又首次引入了3中IPC机制:

  消息队列 - message queues

  消息队列是消息的一个链表,它允许一个或多个进程向它写消息,一个或多个进程从中读取消息。

  Linux维护了一个MQ向量表:msgque表示消息队列。

  信号量/旗语 - semaphore

  信号量本质上是一个计数器,用来记录某个资源的存取状况。有互斥信号量、条件信号量。

  共享内存 - shared memory

  共享内存通常由一个进程创建,其余进程对这块内存进行读写。通常用信号量来管理共享内存。

  另外还有一种非常重要的方式:

  Socket - 套接字

  TCP/IP是网络通讯协议,而Socket是这些协议的一种实现API,最早实现在BSD UNIX中。今天Socket是最通用的网络编程API,所有提供TCP/IP协议栈的OS中都提供了SocketAPI。

  2. 线程间怎么通信?

  进程可以通过以上那么多IPC手段来通讯,那么线程呢?

  其实线程之间通讯很简单,因为同一个进程的线程们共享进程的资源,所以它们之间的通讯其实就是去读写共享资源。当然,可能需要互斥手段来保证数据一致性。

  tips:

  现在我想你能明白Java线程同步的相关知识点了。

  线程同步时所说的“共享资源”,比如类中的成员变量,它其实在JVM运行时内存中的堆中或方法区中,这两个区域是线程共享的。

  再说明白点,这个区域是JVM进程(Process)的资源区域,也就是所有JVM线程(Thread)可以读写的共享资源喽。

  线程同步是保证线程安全访问共享资源的一种手段,同步方法包括:

  事件 - Event

  互斥 - Mutex

  信号量/旗语 - Semaphore

  临界资源

  2. 生产者消费者问题

  1. 抛出问题

  生产者消费者问题(Producer-consumer problem)也可以叫有限缓冲问题(Bounded-buffer problem)。这个问题是一个很经典(简单)的进程/线程同步问题。

  问题如下:

  首先,工厂中有一个产品缓冲区,生产者会不停地往缓冲区中添加新产品,而消费者则是不停的从缓冲区取走产品。

  问题:

  怎样保证生产者不会在缓冲区满时加入数据,且消费者不会在缓冲区为空时消耗数据?

  解答也很简单:

  生产者必须在缓冲区满时休眠,直到消费者消耗了产品时才能被唤醒。

  同时,也必须让消费者在缓冲区空时休眠,直到生产者往缓冲区中添加产品时才能唤醒消费者。

  对于程序设计层面来说,解决方案就是上面1说的IPC方法,其中最常用的是信号量方法,所以我们再来具体说说信号量。

  2. 再说信号量 - semaphore

  1. 概述

  如上所述,信号量本质上是一个计数器,可以用来处理进程同步问题。

  1965年,荷兰计算机科学家艾兹格·迪杰斯特拉(Edsger W. Dijkstra)发明了Semaphore机制,现在广泛的应用在各种OS中。

  2. OS中的描述:

  在系统中,给予每个进程一个信号量,代表每个进程目前的状态,未得到控制权的进程会在特定的地方被强迫停下来,等待可以继续进行的信号到来。

  3. 种类:

  信号量是一个任意整数

  称为:计数信号量(Counting semaphore)或一般信号量(general semaphore)

  信号量只有二进制的0或1

  称为:二进制信号量(Binary semaphore)或互斥信号量 - Mutex。

  4. PV原语:

  Dijkstra同时提出了两个原语(原子语句)来操作semaphore:

  P原语

  P是荷兰语Proeren(测试)的首字母。

  P是阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另一个进程唤醒它。

  代表的操作为:申请一个空闲资源(信号量-1),若成功,则退出;若失败,则该进程被阻塞;

  V原语

  V是荷兰语Verhogen(增加)的首字母。

  V为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,记录着等待被唤醒的进程。

  代表的操作为:释放一个被占用的资源(信号量+1),若发现有被阻塞的线程,则选择一个唤醒之。

  5. 三种使用:

  3的两种种类和4的PV原语结合,可以对semaphore的操作可以分为三种情况:

  视semaphore为一个加锁标志位,实现对一个共享变量的互斥访问。

  过程:

  P(mutex); // mutex的初始值为1,访问该共享数据;

  V(mutex);

  //非临界区

  1

  2

  3

  1

  2

  3

  视semaphore为共享资源剩余个数,实现对一个类共享资源的访问。

  过程:

  P(mutex); // mutex的初始值为资源的个数N,使用该资源;

  V(mutex);

  //非临界区

 

  视semaphore为进程间的同步工具。

  过程:

  临界区C1;

  P(S);

  V(S);

  临界区C2;

 

  3. Java中实现

  生产者消费者问题具体来说有以下几种

  1个生产者1个消费者

  1个生产者N个消费者

  M个生产者N个消费者

  感觉网上大部分人写的都是教学性质的第一种,下面我自己按我的理解写了一个第三种,其实我也不知道对不对啊…… 因为我没找到我认为正确的、可以参阅的……

  各位读者,如有不当之处,还望能不吝赐教:

  import java.util.LinkedList;

  import java.util.Random;

  /**

  * 多生产者、多消费者的情况

  *

  * @author alanzhangyx

  *

  */

  public class ProducerConsumer {

  //定义一个队列缓冲区,数据为Integer

  LinkedList list = new LinkedList();

  //设置缓冲区最大容量

  static final int MAX_SIZE = 100;

  /**

  * 生产者。

  *

  *

生产者进行V原语操作

 

  *

     

      *

  • 获取缓冲区,如果缓冲区没有达到MAX_SIZE,则生产一个产品(n个也行)放入缓冲区,并唤醒所有线程
  •  

      *

  • 无论如何最后使自己休眠(这里是wait)
  •  

      *

 

  *

  * @version 1.0.0

  * @author alanzhangyx

  */

  class Producer implements Runnable{

  @Override

  public void run() {

  while (true) {

  synchronized (list){

  if (list.size() < MAX_SIZE) {

  int num = new Random().nextInt(100);

  list.add(num);

  list.notifyAll();

  System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品:" + num + ",----此时缓冲区容量为" + list.size());

  }

  try {

  list.wait();

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  }

  }

  }

  }

  /**

  * 消费者。

  *

  *

消费者进行P原语操作

 

  *

     

      *

  • 获取缓冲区,如果缓冲区有数据,则从缓冲区取出一个产品(n个也行),并唤醒所有线程
  •  

      *

  • 无论如何最后使自己休眠(这里是wait)
  •  

      *

 

  *

  * @version 1.0.0

  * @author alanzhangyx

  */

  class Consumer implements Runnable{

  @Override

  public void run() {

  while (true) {

  synchronized (list){

  if (list.size() > 0) {

  int num = list.poll();//poll是Queue的操作,删除队列头元素

  System.out.println("消费者" + Thread.currentThread().getName() + "消费了产品:" + num + ",----此时缓冲区容量为" + list.size());

  list.notifyAll();

  }

  try {

  list.wait();

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  }

  }

  }

  }

  public static void main(String[] args) {

  ProducerConsumer pc = new ProducerConsumer();

  //Thread构造函数需要一个Runnable对象即可构造一个新的线程,Runnable对象可以重复利用,不必new多个

  //一个消费者,一个生产者

  Consumer c = pc.new Consumer();

  Producer p = pc.new Producer();

  //生产者和消费者谁先start都一样

  new Thread(c).start();

  new Thread(c).start();

  new Thread(c).start();

  new Thread(c).start();

  new Thread(c).start();

  new Thread(p).start();

  new Thread(p).start();

  new Thread(p).start();

  new Thread(p).start();

  new Thread(p).start();

  }

  }