[seata][分享] seata1.5.2 如何在二阶段发生异常时告警

2024-07-15 390 views
8
场景描述

RM二阶段发生异常时,TC是会进行不断的重试。直到成功为止 那么,问题就来了。一直重试虽然在一定程度上可能成功,但大部分情况下一直重试是不会成功的 这个时候,是需要作出一定的告警措施,比如发邮件,发钉钉之类的

比较幸运的是,seata提供了SPI,这样可以比较方便的实现。经过一番研究尝试,终于找到比较好的实现方式。如下,enjoy!

Step One Step Two

com.xxx.MyTCCResourceManager类代码。大部分抄官方的io.seata.rm.tcc.TCCResourceManager代码即可,这里只列出关键代码

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        Object targetTCCBean = tccResource.getTargetBean();
        Method commitMethod = tccResource.getCommitMethod();
        if (targetTCCBean == null || commitMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
            Object ret;
            boolean result;
            // add idempotent and anti hanging
            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            } else {
                ret = commitMethod.invoke(targetTCCBean, args);
                if (ret != null) {
                    if (ret instanceof TwoPhaseResult) {
                        result = ((TwoPhaseResult)ret).isSuccess();
                    } else {
                        result = (boolean)ret;
                    }
                } else {
                    result = true;
                }
            }
            LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
            if (!result) {
                // !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
            }
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            // !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
        }
    }

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        Object targetTCCBean = tccResource.getTargetBean();
        Method rollbackMethod = tccResource.getRollbackMethod();
        if (targetTCCBean == null || rollbackMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
            Object ret;
            boolean result;
            // add idempotent and anti hanging
            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                            args, tccResource.getActionName());
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            } else {
                ret = rollbackMethod.invoke(targetTCCBean, args);
                if (ret != null) {
                    if (ret instanceof TwoPhaseResult) {
                        result = ((TwoPhaseResult)ret).isSuccess();
                    } else {
                        result = (boolean)ret;
                    }
                } else {
                    result = true;
                }
            }
            LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
            if (!result) {
                // !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
            }
            return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            // !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }

回答

1

分享另外一个。 在做这个告警的时候,一开始是想着用aop LocalTCC这个注解来实现的,但用了aop之后,会导致confirm和cancel拿不到全局事务id的!所以这种方法是不可取的。

反正也写了一些代码,就分享一下,记住,以下代码会导致问题的!!!

切面类
@Slf4j
public class LibBusinessSeataMethodInterceptor implements MethodInterceptor, Ordered {

    private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(
            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);

    private final PlatformSysNoticeRecordClient platformSysNoticeRecordClient;

    public LibBusinessSeataMethodInterceptor(PlatformSysNoticeRecordClient platformSysNoticeRecordClient) {
        this.platformSysNoticeRecordClient = platformSysNoticeRecordClient;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // 没在seata事务中,或者没开启seata事务,直接返回
        // 对,用这种aop来做的话,就是这里会有问题!!!,拿不到全局事务id的!!!
        if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
            return invocation.proceed();
        }
        Method method = invocation.getMethod();
        LocalTCC localTCC = method.getDeclaringClass().getAnnotation(LocalTCC.class);
        if (localTCC == null) {
            return invocation.proceed();
        }
        Method[] methods = method.getDeclaringClass().getMethods();
        TwoPhaseBusinessAction twoPhaseBusinessAction = null;
        for (Method methodExist : methods) {
            twoPhaseBusinessAction = methodExist.getAnnotation(TwoPhaseBusinessAction.class);
            if (twoPhaseBusinessAction != null) {
                break;
            }
        }
        // 极端情况,找不到这个注解方法
        if (twoPhaseBusinessAction == null) {
            log.info(
                    "Not found twoPhaseBusinessAction method. class: {}, methods: {}",
                    method.getDeclaringClass().getName(),
                    methods
            );
            return invocation.proceed();
        }
        String commitMethodName = twoPhaseBusinessAction.commitMethod();
        String cancelMethodName = twoPhaseBusinessAction.rollbackMethod();

        if (!method.getName().equals(commitMethodName) && !method.getName().equals(cancelMethodName)) {
            return invocation.proceed();
        }

        try {
            return invocation.proceed();
        } catch (Throwable e) {
            Object[] args = invocation.getArguments();
            // 理论上应该是第一参数为BusinessActionContext
            if (args.length > 0 && args[0] instanceof BusinessActionContext) {
                BusinessActionContext businessActionContext = (BusinessActionContext) args[0];
                sendDingTalkNotify(String.format(
                        "Seata onBeginFailure. txId: %s, branchId: %s, actionName: %s, stackTrace: %s",
                        businessActionContext.getXid(),
                        businessActionContext.getBranchId(),
                        businessActionContext.getActionName(),
                        JSON.toJSONString(e.getStackTrace())
                ));
            }

            throw e;
        }
    }

    private void sendDingTalkNotify(String message) {
        // 告警代码
    }

    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}
配置类
@Configuration
public class LibBusinessSeataInterceptorConfig {

    @Bean
    public DefaultPointcutAdvisor defaultPointcutAdvisor(PlatformSysNoticeRecordClient platformSysNoticeRecordClient) {
        LibBusinessSeataMethodInterceptor libBusinessSeataMethodInterceptor =
            new LibBusinessSeataMethodInterceptor(platformSysNoticeRecordClient);
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        // 只拦截confirm和cancel方法
        pointcut.setExpression("execution(public boolean com.ppwang..*.data..*.*(io.seata.rm.tcc.api.BusinessActionContext))");

        DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor();
        advisor.setPointcut(pointcut);
        advisor.setAdvice(libBusinessSeataMethodInterceptor);
        return advisor;
    }
}
7

建议放到Discussions去,或者给官网https://github.com/seata/seata.github.io 提交一篇博客pr

2

还有个问题我想讨论下,作为用户视角,你觉得发出告警的是client,还是server呢?比如server下发给client,其执行一直失败,这个其实server端也能感知到,比如在prometheus打个点,是否也能做成告警? 但是我总觉得client去处理的话自定义打点肯定比server要细致,更容易也更精确知道一些问题

6

感谢回复。我昨晚看完你的回复之后,想了一夜,觉得你说的比较符合实情,放在server用prometheus做告警这样会比较综合处理一些,这样会更省事。而放client的话,虽然说是说能更细致,但复杂度多很多 @a364176773

8

@a364176773 我看了下server-1.5.2的metrics。如果做监控的话,需要为每个接入的分布式事务做才行,没有统一的指标。如下图所示 image