博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Map主动通知线程消费
阅读量:6415 次
发布时间:2019-06-23

本文共 8695 字,大约阅读时间需要 28 分钟。

hot3.png

  1. 正文代码

import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * 注册一个或多个消费线程的Map.  * 当缓存中存在数据就跑已注册的线程 * 

 *  注册的线程中需要做删除该map中的元素处理 * 

 * 否则就如果为空,就等待. * * @param 
 *            the key type * @param 
 *            the value type * @author ming.peng * @date 2013-12-19 * @since 4.0.0 */@SuppressWarnings("serial")public class ConsumConcurrentHashMap
 extends ConcurrentHashMap
 { /** The lock, 锁,向Map中增加数据提醒休眠线程,及Map中无数据时让线程处于等待. */ private final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); /** 默认设置线程池,用于执行任务, 线程执行器. */ private ExecutorService executor = Executors.newFixedThreadPool(5); /** The runnables. */ private final List
 runnables = new CopyOnWriteArrayList
(); /**  * Instantiates a new drives concurrent hash map.  */ public ConsumConcurrentHashMap() { super(); } /**  * Instantiates a new drives concurrent hash map.  *  * @param initialCapacity  *            the initial capacity  * @param loadFactor  *            the load factor  * @param concurrencyLevel  *            the concurrency level  */ public ConsumConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { super(initialCapacity, loadFactor, concurrencyLevel); } /**  * Instantiates a new drives concurrent hash map.  *  * @param initialCapacity  *            the initial capacity  * @param loadFactor  *            the load factor  */ public ConsumConcurrentHashMap(int initialCapacity, float loadFactor) { super(initialCapacity, loadFactor); } /**  * Instantiates a new drives concurrent hash map.  *  * @param initialCapacity  *            the initial capacity  */ public ConsumConcurrentHashMap(int initialCapacity) { super(initialCapacity); } /**  * Instantiates a new drives concurrent hash map.  *  * @param m  *            the m  */ public ConsumConcurrentHashMap(Map
 m) { super(m); } /**  * Gets the executor.  *  * @return the executor  */ public ExecutorService getExecutor() { return executor; } /**  * Sets the executor.  *  * @param executor  *            the new executor  */ public void setExecutor(ExecutorService executor) { this.executor = executor; } /**  * Regsiter runnable.  *  * @param runnable  *            the runnable  */ public void regsiterRunnable(Runnable runnable) { ConsumerRunnable run = new ConsumerRunnable(runnable); this.runnables.add(run); executor.execute(run); } /**  * Regsiter runnable.  *  * @param runName  *            the run name  * @param runnable  *            the runnable  */ public void regsiterRunnable(String runName, Runnable runnable) { ConsumerRunnable run = new ConsumerRunnable(runName, runnable); this.runnables.add(run); executor.execute(run); } /**  * 删除线程,但不会立马结束线程  * Removes the runnable.  *  * @param runnable  *            the runnable  */ public void removeRunnable(String runName) { for (ConsumerRunnable run : this.runnables) { if (null != run.getRunName() && run.getRunName().equals(runName)) { run.setActive(false); // 标记线程退出 this.runnables.remove(run); // 把线程对象从队列中删除 } } } /**  * 删除线程,但不会立马结束线程  * Removes the runnable.  *  * @param runnable  *            the runnable  */ public void removeRunnable(Runnable runnable) { for (ConsumerRunnable run : this.runnables) { if (run.getRunnable().equals(runnable)) { run.setActive(false); // 标记线程退出 this.runnables.remove(run); // 把线程对象从队列中删除 } } } /*  * (non-Javadoc)  *  * @see java.util.concurrent.ConcurrentHashMap#put(java.lang.Object,  * java.lang.Object)  */ @Override public V put(K key, V value) { V v = super.put(key, value); notifyConsumerRunnables(); return v; } /*  * (non-Javadoc)  *  * @see java.util.concurrent.ConcurrentHashMap#putIfAbsent(java.lang.Object,  * java.lang.Object)  */ @Override public V putIfAbsent(K key, V value) { V v = super.putIfAbsent(key, value); notifyConsumerRunnables(); return v; } /*  * (non-Javadoc)  *  * @see java.util.concurrent.ConcurrentHashMap#putAll(java.util.Map)  */ @Override public void putAll(Map
 m) { super.putAll(m); notifyConsumerRunnables(); } /**  * notify consumer threads  */ private void notifyConsumerRunnables() { lock.lock(); try{ if (this.size() > this.runnables.size() * 1000) { notEmpty.signalAll(); } else { notEmpty.signal(); } }finally{ lock.unlock(); } } /**  * 消费线程  *   * @author rocca.peng@hunteron.com  * @Description   * @Date  2015年7月17日 下午6:00:46  */ protected final class ConsumerRunnable implements Runnable { /** The run name. */ private String runName; /** The runnable. */ private Runnable runnable; /** The isNow. 表示是否继续执行任务, false表示不执行任务,true执行 */ private boolean isNow = true; /** The isActive, 标识线程是否运行, false停止运行, true运行. */ private boolean isActive = true; /**  * Instantiates a new drives runnable.  */ private ConsumerRunnable() { } /**  * Instantiates a new drives runnable.  *  * @param runnable  *            the runnable  */ private ConsumerRunnable(Runnable runnable) { super(); this.runnable = runnable; } /**  * Instantiates a new drives runnable.  *  * @param runName  *            the run name  * @param runnable  *            the runnable  */ private ConsumerRunnable(String runName, Runnable runnable) { super(); this.runnable = runnable; this.runName = runName; } /** 将注册进来的runnable执行#run()方法. */ public void run() { while (this.isActive) { lock.lock(); try { if (ConsumConcurrentHashMap.this.isEmpty()) { notEmpty.await(); } } catch (Exception e) { logger.error("等待异常", e); } finally { lock.unlock(); } try { if (isNow) { runnable.run(); } } catch (Exception e) { logger.error("", e); } } } public boolean isNow() { return isNow; } public void setNow(boolean isNow) { this.isNow = isNow; } /**  * Gets the Active.  *   * @return  */ public boolean isActive() { return isActive; } /**  * Sets the Active.  *   * @param isActive  */ public void setActive(boolean isActive) { this.isActive = isActive; } /**  * Gets the run name.  *  * @return the run name  */ public String getRunName() { return runName; } /**  * Sets the run name.  *  * @param runName  *            the new run name  */ public void setRunName(String runName) { this.runName = runName; } /**  * Gets the runnable.  *  * @return the runnable  */ public Runnable getRunnable() { return runnable; } /**  * Sets the runnable.  *  * @param runnable  *            the new runnable  */ public void setRunnable(Runnable runnable) { this.runnable = runnable; } } /**  * 停止所有任务,正在执行的任务会继续执行,关闭线程池  */ public void shutdown() { // 将所有线程标记结束 for (ConsumerRunnable run : runnables) { run.setActive(false); } // 如果存在休眠的线程,就唤醒所有线程,执行完毕,退出run方法 lock.lock(); try{ notEmpty.signalAll(); }finally{ lock.unlock(); } // 关闭线程执行器 executor.shutdown(); } /**  * 停止所有任务,尝试停止正在执行的线程,关闭线程池  */ public List
 shutdownNow() { // 将所有线程标记结束 for (ConsumerRunnable run : runnables) { run.setActive(false); run.setNow(false); // 在结束时是否执行任务 } // 如果存在休眠的线程,就唤醒所有线程,执行完毕,退出run方法 lock.lock(); try{ notEmpty.signalAll(); }finally{ lock.unlock(); } // 关闭线程执行器 return executor.shutdownNow(); }}

2. 测试

import java.util.Map.Entry;import java.util.Random;import java.util.UUID;import org.apache.commons.lang3.RandomStringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Test {	private static final Logger logger = LoggerFactory.getLogger(Test.class);	private static final ConsumConcurrentHashMap
 map = new ConsumConcurrentHashMap<>(); public static void main(String[] args) { Runnable runnable = new Runnable() { Random r = new Random();  @Override public void run() { long s = System.currentTimeMillis(); long e = 0l; while((e = System.currentTimeMillis()) - s < 10 * 60 * 1000) { map.put(UUID.randomUUID().toString(), RandomStringUtils.randomAlphanumeric(5));// try {// long nextLong = r.nextInt(10);// Thread.sleep(nextLong);// } catch (InterruptedException es) {// es.printStackTrace();// } } } }; Thread thread = new Thread(runnable); thread.start(); map.regsiterRunnable(new Runnable() { private int i = 0; @Override public void run() { i++; for (Entry
 entry : map.entrySet()) { logger.info(i + "\t" + entry.getKey() + "\t" + entry.getValue() + "\t" + map.size()); map.remove(entry.getKey()); } } }); map.regsiterRunnable(new Runnable() { private int i = 0; @Override public void run() { i++; for (Entry
 entry : map.entrySet()) { logger.info(i + "\t" + entry.getKey() + "\t" + entry.getValue() + "\t" + map.size()); map.remove(entry.getKey()); } } }); try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } map.shutdown(); } }

转载于:https://my.oschina.net/mingpeng/blog/480852

你可能感兴趣的文章
.net Mvc文件下载的功能,大文件下载完成之后修改数据库功能
查看>>
Android -- 保存文件
查看>>
采用Asp.Net的Forms身份验证时,非持久Cookie的过期时间会自动扩展
查看>>
OLA音频变速算法的仿真与剖析
查看>>
java环境变量配置
查看>>
素数筛法--SPOJ Problem 2 Prime Generator
查看>>
mysql知识初篇(一)
查看>>
QNX环境
查看>>
[Linux 命令]df -h
查看>>
WordPress彩色背景标签云实现
查看>>
Json.net 常用使用小结
查看>>
网页端压缩解压缩插件JSZIP库的使用
查看>>
php和java的一些比较
查看>>
html5的自定义data-*属性和jquery的data()方法的使用示例
查看>>
GET和POST的区别
查看>>
【Scala】Scala技术栈
查看>>
C语言中二维字符数组的定义和初始化
查看>>
源程序出现各种奇怪的符号P
查看>>
【svn】 linux svn 强制提交注释
查看>>
P6 EPPM R16.1安装与配置指南(一)
查看>>