百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Seata源码—6.Seata AT模式的数据源代理三

itomcoil 2025-06-15 16:58 8 浏览

大纲

1.Seata的Resource资源接口源码

2.Seata数据源连接池代理的实现源码

3.Client向Server发起注册RM的源码

4.Client向Server注册RM时的交互源码

5.数据源连接代理与SQL句柄代理的初始化源码

6.Seata基于SQL句柄代理执行SQL的源码

7.执行SQL语句前取消自动提交事务的源码

8.执行SQL语句前后构建数据镜像的源码

9.构建全局锁的key和UndoLog数据的源码

10.Seata Client发起分支事务注册的源码

11.Seata Server处理分支事务注册请求的源码

12.将UndoLog写入到数据库与提交事务的源码

13.通过全局锁重试策略组件执行事务的提交

14.注册分支事务时获取全局锁的入口源码

15.Seata Server获取全局锁的具体逻辑源码

16.全局锁和分支事务及本地事务总结

17.提交全局事务以及提交各分支事务的源码

18.全局事务回滚的过程源码


16.全局锁和分支事务及本地事务总结

获取到全局锁,才能注册分支事务成功,否则LockRetryPolicy重试。获取到全局锁,才能提交本地事务成功,否则LockRetryPolicy重试。


全局锁没有被其他事务(xid)获取,则当前事务(xid)才能获取全局锁成功。获取全局锁,会将当前分支事务申请全局锁的记录写入到数据库中。


17.提交全局事务以及提交各分支事务的源码

(1)Seata Client发起提交全局事务的请求

(2)Server向Client发送提交分支事务的请求

(3)Seata Client处理提交分支事务的请求

(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除


(1)Seata Client发起提交全局事务的请求

-> TransactionalTemplate.execute()发起全局事务的提交
-> TransactionalTemplate.commitTransaction()
-> DefaultGlobalTransaction.commit()
-> DefaultTransactionManager.commit()
-> DefaultTransactionManager.syncCall()
-> TmNettyRemotingClient.sendSyncRequest()
把全局事务提交请求GlobalCommitRequest发送给Seata Server进行处理
//Template of executing business logic with a global transaction. 全局事务执行模版
public class TransactionalTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);
    
    public Object execute(TransactionalExecutor business) throws Throwable {
        //1.Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }

        //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务
        //刚开始在开启一个全局事务的时候,是没有全局事务的
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        //1.2 Handle the transaction propagation.
        //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED
        //也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;
        Propagation propagation = txInfo.getPropagation();

        //不同的全局事务传播级别,会采取不同的处理方式
        //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid
        //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                ...
            }

            //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            //set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //else do nothing. Of course, the hooks will still be triggered.
                //开启一个全局事务
                beginTransaction(txInfo, tx);
  
                Object rs;
                try {
                    //Do Your Business
                    //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务
                    rs = business.execute();
                } catch (Throwable ex) {
                    //3. The needed business exception to rollback.
                    //发生异常时需要完成的事务
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                //4. everything is fine, commit.
                //如果一切执行正常就会在这里提交全局事务
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                //执行一些全局事务完成后的回调,比如清理等工作
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            //If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                //如果之前挂起了一个全局事务,此时可以恢复这个全局事务
                tx.resume(suspendedResourcesHolder);
            }
        }
    }
    
    //提交事务
    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
        }
    }
    ...
}

//The type Default global transaction. 默认的全局事务
public class DefaultGlobalTransaction implements GlobalTransaction {
    private TransactionManager transactionManager;
    
    DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
        this.transactionManager = TransactionManagerHolder.get();//全局事务管理者
        this.xid = xid;
        this.status = status;
        this.role = role;
    }
    ...
    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            //Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
    ...
}

public class TransactionManagerHolder {
    ...
    private TransactionManagerHolder() {
    }
    
    private static class SingletonHolder {
        private static TransactionManager INSTANCE = null;
        static {
            try {
                INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
                LOGGER.info("TransactionManager Singleton {}", INSTANCE);
            } catch (Throwable anyEx) {
                LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
            }
        }
    }
    
    //Get transaction manager.
    public static TransactionManager get() {
        if (SingletonHolder.INSTANCE == null) {
            throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
        }
        return SingletonHolder.INSTANCE;
    }
    ...
}

public class DefaultTransactionManager implements TransactionManager {
    ...
    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }
    
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            //TMNettyRemotingClient会和Seata Server基于Netty建立长连接
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
    ...
}

(2)Server向Client发送提交分支事务的请求

