疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java-多线程(高级)


 

 阻塞队列:

 
 
1)BlockingQueue该接口提供了:
 
add()/remove() 如果当队列没有数据,从队列中取数据;或者队列中数据已满,
 
向队列中添加数据;则会抛出异常.
 
put()/take() 如果当队列没有数据,从队列中取数据;或者队列中数据已满,
 
向队列中添加数据;则会形成阻塞.
 
offer()/poll() 会给调用者返回特殊的值,开发者可以通过这些值做相应的处理
 
同时还提供了超时版本.
 
2)接口实现
 
ArrayBlockingQueue>由数组实现的有界队列,默认情况下没有指定公平策略(也就是
 
一般的FIFO先进先出策略),如果不启动策略,会导致共享资源被贪婪的线程长时间占有,
 
而无法获取资源的线程可能死掉,这种情况称为饿死;
 
LinkedBlockingQueue>将最大的容量变为可选,默认的容量为整型最大值,也就是不存在
 
生产者生产加入队列时产生阻塞的情况.该队列一般在要求较低的情况下使用.
 
PriorityBlockingQueue>无界队列,由线程对象的优先级决定获取cpu操作时间,同时,
 
开发着也可以提供自己的比较器,比如相同扩展相同优先级的线程.
 
DelayedQueue>是用类似栈维护的特殊的优先级队列.
 
1.检索前指定时间内保持驻留在队列中.
 
2.按照驻留时间排序,最长驻留时间位于底部.
 
3.只允许检索过期后的对象,当队列中没有过期对象.poll返回null,peek
 
则获取栈顶的对象.
 
SynchronousQueue>实现了每个插入操作都必须等待对应的移除操作;队列始终为空,
 
当,发现队列有东西,就会有对应的消费着瞬间消费这些东西;
 
TransferQueue>该接口扩展了BlockingQueue.并且LinkedTransferQueue提供了
 
接口的具体实现;该接口扩展了BlockingQueue的put方法为transfer(),该方法
 
为超时的非阻塞调用.同时,该接口提供了获取等待消费者的数量检测.
 
/** 
* @author Lean @date:2014-9-28 
*/ 
public class StockExchange {
 
public static void main(String[] args) {
    BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>();
 
    Saller saller=new Saller(queue);
    Buyer buyer=new Buyer(queue);
    Thread[] sallerThreads=new Thread[20];
    Thread[] buyerThreads=new Thread[20];
    for (int i = 0; i <sallerThreads.length; i++) {
        sallerThreads[i]=new Thread(saller);
        sallerThreads[i].start();
        buyerThreads[i]=new Thread(buyer);
        buyerThreads[i].start();
    }
    try {
        Thread.sleep(20);
    } catch (InterruptedException e) {
    }
    System.out.println("all thread interrupt!");
    for (Thread thread : sallerThreads) {
        thread.interrupt();
    }
    for (Thread thread : buyerThreads) {
        thread.interrupt();
    }
}
 
static class Saller implements Runnable{
 
    private BlockingQueue<Integer> mQueue;
    private boolean shutDownRequest;
 
    public Saller(BlockingQueue<Integer> queue){
        mQueue=queue;
    }
 
