疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

一个简单的java多线程例子


 

 
现在有这样一个任务,有一份手机号列表(20W),有一份话单的列表(10W),要统计哪些手机号没有出现在话单中,哪些手机号在话单中出现了不止一次。
想到的最直接的方式,就是两层循环去遍历,虽然此方法比较笨,但目前还没有想出更好的办法。
一开始使用单线程来处理,代码是随手写的并没有进行重构,只是做一个简单的说明:
[java] 
package tool;  
  
import java.util.List;  
  
public class SingleThread  
{  
    public static void main(String[] args)  
    {  
        SingleThread st = new SingleThread();  
  
        String userIdPath = "D:\shell\store_bak\tool\userid.txt";  
        List<String> userIds = Util.readUserId(userIdPath);  
        List<String> cdrItems = Util.readCdrItem();  
  
        st.process(userIds, cdrItems);  
    }  
  
    /** 
     *  
     * @param userIds 
     * @param cdrItems 
     */  
    private void process(List<String> userIds, List<String> cdrItems)  
    {  
        long startTime = System.currentTimeMillis();  
        int count = 0;  
        for (String key : userIds)  
        {  
            String[] uninKeys = key.split("\s+");  
            count = 0;  
            for (String cdr : cdrItems)  
            {  
                if (cdr.contains("|" + uninKeys[0] + "|")  
                        && cdr.contains("|" + uninKeys[1] + "|"))  
                {  
                    count++;  
                }  
            }  
        }  
        System.out.println((System.currentTimeMillis() - startTime) / 1000);  
    }  
  
}  
 
Util中的代码就不给出了,就是简单的文件读取操作,整个过程处理下来速度并不是太快,其中最耗时的操作在contains方法上,一开始使用的并不是contains方法,而是使用的正则表达式匹配,结果发现正则表达式的效率并不高,因此改用contains方法。但是效率还是不太理想。因此考虑使用多线程来处理。
和传统的生产者消费者不同,这里实际上只有消费者,因为产生原始数据几乎不耗时,最容易想到的办法就是定义个共享的index标志,依次互斥的进行+1操作,因此这里的index就是一个共享的变量,需要进行同步。直接使用jdk中提供的AtomicInteger,代码如下:
[java] 
package tool;  
  
import java.util.List;  
import java.util.concurrent.BrokenBarrierException;  
import java.util.concurrent.CyclicBarrier;  
import java.util.concurrent.atomic.AtomicInteger;  
  
public class MutiThread  
{  
    private static AtomicInteger lock = new AtomicInteger(0);  
  
    public static void main(String[] args)  
    {  
        MutiThread tool = new MutiThread();  
        String userIdPath = "D:\shell\store_bak\tool\userid.txt";  
        List<String> userIds = Util.readUserId(userIdPath);  
        List<String> cdrItems = Util.readCdrItem();  
  
        tool.work2(lock, userIds, cdrItems);  
    }  
  
    public void work2(AtomicInteger lock, List<String> userIds,  
            List<String> cdrItems)  
    {  
        final long startTime = System.currentTimeMillis();  
        CyclicBarrier cb = new CyclicBarrier(5, new Runnable()  
        {  
  
            @Override  
            public void run()  
            {  
                System.out.println((System.currentTimeMillis() - startTime) / 1000);  
            }  
        });  
        for (int i = 0; i < 5; i++)  
        {  
            new Thread(new Worker(userIds, cdrItems, lock, cb)).start();  
        }  
    }  
  
    class Worker implements Runnable  
    {  
        private List<String> userIds;  
        private List<String> cdrItems;  
        private AtomicInteger lock;  
        private CyclicBarrier cb;  
  
        public Worker(List<String> userIds, List<String> cdrItems,  
                AtomicInteger lock, CyclicBarrier cb)  
        {  
            this.userIds = userIds;  
            this.cdrItems = cdrItems;  
            this.lock = lock;  
            this.cb = cb;  
        }  
  
