上一篇简单分析了mqtt-transport的代码,说明了设备连接的流程,相对的设备状态上报的流程基本一致,所以就不再自己分析了,有需要的小伙伴可以自己去研究一下。
接下来准备探索core和rule-engine的源码
虽然官网上把它分成了两个独立的部分,但由于这两个服务之间有一些交叉的功能,所以被直接写在了一个服务里面,关键代码都在application这个服务里面。
这个module里面有两个启动项,分别是ThingsboardInstallApplication和ThingsboardServerApplication,其中ThingsboardInstallApplication这个启动项对应的是tb的安装服务,主要是初始化数据库相关的信息。而ThingsboardServerApplication则对应core和rule-engine服务。打包的时候会打包在一起,然后通过启动类的方式来显示的指定具体要启动的服务。
可以看到tb其实做的确实很到位,连数据库的初始化也包装成了一个服务,只需要通过启动参数的方式就可以完成整个数据库的初始化,而不需要自己去跑脚本,而且连数据库的升级也封装了进去,这个确实很赞!
说完了一堆废话,我们转回正题,开始分析core和rule-engine服务,先看一下大致的包结构,主要有一下这些包(config、utils之类的包就不深入研究了)
actors
controller
install
service
主要有一些难度的是actor这个包,所以这次就针对这个Actor模型来进行一次详细的分析。
Actor详解
Actor模型是TB处理消息的核心机制,具体的Actor模型的相关知识可以自行百度,这里这谈TB的Actor设计
1、由org.thingsboard.server.actors.service.DefaultActorService进行Actor系统的初始化
@PostConstruct
public void initActorSystem() {
log.info("Initializing actor system.");
actorContext.setActorService(this);
TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
system = new DefaultTbActorSystem(settings);
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
actorContext.setActorSystem(system);
appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
// 将appActor作为根消息传递的代理actor
actorContext.setAppActor(appActor);
TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
// 将statsActor作为根统计数据的代理
actorContext.setStatsActor(statsActor);
log.info("Actor system initialized.");
}
2、初始化的时候会先创建一个Actor系统(org.thingsboard.server.actors.DefaultTbActorSystem)并设置他的一些线程池以及Actor系统的上下文(org.thingsboard.server.actors.ActorSystemContext)。
3、接下来会创建一个根Actor(org.thingsboard.server.actors.app.AppActor),并将其设置为Actor系统的根Actor,这个Actor是所有消息的入口,是其他所有Actor的Parent。
4、初始化完成之后,通过Spring的ApplicationReadyEvent事件,DefaultActorService会往DefaultTbActorSystem的根Actor的Mail里面发送一条AppInitMsg消息。
5、AppActor会从它的Mail里面把消息拿出来,通过process方法进行处理。
@Override
protected boolean doProcess(TbActorMsg msg) {
if (!ruleChainsInitialized) {
initTenantActors();
ruleChainsInitialized = true;
if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
log.warn("Rule Chains initialized by unexpected message: {}", msg);
}
}
switch (msg.getMsgType()) {
case APP_INIT_MSG:
break;
case PARTITION_CHANGE_MSG:
ctx.broadcastToChildren(msg);
break;
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case QUEUE_TO_RULE_ENGINE_MSG:
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
break;
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg, false);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg, true);
break;
default:
return false;
}
return true;
}
5、第一次收到消息的时候会先进行TenantActor的初始化,从数据库的tenant表里面拿出所有的数据,并进行加载,每一个Tenant对应一个TenantActor。
private void initTenantActors() {
log.info("Starting main system actor.");
try {
// This Service may be started for specific tenant only.
Optional isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
if (isolatedTenantId.isPresent()) {
Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
if (tenant != null) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("Tenant actor created.");
} else {
log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
}
} else if (systemContext.isTenantComponentsInitEnabled()) {
//分页查询的方式加载租户信息,每个租户会对应一个TenantActor
PageDataIterable tenantIterator = new PageDataIterable(tenantService::findTenants, ENTITY_PACK_LIMIT);
boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
for (Tenant tenant : tenantIterator) {
if (isCore || (isRuleEngine && !tenant.isIsolatedTbRuleEngine())) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("[{}] Tenant actor created.", tenant.getId());
}
}
}
log.info("Main system actor started.");
} catch (Exception e) {
log.warn("Unknown failure", e);
}
}
6、初始化TenantActor的时候同时会初始化其对应的RuleChainActor
@Override
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.info("[{}] Starting tenant actor.", tenantId);
try {
Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
if (tenant == null) {
cantFindTenant = true;
log.info("[{}] Started tenant actor for missing tenant.", tenantId);
} else {
// This Service may be started for specific tenant only.
Optional isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
if (isRuleEngineForCurrentTenant) {
try {
if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {
log.info("[{}] Going to init rule chains", tenantId);
initRuleChains();
} else {
isRuleEngineForCurrentTenant = false;
}
} catch (Exception e) {
cantFindTenant = true;
}
}
log.info("[{}] Tenant actor started.", tenantId);
}
} catch (Exception e) {
log.warn("[{}] Unknown failure", tenantId, e);
// TODO: throw this in 3.1?
// throw new TbActorException("Failed to init actor", e);
}
}
protected void initRuleChains() {
for (RuleChain ruleChain : new PageDataIterable(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
}
7、由于一个Tenant对应多个Rule Chain,并且有一个RootChain,所以TenantActor初始化的时候会将其RootChain赋给rootChain,并将RuleChainActor的Mail代理给到rootChainActor。
8、RuleChainActor初始化的时候同时会初始化其对应的RuleChainActorMessageProcessor,RuleChainActorMessageProcessor会负责消息的具体处理。
9、RuleChainActorMessageProcessor初始化的时候会初始化RuleChain对应的RuleNodeActor,主要通过rule_node 和 relation这两张表中的数据进行排序,每一个RuleNode都会对应一个RuleNodeActor。
@Override
public void start(TbActorCtx context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null) {
List ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
//RuleNodeCtx对象包含了self这个对rule chain的Mail引用,所有的rule node可以通过self的方式将数据给到RuleChainActor
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
started = true;
}
} else {
onUpdate(context);
}
}
private void initRoutes(RuleChain ruleChain, List ruleNodeList) {
nodeRoutes.clear();
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else {
for (EntityRelation relation : relations) {
log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
//校验rule node与relation中的是否对应
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
if (ruleNodeCtx == null) {
throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
}
}
nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList())
.add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
}
}
}
firstId = ruleChain.getFirstRuleNodeId();
firstNode = nodeActors.get(firstId);
state = ComponentLifecycleState.ACTIVE;
}
10、RuleNodeActor初始化的时候会创建对应的RuleNodeActorMessageProcessor,RuleNodeActorMessageProcessor初始化的时候会通过反射的方式拿到RuleNode对应的TbNode接口实现类,消息实际会通过这个实现类来进行处理。
private TbNode initComponent(RuleNode ruleNode) throws Exception {
TbNode tbNode = null;
if (ruleNode != null) {
Class> componentClazz = Class.forName(ruleNode.getType());
tbNode = (TbNode) (componentClazz.newInstance());
tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));
}
return tbNode;
}
至此,Actor系统就初始化完成了,整理一下可以得到这样的一张图,左边是流程图,右边是结构图
Actor消息处理详解
这里以设备状态上报为例来进行分析
1、org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService.launchConsumer()方法接收到来自Transport的消息,并将消息丢到Actor系统上下文ActorSystemContext中。
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
QueueToRuleEngineMsg msg;
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
Set relationTypes = null;
if (relationTypesList != null) {
if (relationTypesList.size() == 1) {
relationTypes = Collections.singleton(relationTypesList.get(0));
} else {
relationTypes = new HashSet(relationTypesList);
}
}
msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
actorContext.tell(msg);
}
2、ActorSystemContext把数据发送到根Actor(即AppActor)的邮箱中。
public void tell(TbActorMsg tbActorMsg) {
appActor.tell(tbActorMsg);
}
3、AppActor从邮箱中收到消息然后通过process方法进行处理,根据消息类型,会找到对应设备的TenantActor并将消息投递到期邮箱中进行下一步的处理。
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).tell(msg);
} else {
msg.getTbMsg().getCallback().onSuccess();
}
}
}
4、TenantActor从邮箱中拿到数据并通过process方法进行处理,他会首先去寻找其root chain actor,并将消息投递到Rule Chain Actor对应的TbActorMailbox邮箱中进行处理。
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (!isRuleEngineForCurrentTenant) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getTbMsg();
if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
} else {
tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
log.info("[{}] No Root Chain: {}", tenantId, msg);
}
} else {
try {
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
} catch (TbActorNotRegisteredException ex) {
log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
//TODO: 3.1 Log it to dead letters queue;
tbMsg.getCallback().onSuccess();
}
}
}
5、RuleChainActor从邮箱中拿到数据之后通过process方法将消息丢给它的RuleChainActorMessageProcessor进行处理。
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case QUEUE_TO_RULE_ENGINE_MSG:
processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
.....
6、RuleChainActorMessageProcessor接收到数据之后,会先确认有没有指定处理的node,如果没有就将其给到这个RuleChain对应的第一个RuleNodeActor进行处理。
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getTbMsg();
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
try {
checkActive(envelope.getTbMsg());
//从 rule chain对象中获取到第一个rule node的id
RuleNodeId targetId = msg.getRuleNodeId();
RuleNodeCtx targetCtx;
//正常情况下没有指定的话会是个null
if (targetId == null) {
targetCtx = firstNode;
msg = msg.copyWithRuleChainId(entityId);
} else {
targetCtx = nodeActors.get(targetId);
}
if (targetCtx != null) {
log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
pushMsgToNode(targetCtx, msg, "");
} else {
//当rule node 不存在的时候直接返回成功结果
log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
msg.getCallback().onSuccess();
}
} catch (RuleNodeException rne) {
envelope.getTbMsg().getCallback().onFailure(rne);
} catch (Exception e) {
envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
}
} else {
onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
}
7、RuleNodeActor从邮箱中收到消息之后会通过process方法,将数据发到RuleNodeActorMessageProcessor进行进一步的处理。
8、RuleNodeActorMessageProcessor会将消息丢给TbNode的实现类进行数据的最后处理。
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
checkActive(msg.getMsg());
if (ruleNode.isDebugMode()) {
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
}
try {
tbNode.onMsg(msg.getCtx(), msg.getMsg());
} catch (Exception e) {
msg.getCtx().tellFailure(msg.getMsg(), e);
}
}
9、TbNode的实现类处理完之后会调用DefaultTbContext,告知处理结果,然后会根据返回结果重新拼装消息,并将消息传递回RuleChainActor。
private void tellNext(TbMsg msg, Set relationTypes, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
10、RuleChainActor接收到消息,由于消息类型发生了变更,所有会调用其对应的RuleChainActorMessageProcessor的onTellNext方法进行下一步处理。
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case QUEUE_TO_RULE_ENGINE_MSG:
processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
break;
//消息从rule node发往rule chain中的下一个 rule node
//rule node会通过 DefaultTbContext 获取rule chain中的下一个node,并将消息进行投递
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
break;
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
break;
case PARTITION_CHANGE_MSG:
processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
break;
case STATS_PERSIST_TICK_MSG:
onStatsPersistTick(id);
break;
default:
return false;
}
return true;
}
11、RuleChainActorMessageProcessor会根据rule node的关联关系找到下一个处理消息的RuleNodeActor,当没有找到的时候则说明这条数据已经处理完毕。如果可以找到则丢到pushToTarget()这个方法进行处理。
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set relationTypes, String failureMessage) {
try {
checkActive(msg);
EntityId entityId = msg.getOriginator();
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
//根据 relationTypes过滤出下一个target
List relations = nodeRoutes.get(originatorNodeId).stream()
.filter(r -> contains(relationTypes, r.getType()))
.collect(Collectors.toList());
int relationsCount = relations.size();
if (relationsCount == 0) {
log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (relationTypes.contains(TbRelationTypes.FAILURE)) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
if (ruleNodeCtx != null) {
msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
} else {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
}
} else {
msg.getCallback().onSuccess();
}
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
pushToTarget(tpi, msg, relation.getOut(), relation.getType());
}
} else {
MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
for (RuleNodeRelation relation : relations) {
EntityId target = relation.getOut();
putToQueue(tpi, msg, callbackWrapper, target);
}
}
} catch (RuleNodeException rne) {
msg.getCallback().onFailure(rne);
} catch (Exception e) {
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
}
}
12、服务下一个target的类型是RULE_NODE则将消息发送给它,如果是RULE_CHAIN则重新组装消息,并通知RuleChainActor对应的TenantActor进行处理。
private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
if (tpi.isMyPartition()) {
switch (target.getEntityType()) {
case RULE_NODE:
pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
break;
case RULE_CHAIN:
parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
break;
}
} else {
putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
}
}
13、TenantActor收到来自RuleChainActor发来的消息之后会寻找下一个RuleChainActor来处理消息。
private void onRuleChainMsg(RuleChainAwareMsg msg) {
getOrCreateActor(msg.getRuleChainId()).tell(msg);
}
至此整个消息的处理流程大致梳理了一下,至于消息在TenantActor、RuleChainActor和RuleNodeActor之间多次的流转就不再赘述了。
有空会补一张流程图再进行梳理一遍。
来源:https://www.jianshu.com/p/4d8af40b3f93