分布式任务平台设计概要

任务

记录

从mq获取任务,再记录到job表,记录的信息有 funcName,args,createUser,priority,serviceName

读取

从job表读取任务

限流: 当待执行队列长度超missionQueueSize则等待readTimeDelayMillsWhenFlowControl后重新执行

限频: 每隔readInterval读取任务

执行

从待执行队列中取出任务发送给worker

并发:根据parallelConsumeLimit限制,同时发送任务

当无任务时等待get_new_mission事件触发

发送任务时不等待结果,然后修改job表中任务状态为E

完成

监听etcd完成任务的信息写入

按执行结果修改job表中任务状态,从missionInProcessMap中删除该任务

检查

定期检查执行中的任务状态,主要是执行时间

对于执行missionOutT重试超过限制ime超时的任务重新加入任务队列,重试超次数retryTimes的任务则在job表中标记为F,失败原因为重试超过限制

master

etcd

将任务完成信息同步到etcd中
优势:

  1. 主动通知任务完成,实现方便,无需master等待完成结果
  2. 避免进度丢失,单点问题
  3. 可以查询记录
  4. master选举
master选举

key为/mif/schedule/leader,当版本为0时创建并设置ttl,其他scheduler监听该key,当失效时重新选举

初始化

service/job/init

  1. 读取etcd中已存在的任务(已完成master未处理)
  2. 从job表载入正在执行的任务,加入missionInProcessMap
  3. 启动监听etcd任务完成put事件
  4. 执行任务检查
  5. 执行任务完成
  6. 开始循环读取任务,检查任务,执行任务

发送任务

通过http post触发部署在函数计算中的worker

  • fc判断服务启动完成存在问题,在启动未完成时接受请求返回502

当请求返回此错误时,采取重试

1
2
3
4
5
if (response.data.errorMessage.indexOf('Process exited unexpectedly before completing request') !== -1) {
const { missionId, serviceName, funcName, args } = data;
app.logger.error(error);
return app.send(missionId, serviceName, funcName, args);
}

worker

部署在函数计算,节省资源,方便扩容,无需健康检查

接收任务与rpc处理逻辑类似不再重复
·

中间件finishMission

待执行完成后向etcd插入结果

1
2
3
4
5
6
7
await next();
if (!ctx.missionId) return;
await ctx.app.etcdMission.put('finish/' + ctx.missionId).value(JSON.stringify({
missionId: ctx.missionId,
time: Date.now(),
status: 'S',
}));