Spring Cloud (13): Spring Extension Points

Source: 宝玛科技网
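  • Spring extension points
    • BeanPostProcessor extension points in the bean lifecycle
    • Overview of Spring extension points
  • Use cases for Spring extension points
    • Nacos integration: service registration
      • ApplicationListener use case: listening for events published in the container
      • Lifecycle: Nacos publish/subscribe & Eureka service startup, synchronization, and eviction
      • Lifecycle use case: managing objects that need start/stop lifecycle handling
    • Nacos integration: publish/subscribe with NacosWatch
    • Eureka integration: registration
      • Extension: the Eureka Server context is initialized in SmartLifecycle#start
    • Ribbon integration
      • SmartInitializingSingleton use case: customizing beans in the container
    • Feign integration
      • FactoryBean use case: handing the proxy generated from an interface over to Spring
    • Sentinel integration
      • HandlerInterceptor use case: enhancing MVC requests
      • Combining SmartInitializingSingleton & FactoryBean: assembling objects dynamically by type
    • Seata integration
      • Combining AbstractAutoProxyCreator & MethodInterceptor: method enhancement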

Spring extension points

BeanPostProcessor extension points in the bean lifecycle

Overview of Spring extension points

  • BeanFactoryPostProcessor
    • BeanDefinitionRegistryPostProcessor
      • postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) # registers additional bean definitions
  • BeanPostProcessor
    • InstantiationAwareBeanPostProcessor
      • postProcessBeforeInstantiation
      • postProcessAfterInstantiation
      • postProcessProperties
      • postProcessPropertyValues # deprecated since Spring 5.1 in favor of postProcessProperties
    • AbstractAutoProxyCreator
      • postProcessAfterInitialization # runs after initialization; this is where AOP proxies are created
  • @Import
    • ImportBeanDefinitionRegistrar
      • registerBeanDefinitions
    • ImportSelector
      • String[] selectImports(AnnotationMetadata importingClassMetadata)
      • default Predicate<String> getExclusionFilter() { return null; }
  • Aware
    • ApplicationContextAware
      • setApplicationContext(ApplicationContext applicationContext)
    • BeanFactoryAware
      • setBeanFactory(BeanFactory beanFactory)
  • InitializingBean || @PostConstruct
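  • FactoryBean (creates beans dynamically; getBean("beanName") returns the object produced by factoryBean.getObject(), while getBean("&beanName") returns the FactoryBean itself; used by MyBatis mappers and OpenFeign)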
    • T getObject() throws Exception;
    • Class<?> getObjectType();
    • default boolean isSingleton() { return true; }
  • SmartInitializingSingleton
    • void afterSingletonsInstantiated(); # called once all non-lazy singleton beans have been instantiated; e.g. filters…
  • ApplicationListener
    • void onApplicationEvent(E event);
  • Lifecycle
    • SmartLifecycle
      • default boolean isAutoStartup() { return true; }
      • default void stop(Runnable callback)
    • LifecycleProcessor
  • HandlerInterceptor
    • default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception
    • default void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable ModelAndView modelAndView) throws Exception
    • default void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,@Nullable Exception ex) throws Exception
  • MethodInterceptor
    • Object invoke(MethodInvocation invocation) throws Throwable;

Use cases for Spring extension points

Nacos integration: service registration

ApplicationListener use case: listening for events published in the container

NamingService -> NacosNamingService => registerInstance

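Question: why does a service register itself automatically at startup once the Nacos registry is integrated, and how does Nacos implement this automatic registration?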

  1. After Tomcat has started, ServletWebServerApplicationContext publishes a ServletWebServerInitializedEvent.

  2. AbstractAutoServiceRegistration, from the spring-cloud-commons package, listens for WebServerInitializedEvent.

public abstract class AbstractAutoServiceRegistration<R extends Registration>
		implements AutoServiceRegistration, ApplicationContextAware,
		ApplicationListener<WebServerInitializedEvent> {

	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event); // bind() records the port, then calls start(), which calls register()
	}
}
  3. Nacos implements ServiceRegistry.register (in NacosServiceRegistry), which ends up calling NamingService.registerInstance.
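
The three steps above can be modeled in plain Java without Spring on the classpath. This is only a sketch of the event-driven idea: ServerReadyEvent, EventBus, and InMemoryRegistry are hypothetical stand-ins for WebServerInitializedEvent, the container's event publisher, and a ServiceRegistry implementation. None of them are real Spring or Nacos types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of event-driven auto-registration; every type here is hypothetical.
final class AutoRegistrationSketch {

    /** Stand-in for WebServerInitializedEvent: carries the port the server bound to. */
    static final class ServerReadyEvent {
        final int port;
        ServerReadyEvent(int port) { this.port = port; }
    }

    /** Stand-in for a ServiceRegistry: records registrations in memory. */
    static final class InMemoryRegistry {
        final List<String> instances = new ArrayList<>();
        void register(String serviceName, int port) { instances.add(serviceName + ":" + port); }
    }

    /** Stand-in for the container's event publisher. */
    static final class EventBus {
        private final List<Consumer<ServerReadyEvent>> listeners = new ArrayList<>();
        void addListener(Consumer<ServerReadyEvent> listener) { listeners.add(listener); }
        void publish(ServerReadyEvent event) { listeners.forEach(l -> l.accept(event)); }
    }

    static List<String> run(String serviceName, int port) {
        InMemoryRegistry registry = new InMemoryRegistry();
        EventBus bus = new EventBus();
        // The auto-registration listener subscribes before the server starts.
        bus.addListener(event -> registry.register(serviceName, event.port));
        // Once the web server is up, the event is published and the listener registers the service.
        bus.publish(new ServerReadyEvent(port));
        return registry.instances;
    }

    public static void main(String[] args) {
        System.out.println(run("order-service", 8080)); // prints [order-service:8080]
    }
}
```

The point of the design is that the application never calls register() itself: registration is a side effect of the "server is ready" event, so it only happens once the port is known.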

Lifecycle: Nacos publish/subscribe & Eureka service startup, synchronization, and eviction

Lifecycle use case: managing objects that need start/stop lifecycle handling

start => AbstractApplicationContext.refresh -> finishRefresh() -> getLifecycleProcessor().onRefresh(); -> start()
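
As a rough illustration of that call chain, the plain-Java sketch below models a lifecycle processor that starts its components on refresh and stops them on close. The Managed and Processor types are hypothetical. The ordering (lowest phase starts first and stops last) follows how Spring documents SmartLifecycle phases, but the sketch is a simplification, not the DefaultLifecycleProcessor implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of phase-ordered start/stop; every type here is hypothetical.
final class LifecycleSketch {

    /** A component with a name and a phase, loosely modeled on SmartLifecycle#getPhase. */
    static final class Managed {
        final String name;
        final int phase;
        Managed(String name, int phase) { this.name = name; this.phase = phase; }
    }

    /** Stand-in for a lifecycle processor. */
    static final class Processor {
        private final List<Managed> components = new ArrayList<>();
        final List<String> log = new ArrayList<>();

        void add(String name, int phase) { components.add(new Managed(name, phase)); }

        /** Called at the end of refresh: start components in ascending phase order. */
        void onRefresh() {
            List<Managed> ordered = new ArrayList<>(components);
            ordered.sort(Comparator.comparingInt((Managed m) -> m.phase));
            for (Managed m : ordered) log.add("start " + m.name);
        }

        /** Called on shutdown: stop components in descending phase order. */
        void onClose() {
            List<Managed> ordered = new ArrayList<>(components);
            ordered.sort(Comparator.comparingInt((Managed m) -> m.phase).reversed());
            for (Managed m : ordered) log.add("stop " + m.name);
        }
    }

    static List<String> run() {
        Processor processor = new Processor();
        processor.add("watcher", 10);
        processor.add("server", 0);
        processor.onRefresh();
        processor.onClose();
        return processor.log;
    }

    public static void main(String[] args) {
        // prints [start server, start watcher, stop watcher, stop server]
        System.out.println(run());
    }
}
```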

Nacos integration: publish/subscribe with NacosWatch

public class NacosWatch
		implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {


	@Override
	public void start() {
			// ... (elided in the source; the elided code also opens the block closed by the extra brace below)
			NamingService namingService = nacosServiceManager
					.getNamingService(properties.getNacosProperties());
			try {
				namingService.subscribe(properties.getService(), properties.getGroup(),
						Arrays.asList(properties.getClusterName()), eventListener);
			}
			catch (Exception e) {
				log.error("namingService subscribe failed, properties:{}", properties, e);
			}

			this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
					this::nacosServicesWatch, this.properties.getWatchDelay());
		}
	}

}

Eureka integration: registration

Extension: the Eureka Server context is initialized in SmartLifecycle#start

EurekaServerInitializerConfiguration

Ribbon integration

SmartInitializingSingleton use case: customizing beans in the container

Called once all non-lazy singleton beans have been instantiated.

Question: why does adding @LoadBalanced to a RestTemplate declared with @Bean give it load balancing?

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
    return new RestTemplate();
}

LoadBalancerAutoConfiguration
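It uses the SmartInitializingSingleton extension point to attach a LoadBalancerInterceptor, which implements the load-balancing logic, to every RestTemplate annotated with @LoadBalanced (the annotated beans are selected through the @Qualifier mechanism).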

@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
		final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
	return () -> restTemplateCustomizers.ifAvailable(customizers -> {
		for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
			for (RestTemplateCustomizer customizer : customizers) {
				customizer.customize(restTemplate);
			}
		}
	});
}

/**
 * Annotation to mark a RestTemplate or WebClient bean to be configured to use a
 * LoadBalancerClient.
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier // meta-annotation: makes @LoadBalanced itself act as a qualifier
public @interface LoadBalanced {
}

LoadBalancerInterceptor

	@Configuration(proxyBeanMethods = false)
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {

		@Bean
		public LoadBalancerInterceptor loadBalancerInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}
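
To make the mechanism concrete, here is a plain-Java sketch of the same idea: after all clients exist, only the ones carrying the marker receive an interceptor, and the interceptor swaps the logical service name for a concrete host. Client, roundRobin, the boolean marker, and the host addresses are all hypothetical. The sketch models the pattern and is not the Ribbon or Spring Cloud implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

// Plain-Java model of "customize only the qualified beans"; every type here is hypothetical.
final class LoadBalancedSketch {

    /** Stand-in for RestTemplate: a client with a mutable interceptor chain. */
    static final class Client {
        final boolean loadBalanced; // models the @LoadBalanced qualifier
        final List<UnaryOperator<String>> interceptors = new ArrayList<>();
        Client(boolean loadBalanced) { this.loadBalanced = loadBalanced; }

        /** Returns the URL the request would finally be sent to. */
        String resolve(String url) {
            String current = url;
            for (UnaryOperator<String> interceptor : interceptors) {
                current = interceptor.apply(current);
            }
            return current;
        }
    }

    /** Round-robin interceptor: replaces a logical service name with a concrete host. */
    static UnaryOperator<String> roundRobin(Map<String, List<String>> instances) {
        AtomicInteger counter = new AtomicInteger();
        return url -> {
            for (Map.Entry<String, List<String>> entry : instances.entrySet()) {
                String prefix = "http://" + entry.getKey() + "/";
                if (url.startsWith(prefix)) {
                    List<String> hosts = entry.getValue();
                    String host = hosts.get(Math.floorMod(counter.getAndIncrement(), hosts.size()));
                    return "http://" + host + "/" + url.substring(prefix.length());
                }
            }
            return url; // unknown service name: leave the URL untouched
        };
    }

    /** Models afterSingletonsInstantiated(): customize only the marked clients. */
    static void afterSingletonsInstantiated(List<Client> allClients, UnaryOperator<String> interceptor) {
        for (Client client : allClients) {
            if (client.loadBalanced) {
                client.interceptors.add(interceptor);
            }
        }
    }

    public static void main(String[] args) {
        Client balanced = new Client(true);
        Client plain = new Client(false);
        afterSingletonsInstantiated(List.of(balanced, plain),
                roundRobin(Map.of("order-service", List.of("10.0.0.1:8080", "10.0.0.2:8080"))));
        System.out.println(balanced.resolve("http://order-service/order/info/1"));
        System.out.println(plain.resolve("http://order-service/order/info/1"));
    }
}
```

The unmarked client keeps sending requests to the literal URL, which mirrors why a RestTemplate without @LoadBalanced cannot resolve a service name.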

Feign integration

FactoryBean use case: handing the proxy generated from an interface over to Spring

Question: why can a Feign interface be injected directly with @Autowired, and how does a Feign interface come to be managed by Spring?

@FeignClient(value = "order-service", path = "/order")
public interface OrderFeignService {

    @GetMapping(value = "/info/{productId}")
    public String findOrderInfoByProductId(@PathVariable("productId") Integer productId);
}

@SpringBootApplication
@EnableFeignClients(basePackages = "com.mx.use.feign")
@EnableHystrix
@EnableHystrixDashboard // enables the Hystrix dashboard
public class OrderServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
	...
}

FactoryBean => FeignClientsRegistrar -> scan(FeignClient) -> registerFeignClient -> FeignClientFactoryBean

@Override
public Object getObject() {
	return getTarget(); // builds and returns the proxy for the Feign interface
}
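
The core trick, a factory whose getObject() returns a proxy for an interface that has no implementation class, can be sketched with the JDK's java.lang.reflect.Proxy alone. OrderClient and ClientFactory below are hypothetical names, and the handler only returns a description of the call. A real Feign proxy would translate the method into an HTTP request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// JDK dynamic proxy behind a factory; the types and the returned string are hypothetical.
final class InterfaceProxySketch {

    /** Hypothetical client interface, loosely modeled on OrderFeignService. */
    interface OrderClient {
        String findOrderInfoByProductId(Integer productId);
    }

    /** Minimal stand-in for a FactoryBean: getObject() builds the proxy on demand. */
    static final class ClientFactory<T> {
        private final Class<T> type;
        private final String baseUrl;

        ClientFactory(Class<T> type, String baseUrl) {
            this.type = type;
            this.baseUrl = baseUrl;
        }

        T getObject() {
            InvocationHandler handler = (proxy, method, args) -> {
                // Methods inherited from Object must be answered locally.
                if (method.getDeclaringClass() == Object.class) {
                    switch (method.getName()) {
                        case "equals": return proxy == args[0];
                        case "hashCode": return System.identityHashCode(proxy);
                        default: return type.getSimpleName() + " proxy";
                    }
                }
                // A real client would issue a remote call here; the sketch only describes it,
                // so it is valid only for interface methods that return String.
                return baseUrl + "/" + method.getName()
                        + (args == null ? "[]" : Arrays.toString(args));
            };
            return type.cast(Proxy.newProxyInstance(
                    type.getClassLoader(), new Class<?>[] {type}, handler));
        }
    }

    public static void main(String[] args) {
        OrderClient client =
                new ClientFactory<>(OrderClient.class, "http://order-service/order").getObject();
        // prints http://order-service/order/findOrderInfoByProductId[42]
        System.out.println(client.findOrderInfoByProductId(42));
    }
}
```

Because the container asks the factory for its product rather than instantiating the interface, the injected object is the proxy, which is why injection by interface type works.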

Sentinel integration

HandlerInterceptor use case: enhancing MVC requests

AbstractSentinelInterceptor#preHandle

public abstract class AbstractSentinelInterceptor implements HandlerInterceptor {
    ...
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
        throws Exception {
        try {
            String resourceName = getResourceName(request);

            if (StringUtil.isEmpty(resourceName)) {
                return true;
            }
            
            if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
                return true;
            }
            
            // Parse the request origin using registered origin parser.
            String origin = parseOrigin(request);
            String contextName = getContextName(request);
            ContextUtil.enter(contextName, origin);
            // Sentinel entry point
            Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
            return true;
        } catch (BlockException e) {
            try {
                handleBlockException(request, response, e);
            } finally {
                ContextUtil.exit();
            }
            return false;
        }
    }
    ...
}
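
The contract that matters here is that preHandle returns true to let the request continue and false to stop it. The plain-Java sketch below models that contract with a hypothetical Guard class that admits at most a fixed number of in-flight requests. It is a deliberately crude substitute for Sentinel's rule checking, and the afterCompletion counterpart is an assumption added so that an admitted request releases its slot.

```java
// Plain-Java model of the preHandle contract; Guard is hypothetical and is not a Sentinel type.
final class PreHandleSketch {

    /** Admits at most `limit` requests at a time. */
    static final class Guard {
        private final int limit;
        private int active;

        Guard(int limit) { this.limit = limit; }

        /** Mirrors preHandle: true lets the request through, false blocks it. */
        synchronized boolean preHandle() {
            if (active >= limit) {
                return false; // comparable to handling a BlockException and returning false
            }
            active++;
            return true;
        }

        /** Mirrors afterCompletion: releases the slot taken in preHandle. */
        synchronized void afterCompletion() {
            if (active > 0) {
                active--;
            }
        }
    }

    public static void main(String[] args) {
        Guard guard = new Guard(1);
        System.out.println(guard.preHandle()); // first request is admitted
        System.out.println(guard.preHandle()); // second concurrent request is blocked
        guard.afterCompletion();
        System.out.println(guard.preHandle()); // admitted again after the first completes
    }
}
```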

Combining SmartInitializingSingleton & FactoryBean: assembling objects dynamically by type

SentinelDataSourceHandler

# Sentinel's persistent read data source design uses the SmartInitializingSingleton extension point
SentinelDataSourceHandler#afterSingletonsInstantiated
# Registers a data source of type FactoryBean
》SentinelDataSourceHandler#registerBean
》》NacosDataSourceFactoryBean#getObject
# The read data source is obtained through the FactoryBean
》》new NacosDataSource(properties, groupId, dataId, converter)

NacosDataSourceFactoryBean

public class NacosDataSourceFactoryBean implements FactoryBean<NacosDataSource> {
	...
	@Override
	public NacosDataSource getObject() throws Exception {
		...
		return new NacosDataSource(properties, groupId, dataId, converter);
	}
	...
}

SentinelDataSourceHandler

public class SentinelDataSourceHandler implements SmartInitializingSingleton {
	...
	@Override
	public void afterSingletonsInstantiated() {
		...
		//sentinel.nacos.config.serverAddr=${sentinel.nacos.config.serverAddr}
		//NacosDataSource
		AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties.getValidDataSourceProperties();
		abstractDataSourceProperties.setEnv(env);
		abstractDataSourceProperties.preCheck(dataSourceName);
		registerBean(abstractDataSourceProperties, dataSourceName+ "-sentinel-" + validFields.get(0) + "-datasource"); //NacosDataSource
		...
	}
	...
}

Seata integration (Spring AOP)

Combining AbstractAutoProxyCreator & MethodInterceptor: method enhancement

GlobalTransactionScanner

GlobalTransactionalInterceptor

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
	...

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                    } else {
                        transactional = this.aspectTransactional;
                    }
                    // run the transaction via transactionalTemplate.execute()
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();
    }
	...
}

public class TransactionalTemplate {
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }
}
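
Stripped of propagation handling and suspension, the template above reduces to a begin / execute / commit-or-rollback / clean-up skeleton. The plain-Java sketch below models only that skeleton. TxTemplateSketch and its log entries are hypothetical, and nothing here talks to a transaction coordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the template's control flow; hypothetical, not the Seata implementation.
final class TxTemplateSketch {

    /** Records which steps ran, in order. */
    final List<String> log = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        try {
            log.add("begin");
            T result;
            try {
                result = business.get();   // "Do Your Business"
            } catch (RuntimeException ex) {
                log.add("rollback");       // business failure: roll back, then rethrow
                throw ex;
            }
            log.add("commit");             // everything is fine: commit
            return result;
        } finally {
            log.add("cleanUp");            // always runs, on success and on failure
        }
    }

    public static void main(String[] args) {
        TxTemplateSketch ok = new TxTemplateSketch();
        ok.execute(() -> "done");
        System.out.println(ok.log); // prints [begin, commit, cleanUp]
    }
}
```

An interceptor such as GlobalTransactionalInterceptor then only has to wrap methodInvocation.proceed() as the business callback, which keeps the transaction steps out of the annotated method.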
