thingsboard源码解析(1)–Actor系统流程

1. application 的目录架构

└── java
    └── org/thingsboard/server
        ├── ThingsboardInstallApplication.java (程序初始化入口-安装脚本等)
        ├── ThingsboardServerApplication.java (程序服务启动入口)
        ├── actors (Actor模型消息处理核心逻辑)
        ├── config (配置类)
        ├── controller (服务接口层)
        ├── exception (异常)
        ├── install (安装相关)
        ├── service (服务层)
        └── utils (工具类)
└── resources
    ├── banner.txt (应用启动时的控制台输出logo)
    ├── i18n (国际化相关)
    ├── logback.xml (日志框架的配置文件)
    ├── templates (模版)
    └── thingsboard.yml (配置文件)    

从java的入口中有两个:

org/thingsboard/server/ThingsboardInstallApplication.java
org/thingsboard/server/ThingsboardServerApplication.java

其中ThingsboardInstallApplication这个启动项对应的是tb的安装服务,主要是初始化数据库相关的信息;而ThingsboardServerApplication则对应core和rule-engine服务。IDE导入源码后,通过不同的启动类的方式来启动指定具体要启动的服务。

2. 项目初始化

由于项目是springboot 框架,在实际项目中,服务启动后做一些初始化工作,例如线程池初始化、文件资源加载、常驻后台任务启动(比如kafka consumer)等。一般初始化资源的方法有3类

  • Spring Bean初始化的InitializingBean,init-method和PostConstruct
  • ApplicationRunner与CommandLineRunner接口
  • Spring的事件机制

所以先搜索
@EventListener看看如下图

thingsboard源码解析(1)--Actor系统流程
image.png

@PostConstruct看看如下图

thingsboard源码解析(1)--Actor系统流程
image.png

看到log.info("Initializing actor system.");的注释,初步怀疑从这里开始。所以进入到这个函数再细看下:

    public void initActorSystem() {
        log.info("Initializing actor system.");
        actorContext.setActorService(this);  //初始化上下文
        TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
        system = new DefaultTbActorSystem(settings);//初始化系统,主要初始化任务执行扔线程池
        
        //初始化派发
        system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
        system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
        system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
        system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

        actorContext.setActorSystem(system);

        appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
        actorContext.setAppActor(appActor);

        TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
        actorContext.setStatsActor(statsActor);

        log.info("Actor system initialized.");
    }

DefaultActorService 完成了acotr的初始化:

  • 1.初始化上下文actorContext
  • 2.创建DefaultTbActorSystem,初始化任务执行的线程池;
  • 3.创建AppAcotr system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));, 这个是系统的根,是其它actor的parent actor, 即所有消息需要通过该actor;
    • 3.1.创建 AppActor的同时,需要创建对应的TbActorMailbox并与actor进行关联
    • 3.2.mailbox.initActor()至 AppActor 的init()方法
    • 3.3.mailbox 调用到 对应的Actor 的protected abstract boolean doProcess(TbActorMsg msg); 的实现, 即返回到AppActor的doProcess处理进来的消息
  • 4.AppActor的 doProcess处理时,调用initTenantActors()初始化租户Actor(TenantActor)
  • 5.调用至ctx.getOrCreateChildActor, ctx.getOrCreateChildActor中的cxt就是AppActor的mailbox引用,实际还是调回了system的TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent)这个方法,接下来同appActor创建即第3步骤一致
    • 5.1.创建 TenantActor的同时,需要创建对应的TbActorMailbox并与actor进行关联
    • 5.2.mailbox.initActor() 至 TenantActor的init()方法,里面调用了initRuleChains();
    • 5.3. TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);根据ruleChain创建RuleChainActor,这里又回到Actor的创建步骤。 一个Tenant对应多个RuleChain,并且有一个RootChain,所以TenantActor在initRuleChains()时候将其RootChain赋给rootChain,并将RuleChainActor的MailBox赋予rootChainActor;
    • 5.3中RuleChainActor的创建的会调用到RuleChainActor的init()方法,调用了createProcessor(ctx);initProcessor(ctx); 用来创建RuleChainActorMessageProcessor并开始用来处理RuleChainActor的消息processor.start(ctx);
    • 5.4. 在processor.start(ctx); RuleChainActorMessageProcessor 的 start方法又以RuleChain创建对应的RuleNodeActor,主要通过rule_noderelation这两张表中的数据进行排序,一个RuleNode对应一个 RuleNodeActor。
    • 创建 RuleNodeActor 又创建一个对应RuleNodeActorMessageProcessor来处理节点的状态变化,RuleNodeActorMessageProcessor初始化的时候tbNode = initComponent(ruleNode);会通过反射的方式拿到RuleNode对应的TbNode接口实现类,消息实际会通过这个实现类来进行处理。
      至此,Actor初始化完成。总结下关键点就是 system, Actor, mailbox 几个类, system创建actor和actor对应的mailbox, 并通过 actor的处理消息的doProcess方法层层递进创建 TenantActor、RuleChainActor、RuleNodeActor、TbNode直至末稍实现类
    @Override
    protected boolean doProcess(TbActorMsg msg) {
        if (!ruleChainsInitialized) {
            "initTenantActors();"//初始化租户Actor
            ruleChainsInitialized = true;
            if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
                log.warn("Rule Chains initialized by unexpected message: {}", msg);
            }
        }
......
    private void initTenantActors() {
        log.info("Starting main system actor.");
        try {
            // This Service may be started for specific tenant only.
            Optional isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
            if (isolatedTenantId.isPresent()) {
                Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
                if (tenant != null) {
                    log.debug("[{}] Creating tenant actor", tenant.getId());
                    getOrCreateTenantActor(tenant.getId());
                    log.debug("Tenant actor created.");
                } else {
                    log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
                }
            } else if (systemContext.isTenantComponentsInitEnabled()) {
                PageDataIterable tenantIterator = new PageDataIterable(tenantService::findTenants, ENTITY_PACK_LIMIT);
                boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                for (Tenant tenant : tenantIterator) {
                    TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
                    if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
                        log.debug("[{}] Creating tenant actor", tenant.getId());
                        getOrCreateTenantActor(tenant.getId());
                        log.debug("[{}] Tenant actor created.", tenant.getId());
                    }
                }
            }
            log.info("Main system actor started.");
        } catch (Exception e) {
            log.warn("Unknown failure", e);
        }
    }

    private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
        return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
                () -> DefaultActorService.TENANT_DISPATCHER_NAME,
                () -> new TenantActor.ActorCreator(systemContext, tenantId));
    }

thingsboard源码解析(1)--Actor系统流程
image.png

本文章来源于互联网,如有侵权,请联系删除!原文地址:thingsboard源码解析(1)–Actor系统流程

相关推荐: Thingsboard改包名为IoT

1、修改pom thingsboard源码:thingsboardapplicationpom.xml UTF-8 ${basedir}/.. thingsboard /var/log/${pkg.name} /usr/share/${pkg.name} ${…