    @Override
    public void run() {
        while (shutDownRequest==false) {
            int quantity=(int)(Math.random()*100);
            try {
                mQueue.put(quantity);
// System.out.println(“saller order by Thread:”+Thread.currentThread().getName()+” quantity:”+quantity); 
} catch (InterruptedException e) { 
shutDownRequest=true; 
}
 
}
 
static class Buyer implements Runnable{
 
    private BlockingQueue<Integer> mQueue;
    private boolean shutDownRequest;
 
    public Buyer(BlockingQueue<Integer> queue){
        mQueue=queue;
    }
 
    @Override
    public void run() {
        while (shutDownRequest==false) {
            try {
                System.out.println("buyer order by Thread:"+Thread.currentThread().getName()+"  quantity:"+mQueue.take());
            } catch (InterruptedException e) {
                shutDownRequest=true;
            }
        }
    }
 
}
}
/** 
* @author Lean @date:2014-9-28 
*/ 
public class LuckyNumberGenerator {
 
public static void main(String[] args) {
    TransferQueue<String> queue=new LinkedTransferQueue<String>();
    Thread producerThread=new Thread(new Producer(queue));
    producerThread.setDaemon(true);
    producerThread.start();
    for (int i = 0; i < 20; i++) {
        Thread comsumerThread=new Thread(new Comsumer(queue));
        comsumerThread.setDaemon(true);
        comsumerThread.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println(Thread.currentThread().getThreadGroup().activeCount());
}
 
static class Producer implements Runnable{
 
    private TransferQueue<String> mQueue;
 
    public Producer(TransferQueue<String> queue){
        this.mQueue=queue;
    }
 
    public String product(){
        return "your lucky number is: "+((int)(Math.random()*100));
    }
 
    @Override
    public void run() {
        while (true) {
            try {
                if (mQueue.hasWaitingConsumer()) {
                        mQueue.put(product());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
}
 
static class Comsumer implements Runnable{
 
    private TransferQueue<String> mQueue;
 
    public Comsumer(TransferQueue<String> queue){
        this.mQueue=queue;
    }
 
    @Override
    public void run() {
        try {
            System.out.println(mQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
}
}
同步器:
 
1)信号量Semaphore
 
指定代理个数,在某一时间内,查看当前是否有代理处理事情, 处理完事件,释放代理; 
/** 
* @author Lean 
*/ 
public class Bank {
private static final int COUNT=100;
private static final Semaphore semaphore=new Semaphore(2,true);
 
public static void main(String[] args) {
    for (int i = 0; i < COUNT; i++) {
        final int count=i;
        new Thread(){
            @Override
            public void run() {
                try {
                    if (semaphore.tryAcquire(10, TimeUnit.MILLISECONDS)) {
                        try {
                            Teller.getService(count);
                        }finally{
                            semaphore.release();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();
    }
}
 
static class Teller{
    public static void getService(int i){
        System.out.println("serving:"+i);
        try {
            Thread.sleep((long)(Math.random()*10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
2)屏障CyclicBarrier
 
指多个线程到达某个点后停止执行(调用CyclicBarrier对象的 
wawit()方法)当多个任务(到达构造参数的指定的个数)达到指定的位置后,执行CyclicBarrier构造参数的Runnable;
/** 
* 屏障(会合点) 
* sample:计算平方和 
* @author Lean @date:2014-9-29 
*/ 
public class CalculateSum {
 
public static final int COUNT=3;
public static int[] tempArray=new int[COUNT];
 
public static void main(String[] args) {
    CyclicBarrier barrier=new CyclicBarrier(COUNT,new Runnable() {
 
        @Override
        public void run() {
            int sum=0;
            for (int i = 0; i < COUNT; i++) {
                sum=sum+tempArray[i];
            }
            System.out.println("the result is:"+sum);
        }
    });
    for (int i = 0; i <COUNT; i++) {
        new Thread(new Square(i,barrier)).start();
    }
    System.out.println("caculate now...");
}
 
static class Square implements Runnable{
 
    private int initSize;
    private CyclicBarrier barrier;
 
    public Square(int initSize,CyclicBarrier barrier){
        this.initSize=initSize;
        this.barrier=barrier;
    }
 
    @Override
    public void run() {
        int result=initSize*initSize;
        tempArray[initSize]=result;
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
 
 
}
3)倒计数闭锁CountDownLatch
 
构造CountDownLatch的时候指定倒数个数,调用await(),倒数加1, 
调用,countDown(),倒数-1,当倒数为0时,执行run()方法内await()后的代码.
/** 
* @author Lean @date:2014-9-29 
*/ 
public class EnhancedStockExchange {
 
public static void main(String[] args) {
    BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>();
    CountDownLatch startLatch=new CountDownLatch(1);
    final CountDownLatch stopLatch=new CountDownLatch(200);
    Producer producer=new Producer(startLatch, stopLatch, queue);
    Saller saller=new Saller(startLatch, stopLatch, queue);
    Thread[] sellerThreads=new Thread[100];
    for (int i = 0; i < sellerThreads.length; i++) {
        sellerThreads[i]=new Thread(saller);
        sellerThreads[i].start();
    }
    Thread[] producerThreads=new Thread[100];
    for (int i = 0; i < producerThreads.length; i++) {
        producerThreads[i]=new Thread(producer);
        producerThreads[i].start();
    }
    //倒数闭锁,当前倒数为1,执行如下函数,倒数0;
    startLatch.countDown();
 
    new Thread(new Runnable() {
 
        @Override
        public void run() {
            try {
                //执行await(),倒数201;
                stopLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("all thread countdown!");
        }
    }).start();
 
 
    try {
        Thread.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Terminating...");
    //执行interrupt(),执行while语句后的mStopLatch.countDown();倒数为1
    for (Thread thread : sellerThreads) {
        thread.interrupt();
    }
    for (Thread thread : producerThreads) {
        thread.interrupt();
    }
    //倒数为0,执行run()方法内await()后的代码;
    stopLatch.countDown();
}
 
 
static class Producer implements Runnable{
 
    public CountDownLatch mStartLatch;
    public CountDownLatch mStopLatch;
    private BlockingQueue<Integer> mQueue;
    private boolean shutDownRequest;
 
    public Producer(CountDownLatch startLatch,CountDownLatch stopLatch,BlockingQueue<Integer> queue){
        mStartLatch=startLatch;
        mStopLatch=stopLatch;
        mQueue=queue;
    }
 
    @Override
    public void run() {
        try {
            mStartLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (shutDownRequest==false) {
            try {
                mQueue.put((int)(Math.random()*(100)));
            } catch (InterruptedException e) {
                shutDownRequest=true;
            }
        }
        mStopLatch.countDown();
    }
 
}
 
static class Saller implements Runnable{
 
    public CountDownLatch mStartLatch;
    public CountDownLatch mStopLatch;
    private BlockingQueue<Integer> mQueue;
    private boolean shutDownRequest;
 
    public Saller(CountDownLatch startLatch,CountDownLatch stopLatch,BlockingQueue<Integer> queue){
        mStartLatch=startLatch;
        mStopLatch=stopLatch;
        mQueue=queue;
    }
 
    @Override
    public void run() {
        try {
            mStartLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (shutDownRequest==false) {
            try {
                System.out.println("saller comsume: "+mQueue.take());
            } catch (InterruptedException e) {
                shutDownRequest=true;
            }
        }
        mStopLatch.countDown();
    }
 
}
4)移相器Phaser >实现屏障一样的功能,相比于屏障和倒计数闭锁,Phaser实例manager 提供了可伸缩的等待数目. 
在运行的过程中,动态增加拦截数可调用 manager.register();当调用manager.arriveAndDeregister()时,当前所有
 
等待线程继续执行;在线程执行中,可调用manager.arriveAndAwaitAdvance();
 
等待其他线程;同时我们可以调用manager.getArrivedParties()查看等待线程数;
 
/** 
* @author Lean @date:2014-9-29 
*/ 
public class HorseRace {
 
private final int NUMBER_OF_HORSE=12;
private static final int INIT_PARTIES=1;
private static final Phaser manager=new Phaser(INIT_PARTIES);
 
public static void main(String[] args) {
    //检查准备就绪的马匹数量
    Thread raceMonitor=new Thread(new RaceMonitor());
    raceMonitor.setDaemon(true);
    raceMonitor.start();
 
    new HorseRace().managerRace();
 
}
 
 
private void managerRace() {
    ArrayList<Horse> horses=new ArrayList<HorseRace.Horse>();
    for (int i = 0; i < NUMBER_OF_HORSE; i++) {
        horses.add(new Horse());
    }
    runRace(horses);
}
 
private void runRace(Iterable<Horse> horses) {
    for (final Horse horse : horses) {
        manager.register();
        new Thread(){
            @Override
            public void run() {
                try {
                    Thread.sleep((new Random()).nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                manager.arriveAndAwaitAdvance();
                horse.run();
            };
        }.start();
    }
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    manager.arriveAndDeregister();
}
 
 
 
 
private static class RaceMonitor implements Runnable{
 
    @Override
    public void run() {
        while (true) {
// System.out.println(“number of horses to run:”+HorseRace.manager.getArrivedParties()); 
try { 
Thread.sleep(1); 
} catch (InterruptedException e) { 
e.printStackTrace(); 
}
 
}
 
private static class Horse implements Runnable{
 
    private static final AtomicInteger idSource=new AtomicInteger();
    private final int id=idSource.incrementAndGet();
 
    @Override
    public void run() {
        System.out.println(toString()+" is running");
    }
 
    @Override
    public String toString() {
        return "Horse [id=" + id + "]";
    }
 
}
5)交换器Exchanger
 
类型T为两个线程交换的对象,在某些相同操作的批量编程中,其中一类线程
 
负责生产对象,另一类编程负责消耗对象,对于线程间共享数据,前面介绍了锁
 
的定义,当我们使用JAVA提供的Exchanger传输对象,不需要锁的概念.
 
buffers=ProductExchange.exchanger.exchange(buffers, 1000,TimeUnit.MILLISECONDS);
 
该对象的exchange方法参数传递了该线程其他线程的数据,并返回了其他线程返回的数据
 
/** 
* @author Lean @date:2014-9-29 
*/ 
public class ProductExchange {
 
public static Exchanger<ArrayList<Integer>> exchanger=new Exchanger<ArrayList<Integer>>();
 
public static void main(String[] args) {
    Thread producerThread=new Thread(new Producer());
    Thread comsumeThread=new Thread(new Comsume());
    producerThread.start();
    comsumeThread.start();
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
 
    producerThread.interrupt();
    comsumeThread.interrupt();
 
}
 
private static class Producer implements Runnable{
 
    private static ArrayList<Integer> buffers=new ArrayList<Integer>();
    private boolean okToRun=true;
 
    @Override
    public void run() {
        while (okToRun) {
            try {
                if (buffers.isEmpty()) {
                    for (int i = 0; i <10; i++) {
                        buffers.add((int)(Math.random()*100));
                    }
                    Thread.sleep(200);
                    for (int i : buffers) {
                        System.out.print(i+" ,");
                    }
                    System.out.println("");
                    buffers=ProductExchange.exchanger.exchange(buffers, 1000,TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                okToRun=false;
            } catch (TimeoutException e) {
                System.out.println("produce time out!");
            }
        }
    }
 
}
 
private static class Comsume implements Runnable{
 
    private static ArrayList<Integer> buffers=new ArrayList<Integer>();
    private boolean okToRun=true;
 
    @Override
    public void run() {
        while (okToRun) {
            try {
                if (buffers.isEmpty()) {
                    buffers=ProductExchange.exchanger.exchange(buffers);
                    for (int i : buffers) {
                        System.out.print(i+" ,");
                    }
                    System.out.println("");
                    Thread.sleep(200);
                    buffers.clear();
                }
            } catch (InterruptedException e) {
                okToRun=false;
            }
        }
    }
 
}
}
 
1、具有1-5工作经验的,面对目前流行的技术不知从何下手,需要突破技术瓶颈的可以参加疯狂软件Java培训。
 
2、在公司待久了,过得很安逸,但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加群。
 
3、如果没有工作经验,但基础非常扎实,对java工作机制,常用设计思想,常用java开发框架掌握熟练的,可以加群。
 
4、觉得自己很牛B,一般需求都能搞定。但是所学的知识点没有系统化,很难在技术领域继续突破的可以疯狂软件Java培训。。