spring异步线程池源码解读
- 基本使用
- 源码解读
- 开启后怎么生效的
- @EnableAsync
- AsyncConfigurationSelector
- ProxyAsyncConfiguration
- AsyncAnnotationBeanPostProcessor
- 请求进入,创建代理
- 怎么异步执行的
- AsyncExecutionInterceptor
- AsyncExecutionAspectSupport
- 其他
- 总结
基本使用
配置异步线程池,开始异步线程
// 开始异步线程池使用
@EnableAsync
@Configuration
public class AsyncThreadConfig {@Resourceprivate AsyncThreadConfigProperties properties;// 配置异步线程池@Bean("async-1")@Lazypublic ThreadPoolTaskExecutor threadPoolAsync1(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(properties.getCorePoolSize1());executor.setMaxPoolSize(properties.getMaxPoolSize1());executor.setKeepAliveSeconds(properties.getKeepAliveSeconds1());executor.setQueueCapacity(properties.getQueueCapacity1());executor.setThreadNamePrefix(properties.getThreadNamePrefix1());executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds1());executor.initialize();return executor;}
}
使用
@Service
public class AsyncService {private static Logger logger= LoggerFactory.getLogger(AsyncService.class);@Async("async-1")public void printTime(){SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");logger.info("调用时间 {}",sdf.format(new Date()));}
}
源码解读
开启后怎么生效的
@EnableAsync
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 可以看到这里导入了一个 AsyncConfigurationSelector
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
...
}
AsyncConfigurationSelector
// 会根据@EnableAsync中的mode字段导入不同的配置类,
// 这里默认为PROXY,即使用ProxyAsyncConfiguration配置类
public String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:return new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}
}
ProxyAsyncConfiguration
// 这里创建了一个 AsyncAnnotationBeanPostProcessor实例加入spring容器
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();bpp.configure(this.executor, this.exceptionHandler);Class extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.getNumber("order"));return bpp;
}
AsyncAnnotationBeanPostProcessor
// 这里创建了一个AsyncAnnotationAdvisor类,最终会构造出一个AnnotationAsyncExecutionInterceptor
public void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);this.advisor = advisor;
}// AsyncAnnotationAdvisor 构造方法
public AsyncAnnotationAdvisor(@Nullable Supplier executor, @Nullable Supplier exceptionHandler) {Set> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}this.advice = buildAdvice(executor, exceptionHandler);this.pointcut = buildPointcut(asyncAnnotationTypes);
}protected Advice buildAdvice(@Nullable Supplier executor, @Nullable Supplier exceptionHandler) {// AsyncAnnotationAdvisor#buildAdviceAnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;
}
请求进入,创建代理
// AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization
// 这里的实例是AsyncAnnotationBeanPostProcessor,也就是上面创建并加入的实例类
// AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization
public Object postProcessAfterInitialization(Object bean, String beanName) {// ... 部分代码省略// 判断是否符合代理条件// 对类或者方法进行校验(判断类或者方法有没有对应的注解注释)if (isEligible(bean, beanName)) {ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);if (!proxyFactory.isProxyTargetClass()) {evaluateProxyInterfaces(bean.getClass(), proxyFactory);}proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);// Use original ClassLoader if bean class not locally loaded in overriding class loaderClassLoader classLoader = getProxyClassLoader();if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();}return proxyFactory.getProxy(classLoader);}// No proxy needed.return bean;
}
怎么异步执行的
AsyncExecutionInterceptor
public Object invoke(final MethodInvocation invocation) throws Throwable {Class> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);// 获取 executor AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}Callable
AsyncExecutionAspectSupport
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {// 尝试缓存获取AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;// 获取委托执行任务的异步线程名字, 也就是方法上@Async注解里的value字段// 如果方法上没获取到,则获取方法上对应的类String qualifier = getExecutorQualifier(method);// 如果没有则使用默认执行器,有则从spring中获取名字为qualifier值的执行器if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {targetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}// 判断是否要包装,然后进行缓存executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;
}
其他
判断需要创建代理
// AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible()
protected boolean isEligible(Object bean, String beanName) {return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&super.isEligible(bean, beanName));
}// AbstractAdvisingBeanPostProcessor#isEligible()
protected boolean isEligible(Class> targetClass) {// 尝试获取缓存Boolean eligible = this.eligibleBeans.get(targetClass);if (eligible != null) {return eligible;}// 没有advisor,也就没有代理的必要了 advisor一般是拦截器if (this.advisor == null) {return false;}// 校验是否需要代理,缓存结果eligible = AopUtils.canApply(this.advisor, targetClass);this.eligibleBeans.put(targetClass, eligible);return eligible;
}// ApoUtils#canApply()
public static boolean canApply(Advisor advisor, Class> targetClass, boolean hasIntroductions) {// 不同类型走不同分支,这里走 PointcutAdvisor 分支if (advisor instanceof IntroductionAdvisor) {return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);}else if (advisor instanceof PointcutAdvisor) {PointcutAdvisor pca = (PointcutAdvisor) advisor;return canApply(pca.getPointcut(), targetClass, hasIntroductions);}else {// It doesn't have a pointcut so we assume it applies.return true;}
}public static boolean canApply(Pointcut pc, Class> targetClass, boolean hasIntroductions) {Assert.notNull(pc, "Pointcut must not be null");// 校验类上是否有 @Async注解 filter定义看下面if (!pc.getClassFilter().matches(targetClass)) {return false;}MethodMatcher methodMatcher = pc.getMethodMatcher();if (methodMatcher == MethodMatcher.TRUE) {// No need to iterate the methods if we're matching any method anyway...return true;}IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;if (methodMatcher instanceof IntroductionAwareMethodMatcher) {introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;}// 扫描类方法的上是否有注解Set> classes = new LinkedHashSet<>();// 当前类为非代理类加入if (!Proxy.isProxyClass(targetClass)) {classes.add(ClassUtils.getUserClass(targetClass));}// 获取当前类的所有实现接口,包括子类classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));// 遍历扫描,如果类方法中有被 @Async注解注释的,则返回true,其他falsefor (Class> clazz : classes) {Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);for (Method method : methods) {if (introductionAwareMethodMatcher != null ?introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :methodMatcher.matches(method, targetClass)) {return true;}}}return false;
}
filter定义的地方
// 启动的时候, AsyncAnnotationAdvisor 构造方法
public AsyncAnnotationAdvisor(@Nullable Supplier executor, @Nullable Supplier exceptionHandler) {...// pointcut构建this.pointcut = buildPointcut(asyncAnnotationTypes);
}// asyncAnnotationTypes 只有一个Async
protected Pointcut buildPointcut(Set> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {// 类 pointCutPointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);// 方法 pointCutPointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}// 合并result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);
}
总结
@Async
的实现是基于代理,所以如果方法内调用异步方法实际是不会生效的,即还是当前线程执行