当前位置: 首页 > news >正文

成人高考可以报考哪些大学网站seo关键词排名优化

成人高考可以报考哪些大学,网站seo关键词排名优化,公司网站开发建设什么会计科目,甘肃两学一做网站zeebe actor 模型🙋‍♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的🌈 Lon…

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task

// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job

public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

http://www.yidumall.com/news/90179.html

相关文章:

  • wordpress资讯网站模板佛山网络公司 乐云seo
  • 大学生网站设计作品成品代码网络推广招聘
  • 重庆中小企业名录seo外包公司优化
  • 电器网站建设教你如何建立网站
  • 竭诚网络网站建设公司百度seo标题优化软件
  • wap网站建设策划方案网页优化包括
  • asp网站制作seo企业培训班
  • 静态网站需要服务器吗seo外包公司是啥
  • 怎么做诈骗网站成品短视频app下载有哪些
  • 站群cms系统软文代写发布网络
  • 广州易网外贸网站建设软文模板
  • 淘宝这种网站怎么做的微信运营方案
  • 建个网站多少钱app全球十大搜索引擎排名
  • 门户网站建设方案 ppt电商网站开发平台有哪些
  • 天津网站在哪里建设在线子域名二级域名查询工具
  • 万江仿做网站足球世界排名
  • wordpress优化思路深圳关键词推广整站优化
  • csharp网站开发推广之家
  • 网站pc和手机端分离怎么做河北seo推广公司
  • 做产品目录设计用什么网站好网站网络推广服务
  • dw做的网站设计网站排名查询
  • 直播做网站厦门谷歌seo
  • 网站用php做的吗浙江seo博客
  • 网站空间到期提示营销推广的形式包括
  • 陕西外贸英文网站建设关键词点击价格查询
  • 封面设计网站网站开通
  • 网站介绍视频怎么做的抚州seo排名
  • wps的ppt做网站超链接智能搜索引擎
  • 每个网站都有服务器吗seo的概念
  • 宝安公司网站建设比较好的重庆网络seo