ServerHandler的channelRead()方法会将收到的请求进行层层传递:首先交给DefaultCoordinator的onRequest()方法来进行处理,然后交给GlobalCommitRequest的handle()方法来进行处理,接着交给AbstractTCInboundHandler的handle()方法来进行处理,最后交给DefaultCoordinator的doGlobalCommit()方法来进行处理,也就是调用DefaultCore的commit()方法来提交全局事务

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
            //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
            //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    private final RemotingServer remotingServer;
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }
    
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        //RpcContext线程本地变量副本
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        //the batch send request message
        if (message instanceof MergedWarpMessage) {
            ...
        } else {
            //the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            //返回响应给客户端
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        //传入的request其实就是客户端发送请求时的GlobalCommitRequest
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

public class GlobalCommitRequest extends AbstractGlobalEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_COMMIT;
    }
    
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
}

public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    ...
    @Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response) throws TransactionException {
                try {
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
           
            @Override
            public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response, TransactionException tex) {
                super.onTransactionException(request, response, tex);
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
                super.onException(request, response, rex);
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }
    
    protected abstract void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException;
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    private final DefaultCore core;
    ...
    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        //调用DefaultCore.commit()方法提交全局事务
        response.setGlobalStatus(core.commit(request.getXid()));
    }
    ...
}

DefaultCore的commit()方法会调用DefaultCore的doGlobalCommit()方法,而doGlobalCommit()方法会获取全局事务的所有分支事务并进行遍历,然后把提交分支事务的请求BranchCommitRequest发送到Seata Client中。

public class DefaultCore implements Core {
    ...
    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        //just lock changeStatus

        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                //Highlight: Firstly, close the session, then no more branch can be registered.
                globalSession.closeAndClean();

                if (globalSession.canBeCommittedAsync()) {
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                    globalSession.changeGlobalStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });

        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

    @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        //start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //获取到全局事务的所有分支事务,并进行遍历提交
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    //发送请求给Seata Client提交分支事务
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            //not at branch
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later", branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}", new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            //Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                //contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        //if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        //executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }
    ...
}

public abstract class AbstractCore implements Core {
    protected RemotingServer remotingServer;
    ...
    @Override
    public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchCommitRequest request = new BranchCommitRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchCommitSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchCommitRequest, String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
        }
    }
    
    protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException {
        BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);
        return response.getBranchStatus();
    }
    ...
}

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @Override
    public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
        if (channel == null) {
            throw new RuntimeException("client is not connected");
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
        return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }

        //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        //获取远程地址
        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        //异步化发送数据,同时对发送结果添加监听器
        //如果发送失败,则会对网络连接Channel进行销毁处理
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });

        try {
            //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }
    ...
}

(3)Seata Client处理提交分支事务的请求

ClientHandler的channelRead()方法收到提交分支事务的请求后,会由RmBranchCommitProcessor的handleBranchCommit()方法进行处理。

-> AbstractRMHandler.onRequest()
-> BranchCommitRequest.handle()
-> AbstractRMHandler.handle()
-> AbstractRMHandler.doBranchCommit()
-> DataSourceManager.branchCommit()
-> AsyncWorker.branchCommit()异步化提交分支事务
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class RmBranchCommitProcessor implements RemotingProcessor {
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }
    ...
}

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    ...
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

public class BranchCommitRequest extends AbstractBranchEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }
    
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }
}

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }
    
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }
    ...
}

//The type Data source manager. DataSourceManager是AT模式下的资源管理器
public class DataSourceManager extends AbstractResourceManager {
    //异步化worker
    private final AsyncWorker asyncWorker = new AsyncWorker(this);
    ...
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        //通过asyncWorker,异步化提交分支事务
        return asyncWorker.branchCommit(xid, branchId, resourceId);
    }
    ...
}

(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除

public class AsyncWorker {
    ...
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }
    
    private void addToCommitQueue(Phase2Context context) {
        if (commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
    }
    
    void doBranchCommitSafely() {
        try {
            doBranchCommit();
        } catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }
    
    private void doBranchCommit() {
        if (commitQueue.isEmpty()) {
            return;
        }
        //transfer all context currently received to this list
        List<Phase2Context> allContexts = new LinkedList<>();
        commitQueue.drainTo(allContexts);
        //group context by their resourceId
        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
        groupedContexts.forEach(this::dealWithGroupedContexts);
    }
    
    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("failed to find resource for {} and requeue", resourceId);
            addAllToCommitQueue(contexts);
            return;
        }

        Connection conn = null;
        try {
            conn = dataSourceProxy.getPlainConnection();
            UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

            //split contexts into several lists, with each list contain no more element than limit size
            List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
            //全局事务的提交,就是让各个分支事务把本地的undo logs删除掉即可
            for (List<Phase2Context> partition : splitByLimit) {
                deleteUndoLog(conn, undoLogManager, partition);
            }
        } catch (SQLException sqlExx) {
            addAllToCommitQueue(contexts);
            LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
        } finally {
            IOUtil.close(conn);
        }
    }
    ...
}