        @Override  
        public void run()  
        {  
            while (true)  
            {  
                int index = lock.getAndIncrement();  
                if (index >= userIds.size())  
                    break;  
                String id = userIds.get(index);  
                process1(id, cdrItems);  
            }  
  
            try  
            {  
                cb.await();  
            } catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            } catch (BrokenBarrierException e)  
            {  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
    private void process1(String id, List<String> cdrItems)  
    {  
        String[] uninKeys = id.split("\s+");  
        int count = 0;  
        for (String cdr : cdrItems)  
        {  
            if (cdr.contains("|" + uninKeys[0] + "|")  
                    && cdr.contains("|" + uninKeys[1] + "|"))  
            {  
                count++;  
            }  
        }  
    }  
  
}  
 
使用多线程的方式确实能够提高不少效率,尤其是数据量大的时候,至少是两倍的速度,这里的线程数也不是越多越好,因为JVM对线程的调度也会消耗资源。
针对这个场景,考虑下concurrenthashmap的实现,可以将资源进行分段处理,可以巧妙的避开多线程的资源征用,因此可以将list分成不同的段,交给不同的线程去处理,代码如下:
[java]
package tool;  
  
import java.util.List;  
import java.util.concurrent.BrokenBarrierException;  
import java.util.concurrent.CyclicBarrier;  
import java.util.concurrent.atomic.AtomicInteger;  
  
public class MutiSegmentMutiThread  
{  
    private static AtomicInteger lock = new AtomicInteger(0);  
    private static int ThreadNum = 10;  
  
    public static void main(String[] args)  
    {  
        MutiSegmentMutiThread tool = new MutiSegmentMutiThread();  
        String userIdPath = "D:\shell\store_bak\tool\userid.txt";  
        List<String> userIds = Util.readUserId(userIdPath);  
        List<String> cdrItems = Util.readCdrItem();  
  
        tool.work2(lock, userIds, cdrItems);  
    }  
  
    public void work2(AtomicInteger lock, List<String> userIds,  
            List<String> cdrItems)  
    {  
        final long startTime = System.currentTimeMillis();  
        CyclicBarrier cb = new CyclicBarrier(ThreadNum, new Runnable()  
        {  
            @Override  
            public void run()  
            {  
                System.out.println((System.currentTimeMillis() - startTime) / 1000);  
            }  
        });  
        int segmentSize = userIds.size() / ThreadNum;  
        int start = 0;  
        int end = 0;  
        for (int i = 0; i < ThreadNum; i++)  
        {  
            start = i * segmentSize;  
            if (i == ThreadNum - 1)  
            {  
                end = userIds.size();  
            } else  
            {  
                end = (i + 1) * segmentSize;  
            }  
            new Thread(new Worker(userIds, cdrItems, cb, start, end)).start();  
        }  
    }  
  
    class Worker implements Runnable  
    {  
        private List<String> userIds;  
        private List<String> cdrItems;  
        private CyclicBarrier cb;  
        private int start;  
        private int end;  
  
        public Worker(List<String> userIds, List<String> cdrItems,  
                CyclicBarrier cb, int start, int end)  
        {  
            this.userIds = userIds;  
            this.cdrItems = cdrItems;  
            this.cb = cb;  
            this.start = start;  
            this.end = end;  
        }  
  
        @Override  
        public void run()  
        {  
            for (int i = start; i < end; i++)  
            {  
                String id = userIds.get(i);  
                process1(id, cdrItems);  
            }  
            try  
            {  
                cb.await();  
            } catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            } catch (BrokenBarrierException e)  
            {  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
    private void process1(String id, List<String> cdrItems)  
    {  
        String[] uninKeys = id.split("\s+");  
        int count = 0;  
        for (String cdr : cdrItems)  
        {  
            if (cdr.contains("|" + uninKeys[0] + "|")  
                    && cdr.contains("|" + uninKeys[1] + "|"))  
            {  
                count++;  
            }  
        }  
    }  
  
}  
 
实际测试中第三种方式确实比第二种要快些,但是提升并不是很明显。以上的代码只是为解决问题提供一个思路,想必还能够继续优化,如果数据量非常大,可以考虑使用分布式计算了。