一、前言
多线程是 Java 开发中的基础知识,也是必须掌握的基本技能。实现多线程的方式有很多种,使用场景也很多,比如需要异步处理事务等。下面介绍一个多线程的使用示例。
1、比如当代码运行到某一逻辑时,我们进行多线程处理
// Publish the "user registered" event through the "registUser" pool; start() submits the task
// to that pool instead of running it on the calling thread.
new CommonThread("registUser", "registUser") {
    @Override
    public void run() {
        try {
            final String phone = userModel.getPhone();
            EnumQueueType.TOPIC_USER_REGIST_EVENT_TRIGGER_NEW.sendMessage(userModel, phone);
        } catch (Exception ex) {
            // A failed push is only logged; it is not propagated to the code that started the task.
            logger.error("注册会员推送消息队列异常:", ex);
        }
    }
}.start();
// Publish the "user registered" event through the "registUser" pool; start() submits the task
// to that pool instead of running it on the calling thread.
new CommonThread("registUser", "registUser") {
    @Override
    public void run() {
        try {
            EnumQueueType.TOPIC_USER_REGIST_EVENT_TRIGGER_NEW.sendMessage(userModel, userModel.getPhone());
        } catch (Exception e) {
            // A failed push is only logged; it is not propagated to the code that started the task.
            logger.error("注册会员推送消息队列异常:", e);
        }
    }
}.start();
这里我们封装了 CommonThread:这个类继承了 Thread 并实现了 Callable 接口,其 start() 方法不会直接启动一个新线程,而是把任务提交给按名称管理的线程池来执行。
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sunbox.core.dao.app.ConfigDao;
import sunbox.core.enumer.CommonExceptionType;
import sunbox.core.enumer.IExceptionType;
import sunbox.core.util.SimpleFunc;import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;public class CommonThread extends Thread implements Callable<CommonThread> {private static final Log logger = LogFactory.getLog(CommonThread.class);private static final Map<String, ExecutorService> threadPool = new ConcurrentHashMap<String, ExecutorService>();private static final AtomicInteger threadId = new AtomicInteger();private String poolName;ThreadContext context;ThreadContext currentContext;private ExecutorService pool;private String resultKey;protected Object callReuslt;protected static final int SECOND = 1000;public CommonThread() {this("sunbox-thread-" + threadId.incrementAndGet());}public CommonThread(String name) {this(name, null);}private static ExecutorService exec = Executors.newCachedThreadPool();public CommonThread(String name, String poolName) {super(name);super.setDaemon(true);if (poolName == null)poolName = "base";this.poolName = poolName;pool = threadPool.get(poolName);if (pool == null) {synchronized (threadPool) {pool = threadPool.get(poolName);if (pool == null) {Integer poolSize = ConfigDao.getAppConfig().getConfigInteger("core.thread." 
+ poolName + ".size", 100);// 改为使用可自动回收的线程池final String pName=poolName;pool = new ThreadPoolExecutor(poolSize, poolSize, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactory() {int index = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("sunbox:%s-%04d",pName , (++index)));}});((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);threadPool.put(poolName, pool);}}}this.context = ThreadContext.getThreadContext();}public CommonThread resultMapping(String resultKey) {this.resultKey = resultKey;return this;}@Overridepublic void start() {if (resultKey != null)context.addResult(resultKey, pool.submit((Callable) this));else {pool.execute(this);}}public static ExecutorService getExec(){return exec;}public static Object executeWithTimeOut(int timeOutSeconds, Supplier<?> func){try {return getExec().submit(new Callable<Object>() {public Object call(){return func.get();}}).get(timeOutSeconds, TimeUnit.SECONDS);} catch (InterruptedException|ExecutionException|TimeoutException e) {throw CommonExceptionType.SimpleException.throwEx(e,"time out execute error.");}}@Overridepublic CommonThread call() throws Exception {this.currentContext = ThreadContext.getThreadContext();this.currentContext.setParent(context);run();return this;}public static void startWhileThread(Consumer<Void> run) {startWhileThread(run,100);}public static void startWhileThread(Consumer<Void> run,long interval) {new CommonThread() {public void run() {while (true) {try {run.accept(null);} catch (Exception e) {// TODO Auto-generated catch block} finally {sleep(interval);}}}}.start();}public static void startWhileThread(int sleepTime, SimpleFunc run) {startWhileThread(sleepTime, run, null, null);}public static void startWhileThread(int sleepTime, SimpleFunc run, Consumer<RuntimeException> catchConsumer, SimpleFunc outFinal) {new CommonThread() {public void run() {try {while (true) {try {try {run.func();} catch (IExceptionType.CommonException e) {if 
(CommonExceptionType.Continue == e.getExceptionType()) {continue;}if (CommonExceptionType.Break == e.getExceptionType()) {break;} else {throw e;}};} catch (RuntimeException e) {if (catchConsumer != null)catchConsumer.accept(e);} finally {sleep(sleepTime);}}} catch (Exception e) {} finally {if (outFinal != null)outFinal.func();}}}.start();}public static String printStackTrace() {return printStackTrace(Thread.currentThread().getStackTrace());}public static String printStackTrace(Exception e) {return printStackTrace(e.getStackTrace());}private static String printStackTrace(StackTraceElement[] stackTrace) {StringBuffer buf = new StringBuffer();for (StackTraceElement e : stackTrace) {buf.append(String.format("%s:%s:%s", e.getClassName(), e.getMethodName(), e.getLineNumber())).append("\n");}return buf.toString();}public static void sleep(int millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {logger.error(e, e);CommonExceptionType.CommonException.throwEx(e,"thread is interupted.");}}public static void sleep(int millis, Consumer<InterruptedException> consumer) {try {Thread.sleep(millis);} catch (InterruptedException e) {consumer.accept(e);}}public static void sleepSecond() {sleep(SECOND);}public static void sleep(long millis) {try {if (millis < 0)millis = 0;Thread.sleep(millis);} catch (InterruptedException e) {logger.error(e, e);}}public static void wait(Object obj) {try {synchronized (obj) {obj.wait();}} catch (InterruptedException e) {logger.error(e, e);}}public static void notify(Object obj) {synchronized (obj) {obj.notifyAll();}}public static <V> void cancelFuture(Future<V> future){if(future==null)return;try {future.cancel(true);}catch (Exception e){logger.error(e.getMessage(),e);}}public static <T, O> List<T> parallel(List<O> params, Consumer<O> func) {return parallel(params, func, true);}public static <T, O> List<T> parallel(List<O> params, Consumer<O> func, boolean waitForEnd) {if ((func == null) || (params == null) || (params.size() <= 0)) 
{return null;}String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";// 启动任务for (O param : params) {CommonThread work = new CommonThread(resultName) {public void run() {try {func.accept(param);} catch (Exception ex) {logger.error(ex.getMessage(), ex);}}}.resultMapping(resultName);work.start();}if (waitForEnd) {return ThreadContext.getThreadContext().getResults(resultName);} else {return null;}}public static <T> List<T> parallel(List<Runnable> functions) {return parallel(functions, true);}public static <T> List<T> parallel(List<Runnable> functions, boolean waitForEnd) {if ((functions == null) || (functions.size() <= 0)) {return null;}String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";// 启动任务for (Runnable func : functions) {CommonThread work = new CommonThread(resultName) {public void run() {try {func.run();} catch (Exception ex) {logger.error(ex.getMessage(), ex);}}}.resultMapping(resultName);work.start();}if (waitForEnd) {return ThreadContext.getThreadContext().getResults(resultName);} else {return null;}}public static void main(String[] args) {for (int i = 0; i < 1000; i++) {new CommonThread("线程池测试" + i, "test") {public void run() {Thread t = Thread.currentThread();System.out.println(String.format("thread id '%d',threa name '%s'", t.getId(), t.getName()));sleep(1000);}}.start();}while (true) {sleep(100);}}
}
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sunbox.core.dao.app.ConfigDao;
import sunbox.core.enumer.CommonExceptionType;
import sunbox.core.enumer.IExceptionType;
import sunbox.core.util.SimpleFunc;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class CommonThread extends Thread implements Callable<CommonThread> {
private static final Log logger = LogFactory.getLog(CommonThread.class);
private static final Map<String, ExecutorService> threadPool = new ConcurrentHashMap<String, ExecutorService>();
private static final AtomicInteger threadId = new AtomicInteger();
private String poolName;
ThreadContext context;
ThreadContext currentContext;
private ExecutorService pool;
private String resultKey;
protected Object callReuslt;
protected static final int SECOND = 1000;
public CommonThread() {
this("sunbox-thread-" + threadId.incrementAndGet());
}
public CommonThread(String name) {
this(name, null);
}
private static ExecutorService exec = Executors.newCachedThreadPool();
public CommonThread(String name, String poolName) {
super(name);
super.setDaemon(true);
if (poolName == null)
poolName = "base";
this.poolName = poolName;
pool = threadPool.get(poolName);
if (pool == null) {
synchronized (threadPool) {
pool = threadPool.get(poolName);
if (pool == null) {
Integer poolSize = ConfigDao.getAppConfig().getConfigInteger("core.thread." + poolName + ".size", 100);
// 改为使用可自动回收的线程池
final String pName=poolName;
pool = new ThreadPoolExecutor(poolSize, poolSize, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactory() {
int index = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("sunbox:%s-%04d",pName , (++index)));
}
});
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
threadPool.put(poolName, pool);
}
}
}
this.context = ThreadContext.getThreadContext();
}
public CommonThread resultMapping(String resultKey) {
this.resultKey = resultKey;
return this;
}
@Override
public void start() {
if (resultKey != null)
context.addResult(resultKey, pool.submit((Callable) this));
else {
pool.execute(this);
}
}
public static ExecutorService getExec(){
return exec;
}
public static Object executeWithTimeOut(int timeOutSeconds, Supplier<?> func){
try {
return getExec().submit(new Callable<Object>() {
public Object call(){
return func.get();
}
}).get(timeOutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException|ExecutionException|TimeoutException e) {
throw CommonExceptionType.SimpleException.throwEx(e,"time out execute error.");
}
}
@Override
public CommonThread call() throws Exception {
this.currentContext = ThreadContext.getThreadContext();
this.currentContext.setParent(context);
run();
return this;
}
public static void startWhileThread(Consumer<Void> run) {
startWhileThread(run,100);
}
public static void startWhileThread(Consumer<Void> run,long interval) {
new CommonThread() {
public void run() {
while (true) {
try {
run.accept(null);
} catch (Exception e) {
// TODO Auto-generated catch block
} finally {
sleep(interval);
}
}
}
}.start();
}
public static void startWhileThread(int sleepTime, SimpleFunc run) {
startWhileThread(sleepTime, run, null, null);
}
public static void startWhileThread(int sleepTime, SimpleFunc run, Consumer<RuntimeException> catchConsumer, SimpleFunc outFinal) {
new CommonThread() {
public void run() {
try {
while (true) {
try {
try {
run.func();
} catch (IExceptionType.CommonException e) {
if (CommonExceptionType.Continue == e.getExceptionType()) {
continue;
}
if (CommonExceptionType.Break == e.getExceptionType()) {
break;
} else {
throw e;
}
}
;
} catch (RuntimeException e) {
if (catchConsumer != null)
catchConsumer.accept(e);
} finally {
sleep(sleepTime);
}
}
} catch (Exception e) {
} finally {
if (outFinal != null)
outFinal.func();
}
}
}.start();
}
public static String printStackTrace() {
return printStackTrace(Thread.currentThread().getStackTrace());
}
public static String printStackTrace(Exception e) {
return printStackTrace(e.getStackTrace());
}
private static String printStackTrace(StackTraceElement[] stackTrace) {
StringBuffer buf = new StringBuffer();
for (StackTraceElement e : stackTrace) {
buf.append(String.format("%s:%s:%s", e.getClassName(), e.getMethodName(), e.getLineNumber())).append("\n");
}
return buf.toString();
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.error(e, e);
CommonExceptionType.CommonException.throwEx(e,"thread is interupted.");
}
}
public static void sleep(int millis, Consumer<InterruptedException> consumer) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
consumer.accept(e);
}
}
public static void sleepSecond() {
sleep(SECOND);
}
public static void sleep(long millis) {
try {
if (millis < 0)
millis = 0;
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.error(e, e);
}
}
public static void wait(Object obj) {
try {
synchronized (obj) {
obj.wait();
}
} catch (InterruptedException e) {
logger.error(e, e);
}
}
public static void notify(Object obj) {
synchronized (obj) {
obj.notifyAll();
}
}
public static <V> void cancelFuture(Future<V> future){
if(future==null)
return;
try {
future.cancel(true);
}catch (Exception e){
logger.error(e.getMessage(),e);
}
}
public static <T, O> List<T> parallel(List<O> params, Consumer<O> func) {
return parallel(params, func, true);
}
public static <T, O> List<T> parallel(List<O> params, Consumer<O> func, boolean waitForEnd) {
if ((func == null) || (params == null) || (params.size() <= 0)) {
return null;
}
String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";
// 启动任务
for (O param : params) {
CommonThread work = new CommonThread(resultName) {
public void run() {
try {
func.accept(param);
} catch (Exception ex) {
logger.error(ex.getMessage(), ex);
}
}
}.resultMapping(resultName);
work.start();
}
if (waitForEnd) {
return ThreadContext.getThreadContext().getResults(resultName);
} else {
return null;
}
}
public static <T> List<T> parallel(List<Runnable> functions) {
return parallel(functions, true);
}
public static <T> List<T> parallel(List<Runnable> functions, boolean waitForEnd) {
if ((functions == null) || (functions.size() <= 0)) {
return null;
}
String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";
// 启动任务
for (Runnable func : functions) {
CommonThread work = new CommonThread(resultName) {
public void run() {
try {
func.run();
} catch (Exception ex) {
logger.error(ex.getMessage(), ex);
}
}
}.resultMapping(resultName);
work.start();
}
if (waitForEnd) {
return ThreadContext.getThreadContext().getResults(resultName);
} else {
return null;
}
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new CommonThread("线程池测试" + i, "test") {
public void run() {
Thread t = Thread.currentThread();
System.out.println(String.format("thread id '%d',threa name '%s'", t.getId(), t.getName()));
sleep(1000);
}
}.start();
}
while (true) {
sleep(100);
}
}
}