18.全局事务回滚的过程源码

全局事务的回滚流程和提交流程几乎一样:

一.Seata Client发起全局事务回滚请求

二.Server向Client发送分支事务回滚请求

三.Seata Client处理分支事务回滚的请求

相关推荐

Java 如何从一个 List 中随机获得元素

概述从一个List中随机获得一个元素是有关List的一个基本操作,但是这个操作又没有非常明显的实现。本页面主要向你展示如何有效的从List中获得一个随机的元素和可以使用的一些方法。选择一个...

想月薪过万吗?计算机安卓开发之&quot;集合&quot;

集合的总结:/***Collection*List(存取有序,有索引,可以重复)*ArrayList*底层是数组实现的,线程不安全,查找和修改快,增和删比较慢*LinkedList*底层是...

China Narrows AI Talent Gap With U.S. as Research Enters Engineering Phase: Report

ImagegeneratedbyAITMTPOST--ChinaisclosinginontheU.S.intheAIindustry-academia-research...

大促系统优化之应用启动速度优化实践

作者:京东零售宋维飞一、前言本文记录了在大促前针对SpringBoot应用启动速度过慢而采取的优化方案,主要介绍了如何定位启动速度慢的阻塞点,以及如何解决这些问题。希望可以帮助大家了解如何定位该类问...

MyEMS开源能源管理系统核心代码解读004

本期解读:计量表能耗数据规范化算法:myems/myems-normalization/meter.py代码见底部这段代码是一个用于计算和存储能源计量数据(如电表读数)的小时值的Python脚本。它主...

Java接口与抽象类:核心区别、使用场景与最佳实践

Java接口与抽象类:核心区别、使用场景与最佳实践一、核心特性对比1.语法定义接口:interface关键字定义,支持extends多继承接口javapublicinterfaceDrawabl...

超好看 vue2.x 音频播放器组件Vue-APlayer

上篇文章给大家分享了视频播放器组件vue-aliplayer,这次给大家推荐一款音频插件VueAplayer。vue-aplayer一个好看又好用的轻量级vue.js音乐播放器组件。清爽漂亮的U...

Linq 下的扩展方法太少了,MoreLinq 来啦

一:背景1.讲故事前几天看同事在用linq给内存中的两个model做左连接,用过的朋友都知道,你一定少不了一个叫做DefaultIfEmpty函数,这玩意吧,本来很流畅的from......

MapReduce过程详解及其性能优化(详细)

从JVM的角度看Map和ReduceMap阶段包括:第一读数据:从HDFS读取数据1、问题:读取数据产生多少个Mapper??Mapper数据过大的话,会产生大量的小文件,由于Mapper是基于虚拟...

手把手教你使用scrapy框架来爬取北京新发地价格行情(实战篇)

来源:Python爬虫与数据挖掘作者:霖hero前言关于Scrapy理论的知识,可以参考我的上一篇文章,这里不再赘述,直接上干货。实战演练爬取分析首先我们进入北京新发地价格行情网页并打开开发者工具,如...

屏蔽疯狂蜘蛛,防止CPU占用100%(mumu模拟器和雷电模拟器哪个更占用cpu)

站点总是某个时间段莫名的cpu100%,资源占用也不高,这就有必要怀疑爬虫问题。1.使用"robots.txt"规范在网站根目录新建空白文件,命名为"robots.txt&#...

Web黑客近年神作Gospider:一款基于Go语言开发的Web爬虫,要收藏

小白看黑客技术文章,一定要点首小歌放松心情哈,我最爱盆栽!开始装逼!Gospider是一款运行速度非常快的Web爬虫程序,对于爱好白帽黑客的小白来说,可谓是佳作!Gospider采用厉害的Go语言开发...

用宝塔面板免费防火墙屏蔽织梦扫描网站

今天教大家在免费的基础上屏蔽织梦扫描,首先您要安装宝塔面板,然后再安装免费的防火墙插件,我用的是Nginx免费防火墙,然后打开这个插件。设置GET-URL过滤设置一条简单的宝塔面板的正则规则就可以屏蔽...

蜘蛛人再捞4千万美元 连续三周蝉联北美票房冠军

7月15日讯老马追踪票房数据的北美院线联盟今天表示,“蜘蛛人:离家日”(Spider-Man:FarFromHome)击退两部新片的挑战,连续第2周勇夺北美票房冠军,海捞4530万美元。法新...

夏天到了,需要提防扁虱,真是又小又恐怖的动物

夏天马上要到了,你知道吗,扁虱是这个夏天最危险的动物之一,很少有动物能比它还凶猛。Whenitcomestosummer'slittledangers,fewarenastiert...