当前位置: 首页> 娱乐> 八卦 > 微型企业网络设计方案_代理网关app未运行怎么办_石家庄seo管理_指定关键词排名优化

微型企业网络设计方案_代理网关app未运行怎么办_石家庄seo管理_指定关键词排名优化

时间:2025/7/11 1:23:18来源:https://blog.csdn.net/dongjing991/article/details/144979875 浏览次数:0次
微型企业网络设计方案_代理网关app未运行怎么办_石家庄seo管理_指定关键词排名优化

一、前言

       多线程是JAVA开发中的基础知识,也是需要掌握的基技能,实现多线程的方式有很多种,使用的场景也很多。比如需要异步处理事务等。以下介绍一种多线程使用例子

1、比如当代码运行到某一逻辑时,我们进行多线程处理

new CommonThread("registUser", "registUser") {
            @Override
            public void run() {
                try {
                    EnumQueueType.TOPIC_USER_REGIST_EVENT_TRIGGER_NEW.sendMessage(userModel, userModel.getPhone());

                } catch (Exception e) {
                    logger.error("注册会员推送消息队列异常:", e);
                }
            }
        }.start();

new CommonThread("registUser", "registUser") {@Overridepublic void run() {try {EnumQueueType.TOPIC_USER_REGIST_EVENT_TRIGGER_NEW.sendMessage(userModel, userModel.getPhone());} catch (Exception e) {logger.error("注册会员推送消息队列异常:", e);}}}.start();

