Scheduling in Akka with Quartz – a wise solution.
A 20-minute read
One of the most common challenges appears when your Akka application executes several tasks driven by a scheduler (day, time, frequency, etc.) and you need to change some parameters or even the scheduler's configuration: normally you have to stop your Akka application.
In our example, the scheduler's configuration can be modified and the changes will be applied to the Akka actors as well as to the messages those actors receive, all without stopping our application. So this is what we are going to do in our example:
First of all I am going to tell you my environment and what I want:
Why not use Linux cron + crontab and execute shell scripts:
We have to deal with a scenario for which the Akka libraries do not offer any acceptable solution. There are several options on the market, among them Apache Camel (its Timer component), plain Quartz and Akka-Quartz Scheduler.
So in my case I am going to use the last one, Akka-Quartz Scheduler, for several reasons: Apache Camel is, in my opinion, too complicated when all you need is its Timer; plain Quartz has the same problem and is very oriented to the Java community, so every kind of listener has to be implemented by hand, and we need to work in an Akka environment.
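If you want to follow along, akka-quartz-scheduler is published under the com.enragedginger organisation; a typical sbt entry looks like the sketch below. The version numbers here are only an example, pick the ones that match your Akka and Scala versions as described in the library's README:

// build.sbt - versions shown here are an example, check the akka-quartz-scheduler README
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"            % "2.6.20",
  "com.enragedginger" %% "akka-quartz-scheduler" % "1.8.5-akka-2.6.x"
)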
I will not discuss here which is the better or worse option for my "akka cron environment". Schedulers and timers are a very complicated aspect of programming, so if you want a well-founded opinion you would need a deep benchmark of the three of them, and that is not the intention of this document.
My problem arose when I needed to execute several tasks following a specific schedule (day/time) indefinitely over the long term. These tasks have different patterns and their schedule varies over time: the task that we execute today will be different from the task that I am going to execute next week, and the same goes for its schedule.
This is the example that I bring to readers today:
Our project has a very important aspect: it is driven by two schedule configurations. The first one is internal to the application and lives in application.conf:
quartz {
  defaultTimezone = "Europe/London"
  schedules {
    moviepages {
      description = "A cron job that fires off every month"
      expression = "0 30 2 2 * ? *"
    }
  }
}
code 1.0
See this reference to the CronExpression format. Because the information is published on the first day of every month, I check it at the beginning of the second day of every month. This is an internal configuration (ref. code 1.0) because it never changes (every month I have to review a schedule for the pages related to new films that will be added or updated).
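If you want to sanity-check that expression before wiring it into the scheduler, the plain Quartz CronExpression class can print the next fire time. A minimal sketch, assuming the quartz artifact is on the classpath (it is a transitive dependency of akka-quartz-scheduler):

import java.util.{Date, TimeZone}
import org.quartz.CronExpression

object CronCheck extends App {
  // Same expression and timezone as in code 1.0: 02:30 on the second day of every month
  val expression = new CronExpression("0 30 2 2 * ? *")
  expression.setTimeZone(TimeZone.getTimeZone("Europe/London"))
  println(expression.getNextValidTimeAfter(new Date()))
}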
The second one tells me when each page holding the information will be updated during the month. This is an external configuration file that indicates when any page will be updated or added throughout the month, so on the day a page is updated or added our actor will collect information about it. Every month this external file has to be replaced with a new schedule.
An example of my external file (cronmoviepages.conf), which you can find on my GitHub, is below:
## https://github.com/lightbend/config/blob/master/HOCON.md
## cronexpresssion = "Seconds Minutes Hours Day-of-month Month Day-of-Week Year(Optional)"
schedule {
  defaultTimezone = "Europe/London"
  moviepagemejorenvo = [
    {
      cronexpresssion = "0 4 2 20 9 ? *"
      moviepage = "40"
    }
    {
      cronexpresssion = "0 4 2 20 9 ? *"
      moviepage = "41"
    }
  ]
}
code 1.1
In code 1.1, cronexpresssion indicates when the actor has to update the associated page of the movie website. Internally the software groups all the moviepage entries that share the same cronexpresssion and fires one job per group.
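As an illustration of that grouping, here is a minimal sketch that reads cronmoviepages.conf with the Typesafe Config library and groups the page numbers by cron expression. The names MoviePageEntry and loadSchedule are mine, not the project's, and the project itself points to the lambdista/config wrapper instead of the raw Typesafe API:

import java.io.File
import com.typesafe.config.ConfigFactory
import scala.jdk.CollectionConverters._ // Scala 2.13; on 2.12 use scala.collection.JavaConverters

// Hypothetical helper types, not part of the original project
final case class MoviePageEntry(cronExpression: String, moviePage: Int)

object LoadSchedule {
  // Returns one group of page numbers per distinct cron expression,
  // which is what later becomes one Quartz job per group
  def loadSchedule(path: String): Map[String, List[Int]] = {
    val config = ConfigFactory.parseFile(new File(path))
    val entries = config.getConfigList("schedule.moviepagemejorenvo").asScala.toList.map { c =>
      MoviePageEntry(c.getString("cronexpresssion"), c.getString("moviepage").toInt)
    }
    entries.groupBy(_.cronExpression).view.mapValues(_.map(_.moviePage)).toMap
  }
}

For the file in code 1.1, LoadSchedule.loadSchedule("cronmoviepages.conf") would return a single group, "0 4 2 20 9 ? *" -> List(40, 41).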
We have two main components:
object BootScheduled extends App {
  //https://github.com/lambdista/config
  val system = ActorSystem(ConstantsObject.ScheduledIoTDaemon)
  // This actor will control the whole System ("IoTDaemon_Scheduled")
  val daemonScheduled = system.actorOf(IoTAdmin.props, ConstantsObject.ScheduledIoTDaemonProcessing)
  // This actor will control the reScheduling about time table for update new pages with films in original version
  val reeschedule = system.actorOf(ReSchedulingJob.props, ConstantsObject.DaemonReShedulingVoMoviePages)
  //Use system's dispatcher as ExecutionContext
  //QuartzSchedulerExtension is scoped to that ActorSystem and there will only ever be one instance of it per ActorSystem
  QuartzSchedulerExtension(system).schedule("moviepages", reeschedule, FireSchedule(daemonScheduled))
}
code 1.3
In the previous code 1.3, the QuartzSchedulerExtension(system).schedule(...) call uses the internal configuration from our application.conf (ref. code 1.0). That same call is responsible for firing, at 2:30 am on the second day of every month, a job that reads the new schedule from code 1.1 (cronmoviepages.conf), the external file that can be read periodically and indicates which jobs have to be re-scheduled each time with the new timetable.
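Code 1.3 sends the reeschedule actor a FireSchedule(daemonScheduled) message, and codes 1.4 and 1.5 below pass ParseUrlFilms and ProccessPage messages around. For reference, those messages could be modelled as simple case classes like the ones in this sketch; the field names and types are my reconstruction from the snippets, so check the repository for the exact definitions:

import akka.actor.ActorRef

// Hedged reconstruction of the message protocol seen in codes 1.3-1.5
final case class FireSchedule(scheduledDaemon: ActorRef)             // BootScheduled -> ReSchedulingJob (code 1.3)
final case class ParseUrlFilms(urlPageNunmList: List[Int],
                               optActorRef: Option[ActorRef] = None) // ReSchedulingJob -> IoTAdmin (codes 1.4 and 1.5)
final case class ProccessPage(url: String, replyTo: ActorRef)        // IoTAdmin -> ProcessMoviePage (code 1.5)

The default value on optActorRef is what allows code 1.4 to build the message with a single argument, ParseUrlFilms(listOfMoviePages).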
.....................
  cronExpreesionMatches.foreach(
    cronExpreesionMatch => {
      val (cronExpreesion, listOfMoviePages) = cronExpreesionMatch
      log.info(
        "Programming Schedule for cronexpression: {} including matches: {}",
        cronExpreesion, listOfMoviePages.mkString("-")
      )
      val headList = listOfMoviePages.head
      // TODO it is important to kill all Scheduled jobs that has been created ONCE time that the work has done
      /** This code generate a warning because the reschedule job never exist
        * any more because headList used for named it never is the same */
      QuartzSchedulerExtension(system).rescheduleJob(
        s"Movie-Page-Scheduler$headList",
        scheduledDaemon,
        ParseUrlFilms(listOfMoviePages),
        Option("Scheduling "),
        cronExpreesion,
        None,
        defaultTimezone)
    }
  )
  log.info("schedule get new MoviePages")
}
.....................
code 1.4
In the previous code 1.4, the rescheduleJob call re-schedules an actor (IoTAdmin) to be fired following the external configuration; when it fires, it launches concurrently as many actors (ProcessMoviePage) as there are pages to be updated. You can appreciate that in the code below.
........
class IoTAdmin extends Actor with ActorLogging with MessageToTargetActors {

  override def preStart(): Unit = log.info("Start process in Akka Scheduler Example")
  override def postStop(): Unit = log.info("Stop process in Akka Schedule Example")

  // TODO checking perfromance: http://docs.scala-lang.org/overviews/collections/performance-characteristics.html
  var watched = Map.empty[ActorRef, Int]

  override def receive = {
    case ParseUrlFilms(urlPageNunmList, optActorRef) =>
      urlPageNunmList.foreach(
        urlpagenumber => {
          val currentActorRef: ActorRef = context.actorOf(ProcessMoviePage.props)
          watched += currentActorRef -> urlpagenumber
          // watcher for every actor that is created 'cause the actor need to know when the process have finished
          context.watch(currentActorRef)
          // TODO urlspatterns<=varconf
          val processUrlFilmMessage = ProccessPage(
            s"http://mejorenvo.com/p${urlpagenumber}.html",
            optActorRef.fold(self)(ref => ref)
          )
          sendParserUrlMessage(currentActorRef, processUrlFilmMessage)
        })
.............
code 1.5
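Code 1.5 is truncated; given the watched map and the context.watch call, the rest of the receive block presumably also handles Terminated messages when a page processor finishes. A possible shape of that handler, purely as an assumption and not the project's actual code:

// Continuation sketch of IoTAdmin's receive (requires import akka.actor.Terminated).
// This is an assumption based on the watched map in code 1.5, not the repository code.
case Terminated(pageActor) =>
  // Look up which page the finished ProcessMoviePage actor was responsible for
  val finishedPage = watched.get(pageActor)
  log.info("ProcessMoviePage actor for page {} has finished", finishedPage)
  watched -= pageActor
  if (watched.isEmpty) log.info("All pages for this firing have been processed")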
On each specific day configured in the cronmoviepages.conf file, the IoTAdmin actor receives a ParseUrlFilms message telling it to process the new pages that should be updated.
This post and the project can be a base or skeleton for you if you want to create a process with the following specifications:
This is the idea of our cron, but taken further, because sometimes we need to do more complex things that live outside our OS.
I have used Akka-Quartz Scheduler inside a Docker container and it works pretty well. I have NOT tested these libraries in an Akka Clustering environment, with all the complexity that such implementations entail. You can look at the Akka documentation on configuring seed nodes on any PaaS and running the cluster there. You can do it manually, but when you are working on a PaaS you need to do it automatically. The explanation in the Akka library documentation, and specifically the reference to Cluster Bootstrap, was not working at the time of writing this article. I will try to explain and implement it in a next post.