业务中经常会存在一些耗时较长的任务,比如导入大量数据,批量数据修改,批量...等,这种可能持续了几分钟的任务,会导致http连接超时,返回错误信息。
解决问题的方法也很简单,执行操作的时候直接返回,然任务后台执行就好了,但是这样用户就没法知道任务具体有没有执行成功之类的了,操作回馈不够及时,当前也可以把当前任务的执行进度放到redis中,通过另外一个接口来轮询进度。
每个类似的业务逻辑都写上这样一套逻辑显然是有很多冗余的,所以我就自己写了异步任务工具类,用于处理这种类似的情况。
先说下大体的架构
* AsyncTaskManager : 用于创建异步任务,和对异步任务进行具体的操作。
* AsyncTask<T>: 异步任务类,继承自Runnable方法,用于执行具体的异步任务,并且准备上下文相关数据
* AsyncTaskController:用于给用户展示异步任务进度
示例代码
```java
@Slf4j
public class AsyncTaskManager {
private static final ThreadLocal<AsyncTask<?>> CURRENT_TASK = new ThreadLocal<>();
/**
* 异步操作任务调度线程池
*/
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(10);
private static <T> String createAsyncTask(String name, Callable<T> callable) {
return createAsyncTask(name, true, callable);
}
/**
* 创建一个异步任务
*
* @param name 任务名称
* @param extendsSession 是否继承会话信息
* @param callable 任务执行方法,
* @param <T> 任务返回值,可以为空
* @return 任务id, 后续用户这个id获取任务相关信息
*/
private static <T> String createAsyncTask(String name, boolean extendsSession, Callable<T> callable) {
CommonAssert.isNotNullOrEmpty(name, "任务名称不能为空");
if (CURRENT_TASK.get() != null) {
throw new BusinessException("无法在异步任务中创建嵌套的异步任务,请换用AsyncManager");
}
String id = UUID.fastUUID().toString();
AsyncTask<T> task = new AsyncTask<>(id, name, extendsSession, callable);
EXECUTOR.execute(task);
return id;
}
/**
* 更新任务进度
*
* @param current 当前进度
* @param total 总任务
*/
public static void updateProcess(Double current, Double total) {
AsyncTask<?> task = CURRENT_TASK.get();
if (task != null) {
task.updateProcess(current, total);
} else {
log.warn("当前不在异步任务中,无法调用此方法,at:" + getCallerInfo());
}
}
/**
* 记录日志到前端显示
*
* @param message 消息模板
* @param args 模板参数
*/
public static void log(String message, Object... args) {
AsyncTask<?> task = CURRENT_TASK.get();
if (task != null) {
task.log(message, args);
} else {
log.warn("当前不在异步任务中,无法调用此方法,at:" + getCallerInfo());
}
}
public static void message(String message, Object... args) {
AsyncTask<?> task = CURRENT_TASK.get();
if (task != null) {
task.setMessage(message, args);
} else {
log.warn("当前不在异步任务中,无法调用此方法,at:" + getCallerInfo());
}
}
/**
* 注册当前异步任务
*/
public static void registerTaskInfo(AsyncTask<?> task) {
if (CURRENT_TASK.get() != null) {
throw new BusinessException("当前异步任务已存在");
}
CURRENT_TASK.set(task);
}
/**
* 清除当前异步任务
*/
public static void clearTaskInfo() {
CURRENT_TASK.remove();
}
/**
* 当前方法调用者相关信息
*/
private static String getCallerInfo() {
StackTraceElement trace = new Throwable().getStackTrace()[2];
return trace.getClassName() + "." + trace.getMethodName() + ":" + trace.getLineNumber();
}
```
```java
@Slf4j
public class AsyncTask<T> implements Runnable {
private static final String PROCESS_STATUS_WAITING = "WAITING";
private static final String PROCESS_STATUS_RUNNING = "RUNNING";
private static final String PROCESS_STATUS_SUCCESS = "SUCCESS";
private static final String PROCESS_STATUS_ERROR = "ERROR";
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
private final AsyncProcessInfo<T> asyncProcessInfo;
private final String redisKey;
private final String threadName;
private final Callable<T> callable;
private SecurityManager securityManager = null;
private Subject subject = null;
public AsyncTask(String id, String name, boolean extendsSession, Callable<T> callable) {
log.info("开始创建异步任务,id:{},name:{},extendsSession:{}", id, name, extendsSession);
this.asyncProcessInfo = new AsyncProcessInfo<T>();
this.asyncProcessInfo.setCreateTime(new Date());
this.asyncProcessInfo.setId(id);
this.asyncProcessInfo.setName(name);
this.asyncProcessInfo.setStatus(PROCESS_STATUS_WAITING);
this.threadName = "async-task-|" + Thread.currentThread().getName() + "|-" + name + "-" + IdUtils.fastUUID().substring(0, 3);
this.redisKey = CacheConstants.CacheKeys.SYSTEM_ASYNC_TASK_INFO + CacheConstants.CACHE_KEY_SEPARATOR + id;
this.callable = callable;
if (extendsSession) {
try {
//处理shiro相关的问题
securityManager = SecurityUtils.getSecurityManager();
subject = SecurityUtils.getSubject();
} catch (Exception e) {
throw new RuntimeException("当前线程不存在有效的会话", e);
}
}
this.writeToRedis();
}
public AsyncProcessInfo<T> getAsyncProcessInfo() {
return asyncProcessInfo;
}
private void writeErrorTosRedis(String errorInfo) {
log.error("异步任务执行异常,id:{},name:{}", asyncProcessInfo.getId(), asyncProcessInfo.getName());
asyncProcessInfo.setStatus(PROCESS_STATUS_ERROR);
asyncProcessInfo.setMessage(StringUtils.isEmpty(errorInfo) ? "异步任务执行异常" : errorInfo);
redisCache.setCacheObject(redisKey, asyncProcessInfo, 10, TimeUnit.MINUTES);
}
private void writeToRedis() {
log.info("开始更新异步任务状态:id:{},name:{}", asyncProcessInfo.getId(), asyncProcessInfo.getName());
this.checkInterrupted();
redisCache.setCacheObject(redisKey, asyncProcessInfo);
}
/**
* 检查当前线程是否收到了终止命令
*/
private void checkInterrupted() {
if (Thread.interrupted()) {
writeErrorTosRedis("异步任务被终止");
throw new RuntimeException("异步任务被终止");
}
}
/**
* 更新任务进度
*
* @param current 当前进度
* @param total 总任务
*/
public void updateProcess(Double current, Double total) {
this.checkInterrupted();
asyncProcessInfo.setTotal(total);
asyncProcessInfo.setCurrent(current);
writeToRedis();
}
/**
* 记录日志到前端显示
*
* @param message 消息模板
* @param args 模板参数
*/
public void log(String message, Object... args) {
this.checkInterrupted();
if (asyncProcessInfo.getLogs() == null) {
asyncProcessInfo.setLogs(new ArrayList<>());
}
asyncProcessInfo.getLogs().add(StringUtils.format(message, args));
writeToRedis();
}
/**
* 设置提示消息
*/
public void setMessage(String message, Object... args) {
this.checkInterrupted();
asyncProcessInfo.setMessage(StringUtils.format(message, args));
writeToRedis();
}
@Override
public void run() {
try {
log.info("开始执行异步任务,id:{},name:{}", asyncProcessInfo.getId(), asyncProcessInfo.getName());
AsyncTaskManager.registerTaskInfo(this);
Thread.currentThread().setName(threadName);
asyncProcessInfo.setExecuteTime(new Date());
asyncProcessInfo.setStatus(PROCESS_STATUS_RUNNING);
writeToRedis();
//继承会话设置
if (securityManager != null && subject != null) {
ThreadContext.bind(securityManager);
ThreadContext.bind(subject);
} else {
//不继承会话就关闭数据权限
DataScopeManager.disableDataScope();
}
T result = this.callable.call();
log.info("异步任务执行结束,id:{},name:{}", asyncProcessInfo.getId(), asyncProcessInfo.getName());
asyncProcessInfo.setResult(result);
asyncProcessInfo.setStatus(PROCESS_STATUS_SUCCESS);
redisCache.setCacheObject(redisKey, asyncProcessInfo, 10, TimeUnit.MINUTES);
} catch (Exception e) {
writeErrorTosRedis(e.getMessage());
} finally {
//清理数据
ThreadContext.remove();
ActionLogUtil.clean();
PageHelper.clearPage();
DataScopeManager.clearDataScope();
AsyncTaskManager.clearTaskInfo();
}
}
}
```
```java
public class AsyncProcessInfo<T> {
/**
* 任务的uuid
*/
private String id;
/**
* 任务名称
*/
private String name;
/**
* 任务创建时间
*/
private Date createTime;
/**
* 任务执行时间
*/
private Date executeTime;
/**
* 导出任务状态
* WAITING
* RUNNING
* SUCCESS
* ERROR
*/
private String status;
/**
* 任务总进度
*/
private Double total;
/**
* 任务当前执行进度
*/
private Double current;
/**
* 任务执行结果
*/
private T result;
/**
* 执行结果是个文件的情况下下载文件(暂时没用到)
*/
private Boolean downloadMode = false;
/**
* 任务执行日志
*/
private List<String> logs;
/**
* 任务消息
*/
private String message;
}
```
> 其中有个shiro相关的代码,不需要可以忽略
当在需要异步任务的时候,直接调用 AsyncTaskManager.createAsyncTask 返回一个id给前端,前端再根据id去轮询任务进度。前端的这部分,也可以直接封装成一个组件
统一的异步任务处理