这里我们封装了CommonThread,这个类继承了Thread 并且实现了 Callable接口来实现

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sunbox.core.dao.app.ConfigDao;
import sunbox.core.enumer.CommonExceptionType;
import sunbox.core.enumer.IExceptionType;
import sunbox.core.util.SimpleFunc;import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;public class CommonThread extends Thread implements Callable<CommonThread> {private static final Log logger = LogFactory.getLog(CommonThread.class);private static final Map<String, ExecutorService> threadPool = new ConcurrentHashMap<String, ExecutorService>();private static final AtomicInteger threadId = new AtomicInteger();private String poolName;ThreadContext context;ThreadContext currentContext;private ExecutorService pool;private String resultKey;protected Object callReuslt;protected static final int SECOND = 1000;public CommonThread() {this("sunbox-thread-" + threadId.incrementAndGet());}public CommonThread(String name) {this(name, null);}private static ExecutorService exec = Executors.newCachedThreadPool();public CommonThread(String name, String poolName) {super(name);super.setDaemon(true);if (poolName == null)poolName = "base";this.poolName = poolName;pool = threadPool.get(poolName);if (pool == null) {synchronized (threadPool) {pool = threadPool.get(poolName);if (pool == null) {Integer poolSize = ConfigDao.getAppConfig().getConfigInteger("core.thread." + poolName + ".size", 100);// 改为使用可自动回收的线程池final String pName=poolName;pool = new ThreadPoolExecutor(poolSize, poolSize, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactory() {int index = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("sunbox:%s-%04d",pName , (++index)));}});((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);threadPool.put(poolName, pool);}}}this.context = ThreadContext.getThreadContext();}public CommonThread resultMapping(String resultKey) {this.resultKey = resultKey;return this;}@Overridepublic void start() {if (resultKey != null)context.addResult(resultKey, pool.submit((Callable) this));else {pool.execute(this);}}public static ExecutorService getExec(){return exec;}public static Object executeWithTimeOut(int timeOutSeconds, Supplier<?> func){try {return getExec().submit(new Callable<Object>() {public Object call(){return func.get();}}).get(timeOutSeconds, TimeUnit.SECONDS);} catch (InterruptedException|ExecutionException|TimeoutException e) {throw CommonExceptionType.SimpleException.throwEx(e,"time out execute error.");}}@Overridepublic CommonThread call() throws Exception {this.currentContext = ThreadContext.getThreadContext();this.currentContext.setParent(context);run();return this;}public static void startWhileThread(Consumer<Void> run) {startWhileThread(run,100);}public static void startWhileThread(Consumer<Void> run,long interval) {new CommonThread() {public void run() {while (true) {try {run.accept(null);} catch (Exception e) {// TODO Auto-generated catch block} finally {sleep(interval);}}}}.start();}public static void startWhileThread(int sleepTime, SimpleFunc run) {startWhileThread(sleepTime, run, null, null);}public static void startWhileThread(int sleepTime, SimpleFunc run, Consumer<RuntimeException> catchConsumer, SimpleFunc outFinal) {new CommonThread() {public void run() {try {while (true) {try {try {run.func();} catch (IExceptionType.CommonException e) {if (CommonExceptionType.Continue == e.getExceptionType()) {continue;}if (CommonExceptionType.Break == e.getExceptionType()) {break;} else {throw e;}};} catch (RuntimeException e) {if (catchConsumer != null)catchConsumer.accept(e);} finally {sleep(sleepTime);}}} catch (Exception e) {} finally {if (outFinal != null)outFinal.func();}}}.start();}public static String printStackTrace() {return printStackTrace(Thread.currentThread().getStackTrace());}public static String printStackTrace(Exception e) {return printStackTrace(e.getStackTrace());}private static String printStackTrace(StackTraceElement[] stackTrace) {StringBuffer buf = new StringBuffer();for (StackTraceElement e : stackTrace) {buf.append(String.format("%s:%s:%s", e.getClassName(), e.getMethodName(), e.getLineNumber())).append("\n");}return buf.toString();}public static void sleep(int millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {logger.error(e, e);CommonExceptionType.CommonException.throwEx(e,"thread is interupted.");}}public static void sleep(int millis, Consumer<InterruptedException> consumer) {try {Thread.sleep(millis);} catch (InterruptedException e) {consumer.accept(e);}}public static void sleepSecond() {sleep(SECOND);}public static void sleep(long millis) {try {if (millis < 0)millis = 0;Thread.sleep(millis);} catch (InterruptedException e) {logger.error(e, e);}}public static void wait(Object obj) {try {synchronized (obj) {obj.wait();}} catch (InterruptedException e) {logger.error(e, e);}}public static void notify(Object obj) {synchronized (obj) {obj.notifyAll();}}public static <V> void cancelFuture(Future<V> future){if(future==null)return;try {future.cancel(true);}catch (Exception e){logger.error(e.getMessage(),e);}}public static <T, O> List<T> parallel(List<O> params, Consumer<O> func) {return parallel(params, func, true);}public static <T, O> List<T> parallel(List<O> params, Consumer<O> func, boolean waitForEnd) {if ((func == null) || (params == null) || (params.size() <= 0)) {return null;}String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";// 启动任务for (O param : params) {CommonThread work = new CommonThread(resultName) {public void run() {try {func.accept(param);} catch (Exception ex) {logger.error(ex.getMessage(), ex);}}}.resultMapping(resultName);work.start();}if (waitForEnd) {return ThreadContext.getThreadContext().getResults(resultName);} else {return null;}}public static <T> List<T> parallel(List<Runnable> functions) {return parallel(functions, true);}public static <T> List<T> parallel(List<Runnable> functions, boolean waitForEnd) {if ((functions == null) || (functions.size() <= 0)) {return null;}String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";// 启动任务for (Runnable func : functions) {CommonThread work = new CommonThread(resultName) {public void run() {try {func.run();} catch (Exception ex) {logger.error(ex.getMessage(), ex);}}}.resultMapping(resultName);work.start();}if (waitForEnd) {return ThreadContext.getThreadContext().getResults(resultName);} else {return null;}}public static void main(String[] args) {for (int i = 0; i < 1000; i++) {new CommonThread("线程池测试" + i, "test") {public void run() {Thread t = Thread.currentThread();System.out.println(String.format("thread id '%d',threa name '%s'", t.getId(), t.getName()));sleep(1000);}}.start();}while (true) {sleep(100);}}
}

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sunbox.core.dao.app.ConfigDao;
import sunbox.core.enumer.CommonExceptionType;
import sunbox.core.enumer.IExceptionType;
import sunbox.core.util.SimpleFunc;

import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class CommonThread extends Thread implements Callable<CommonThread> {
    private static final Log logger = LogFactory.getLog(CommonThread.class);
    private static final Map<String, ExecutorService> threadPool = new ConcurrentHashMap<String, ExecutorService>();
    private static final AtomicInteger threadId = new AtomicInteger();

    private String poolName;

    ThreadContext context;
    ThreadContext currentContext;
    private ExecutorService pool;
    private String resultKey;
    protected Object callReuslt;

    protected static final int SECOND = 1000;

    public CommonThread() {
        this("sunbox-thread-" + threadId.incrementAndGet());
    }

    public CommonThread(String name) {
        this(name, null);
    }

    private static ExecutorService exec = Executors.newCachedThreadPool();

    public CommonThread(String name, String poolName) {
        super(name);
        super.setDaemon(true);
        if (poolName == null)
            poolName = "base";
        this.poolName = poolName;
        pool = threadPool.get(poolName);
        if (pool == null) {
            synchronized (threadPool) {
                pool = threadPool.get(poolName);

                if (pool == null) {
                    Integer poolSize = ConfigDao.getAppConfig().getConfigInteger("core.thread." + poolName + ".size", 100);
                    // 改为使用可自动回收的线程池
                    final String pName=poolName;
                    pool = new ThreadPoolExecutor(poolSize, poolSize, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactory() {
                        int index = 0;
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, String.format("sunbox:%s-%04d",pName , (++index)));
                        }
                    });
                    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
                    threadPool.put(poolName, pool);
                }
            }
        }
        this.context = ThreadContext.getThreadContext();
    }

    public CommonThread resultMapping(String resultKey) {
        this.resultKey = resultKey;
        return this;
    }

    @Override
    public void start() {
        if (resultKey != null)
            context.addResult(resultKey, pool.submit((Callable) this));
        else {
            pool.execute(this);
        }
    }

    public static ExecutorService getExec(){
        return exec;
    }

    public static Object executeWithTimeOut(int timeOutSeconds, Supplier<?> func){
        try {
            return getExec().submit(new Callable<Object>() {
                public Object call(){
                    return func.get();
                }
            }).get(timeOutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException|ExecutionException|TimeoutException e) {
            throw CommonExceptionType.SimpleException.throwEx(e,"time out execute error.");
        }
    }
    @Override
    public CommonThread call() throws Exception {
        this.currentContext = ThreadContext.getThreadContext();
        this.currentContext.setParent(context);
        run();
        return this;
    }

    public static void startWhileThread(Consumer<Void> run) {
        startWhileThread(run,100);
    }
    public static void startWhileThread(Consumer<Void> run,long interval) {
        new CommonThread() {
            public void run() {
                while (true) {
                    try {
                        run.accept(null);
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                    } finally {
                        sleep(interval);
                    }
                }
            }
        }.start();
    }

    public static void startWhileThread(int sleepTime, SimpleFunc run) {
        startWhileThread(sleepTime, run, null, null);
    }

    public static void startWhileThread(int sleepTime, SimpleFunc run, Consumer<RuntimeException> catchConsumer, SimpleFunc outFinal) {
        new CommonThread() {
            public void run() {
                try {
                    while (true) {
                        try {
                            try {
                                run.func();
                            } catch (IExceptionType.CommonException e) {
                                if (CommonExceptionType.Continue == e.getExceptionType()) {
                                    continue;
                                }
                                if (CommonExceptionType.Break == e.getExceptionType()) {
                                    break;
                                } else {
                                    throw e;
                                }
                            }
                            ;
                        } catch (RuntimeException e) {
                            if (catchConsumer != null)
                                catchConsumer.accept(e);
                        } finally {
                            sleep(sleepTime);
                        }
                    }
                } catch (Exception e) {

                } finally {
                    if (outFinal != null)
                        outFinal.func();
                }
            }
        }.start();
    }

    public static String printStackTrace() {
        return printStackTrace(Thread.currentThread().getStackTrace());
    }

    public static String printStackTrace(Exception e) {
        return printStackTrace(e.getStackTrace());
    }

    private static String printStackTrace(StackTraceElement[] stackTrace) {
        StringBuffer buf = new StringBuffer();
        for (StackTraceElement e : stackTrace) {
            buf.append(String.format("%s:%s:%s", e.getClassName(), e.getMethodName(), e.getLineNumber())).append("\n");
        }
        return buf.toString();
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            logger.error(e, e);
            CommonExceptionType.CommonException.throwEx(e,"thread is interupted.");
        }
    }

    public static void sleep(int millis, Consumer<InterruptedException> consumer) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            consumer.accept(e);
        }
    }

    public static void sleepSecond() {
        sleep(SECOND);
    }

    public static void sleep(long millis) {
        try {
            if (millis < 0)
                millis = 0;
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            logger.error(e, e);
        }
    }

    public static void wait(Object obj) {
        try {
            synchronized (obj) {
                obj.wait();
            }
        } catch (InterruptedException e) {
            logger.error(e, e);
        }
    }

    public static void notify(Object obj) {
        synchronized (obj) {
            obj.notifyAll();
        }
    }

    public static <V> void cancelFuture(Future<V> future){
        if(future==null)
            return;
        try {
            future.cancel(true);
        }catch (Exception e){
            logger.error(e.getMessage(),e);
        }
    }

    public static <T, O> List<T> parallel(List<O> params, Consumer<O> func) {
        return parallel(params, func, true);
    }

    public static <T, O> List<T> parallel(List<O> params, Consumer<O> func, boolean waitForEnd) {
        if ((func == null) || (params == null) || (params.size() <= 0)) {
            return null;
        }
        String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";
        // 启动任务
        for (O param : params) {
            CommonThread work = new CommonThread(resultName) {
                public void run() {
                    try {
                        func.accept(param);
                    } catch (Exception ex) {
                        logger.error(ex.getMessage(), ex);
                    }
                }
            }.resultMapping(resultName);
            work.start();
        }
        if (waitForEnd) {
            return ThreadContext.getThreadContext().getResults(resultName);
        } else {
            return null;
        }
    }

    public static <T> List<T> parallel(List<Runnable> functions) {
        return parallel(functions, true);
    }

    public static <T> List<T> parallel(List<Runnable> functions, boolean waitForEnd) {
        if ((functions == null) || (functions.size() <= 0)) {
            return null;
        }
        String resultName = waitForEnd ? "ParallelWaitForEnd" : "Parallel";
        // 启动任务
        for (Runnable func : functions) {
            CommonThread work = new CommonThread(resultName) {
                public void run() {
                    try {
                        func.run();
                    } catch (Exception ex) {
                        logger.error(ex.getMessage(), ex);
                    }
                }
            }.resultMapping(resultName);
            work.start();
        }
        if (waitForEnd) {
            return ThreadContext.getThreadContext().getResults(resultName);
        } else {
            return null;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new CommonThread("线程池测试" + i, "test") {
                public void run() {
                    Thread t = Thread.currentThread();
                    System.out.println(String.format("thread id '%d',threa name '%s'", t.getId(), t.getName()));
                    sleep(1000);
                }
            }.start();
        }

        while (true) {
            sleep(100);
        }
    }
}
 

关键字:微型企业网络设计方案_代理网关app未运行怎么办_石家庄seo管理_指定关键词排名优化

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: