[00:00.000 --> 00:13.000] Let's dive right in, we don't have much time. [00:13.000 --> 00:14.000] My name is Paul Masurel. [00:14.000 --> 00:18.200] I go by the name of fulmicoton on Mastodon and on Twitter. [00:18.200 --> 00:22.400] I've been a Rust developer since 2016. [00:22.400 --> 00:27.880] I spent most of my career as a search developer, so it was only natural for me as my first [00:27.880 --> 00:31.880] pet project to develop a library to build search engines. [00:31.880 --> 00:35.880] So if you're familiar with Lucene, that's kind of like Lucene, but for the Rust world. [00:35.880 --> 00:37.880] That was my first pet project. [00:37.880 --> 00:38.880] It grew. [00:38.880 --> 00:53.880] And two years ago, I co-founded a startup called Quickwit, that is about building a search engine that is specialized for logs. [00:53.880 --> 00:55.880] Just a word about Quickwit. [00:55.880 --> 00:57.880] I'm not here to do an advertisement. [00:57.880 --> 01:04.880] It's not a commercial, but it's related to the tool, so I need to explain to you the problem that we are trying to solve. [01:04.880 --> 01:07.880] Quickwit is a Rust project. [01:07.880 --> 01:10.880] It's under the AGPL license. [01:10.880 --> 01:11.880] It's open source. [01:11.880 --> 01:15.880] You can find the source code on GitHub. [01:15.880 --> 01:18.880] And so we specialize in log search. [01:18.880 --> 01:28.880] So what's specific about log search compared to, let's say, an e-commerce search engine is that the data that we get is more or less immutable. [01:28.880 --> 01:35.880] So we assume that people will want to ingest documents into our search engine and won't modify them much. [01:35.880 --> 01:44.880] So after ingestion, a document stays there until it goes out of its retention period, at which point we will delete it. [01:44.880 --> 01:50.880] Or you might want to delete it if you have a request to comply with GDPR. [01:50.880 --> 01:57.880] We handle that kind of stuff, but you cannot modify it like you would do for any e-commerce website. [01:57.880 --> 02:09.880] And so one of the big differences in terms of efficiency is that when you deal with log search, the volume that you have to deal with has no limits. [02:09.880 --> 02:17.880] The largest amount that we've seen so far is people indexing 100 terabytes a day. [02:17.880 --> 02:19.880] So that's the volume of data. [02:19.880 --> 02:23.880] Imagine if it was actually generated not by machines but by humans. [02:23.880 --> 02:28.880] You would have to have a lot of people typing gibberish to deal with that kind of volume. [02:28.880 --> 02:32.880] So that's something that you will only get if you're doing log search. [02:32.880 --> 02:43.880] And compared to any e-commerce website, most of the CPU is actually spent indexing and not searching, because you have comparatively way fewer searches and way more documents. [02:43.880 --> 02:51.880] So indexing is actually crucial to our problem, and that's very different from usual search engines. [02:51.880 --> 02:54.880] Indexing, what does it look like? [02:54.880 --> 02:56.880] That's the problem that we are trying to solve. [02:56.880 --> 02:58.880] Super oversimplified. [02:58.880 --> 03:01.880] We get, as an input, a stream of documents. [03:01.880 --> 03:05.880] To give you an order of magnitude, for one indexing pipeline, [03:05.880 --> 03:10.880] we have to deal with around 40 megabytes per second.
[03:10.880 --> 03:17.880] And as an output, every 30 seconds, we write a file. [03:17.880 --> 03:29.880] We put it somewhere, and usually we register it on some metadata backend. [03:29.880 --> 03:32.880] And at this point, the file is searchable. [03:32.880 --> 03:37.880] And the rules of the game here are: we want to have the highest possible throughput, [03:37.880 --> 03:42.880] and we want to keep what we call time to search as low as possible. [03:42.880 --> 03:50.880] So time to search is: at the moment when a JSON file enters the system, we start the clock, [03:50.880 --> 03:55.880] and we measure how long it takes for it to go out of the system in the form of one of those files, [03:55.880 --> 03:59.880] at which point it is searchable. We need that to be as low as possible, [03:59.880 --> 04:03.880] and it's very important to keep it very stable. [04:03.880 --> 04:08.880] We don't want to have, like, a period of time where it goes through the roof. [04:08.880 --> 04:11.880] So that's the whole game. [04:11.880 --> 04:13.880] And in that black box, we do a lot of stuff. [04:13.880 --> 04:16.880] I voluntarily kept it very simple. [04:16.880 --> 04:18.880] I won't go through all of the stuff that we do, [04:18.880 --> 04:24.880] but the important part here is that every single one of those steps is using different resources. [04:24.880 --> 04:26.880] The time is spent on different stuff. [04:26.880 --> 04:30.880] So, for instance, when we index things, when we build our in-memory index, [04:30.880 --> 04:31.880] we are spending CPU. [04:31.880 --> 04:33.880] When we are writing the file, we use IO. [04:33.880 --> 04:39.880] When we upload, we use the network, and sometimes we are waiting for something that is outside of the system. [04:39.880 --> 04:43.880] We spend no resources at all; we just wait. [04:43.880 --> 04:47.880] So you might think that the implementation is obvious. [04:47.880 --> 04:51.880] We have one function for each of those steps, and we call them sequentially. [04:51.880 --> 04:55.880] But if you do that, you are wasting the resources of your system, of course. [04:55.880 --> 05:00.880] For instance, when you are uploading, you are spending your network resource, [05:00.880 --> 05:05.880] but your CPU is not doing anything, so you're wasting money. [05:05.880 --> 05:08.880] So the solution to this problem is relatively simple, [05:08.880 --> 05:10.880] but it's not that simple to implement. [05:10.880 --> 05:13.880] You want to streamline your pipeline. [05:13.880 --> 05:19.880] What I mean by streamlining, in a very concrete way, is you might have two steps, [05:19.880 --> 05:24.880] like indexing, spending CPU, and upload, spending network. [05:24.880 --> 05:30.880] They go sequentially, but what you want is indexing to work on building the first file, [05:30.880 --> 05:34.880] and when it has finished, you start uploading, of course. [05:34.880 --> 05:37.880] But as you upload, you want to start working on the second file, [05:37.880 --> 05:42.880] so that you are spending CPU while you are doing your network stuff. [05:42.880 --> 05:46.880] That's the kind of behavior that we want. [05:46.880 --> 05:49.880] And of course, this example is a little bit too simple, [05:49.880 --> 05:52.880] because the second step here is shorter than the first one. [05:52.880 --> 05:55.880] It's the nice case, but if it were the other way around, [05:55.880 --> 06:03.880] we would need some kind of mechanism to deal with that: back pressure.
[06:03.880 --> 06:08.880] And in my experience, a lot of very good engineers are not familiar with the concept of back pressure, [06:08.880 --> 06:11.880] so let me explain what it is about. [06:11.880 --> 06:20.880] If you already know what it is, bear with me and enjoy the fine artworks that we have here. [06:20.880 --> 06:27.880] So the idea of back pressure is: imagine you are cleaning dishes with a friend. [06:27.880 --> 06:33.880] One of you is cleaning the plates, and the other one is wiping them dry. [06:33.880 --> 06:41.880] And the person wiping the dishes dry is a little bit too slow compared to the person cleaning the dishes. [06:41.880 --> 06:45.880] What's going to happen is that your plates will accumulate, like, forever. [06:45.880 --> 06:48.880] And in computer systems, it's a very common problem, [06:48.880 --> 06:51.880] and that's how you get out-of-memory errors. [06:51.880 --> 06:53.880] So the solution is rather simple. [06:53.880 --> 06:58.880] You need back pressure, which means you need some way to signal the person who is cleaning too fast [06:58.880 --> 07:00.880] that they should slow down. [07:00.880 --> 07:07.880] And the simplest mechanism to do this is to have some kind of limit on your stack of plates, [07:07.880 --> 07:12.880] or your work queue, or whatever you are using in your system. [07:12.880 --> 07:15.880] And then they stop once they reach the limit. [07:15.880 --> 07:18.880] It's the simplest way you could have back pressure. [07:18.880 --> 07:26.880] So with all this said, the game here is: how would you implement that? [07:26.880 --> 07:33.880] What would be your go-to implementation if you had to build such a system in one hour? [07:33.880 --> 07:41.880] And being Rust developers, I think that most of the people in this room would come up with the following solution. [07:41.880 --> 07:48.880] So the upload part, it's not CPU-heavy, it's just dealing with the network. [07:48.880 --> 07:54.880] So it's very natural to think, OK, I'm going to do that in a Tokio task. [07:54.880 --> 07:56.880] And back pressure, that's easy. [07:56.880 --> 07:59.880] I already know what I'm going to use. [07:59.880 --> 08:04.880] The natural solution is: I'm going to have a channel with some capacity. [08:04.880 --> 08:10.880] And once it reaches capacity, of course, people sending work to my task will have to wait. [08:10.880 --> 08:15.880] It's going to be very nice and natural. [08:15.880 --> 08:20.880] And then on the indexing part, we will have the same mechanism. [08:20.880 --> 08:26.880] Only this time, we will have to actually do a lot of CPU-heavy work. [08:26.880 --> 08:31.880] So maybe we won't run that in a Tokio task; we will spawn our own thread, [08:31.880 --> 08:34.880] or maybe we will use a thread pool to do that work. [08:34.880 --> 08:36.880] It would be better, right? [08:36.880 --> 08:37.880] And we use the same mechanism. [08:37.880 --> 08:39.880] We will have a channel to receive the work. [08:39.880 --> 08:44.880] The capacity here is much larger, because the type of stuff that we put in the channel is very different. [08:44.880 --> 08:50.880] For the uploader, we are getting files; possibly they can be large, like 100 megabytes. [08:50.880 --> 08:53.880] So it makes sense to have the capacity as small as possible. [08:53.880 --> 08:54.880] Here, it's documents. [08:54.880 --> 08:59.880] For many documents, you will emit one file. [08:59.880 --> 09:04.880] So you will probably have a capacity that is way larger than three.
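To make this hand-rolled design concrete, here is a minimal sketch of what it could look like. This is not Quickwit code; all the names and capacities are made up for illustration, and error handling is elided.

```rust
use tokio::sync::mpsc;

// Hypothetical payload types, just for this sketch.
struct Document(String);   // a raw document to index
struct SplitFile(Vec<u8>); // a finished file, ready to upload

async fn upload(_file: SplitFile) {
    // Network IO would happen here.
}

fn index(_doc: Document) -> Option<SplitFile> {
    // CPU-heavy work; returns Some(file) once enough documents
    // have been accumulated to cut a file.
    None
}

// Uploader: network-bound, so a plain Tokio task is fine.
// The bounded channel (capacity 3) is the back-pressure mechanism:
// senders wait when three files are already queued.
fn spawn_uploader() -> mpsc::Sender<SplitFile> {
    let (tx, mut rx) = mpsc::channel::<SplitFile>(3);
    tokio::spawn(async move {
        while let Some(file) = rx.recv().await {
            upload(file).await;
        }
    });
    tx
}

// Indexer: CPU-bound, so it gets its own thread instead of a Tokio
// task. Its queue is much larger, because each item is one small
// document and many documents produce a single file.
fn spawn_indexer(uploader: mpsc::Sender<SplitFile>) -> mpsc::Sender<Document> {
    let (tx, mut rx) = mpsc::channel::<Document>(10_000);
    std::thread::spawn(move || {
        while let Some(doc) = rx.blocking_recv() {
            if let Some(file) = index(doc) {
                // Blocks when the uploader's queue is full: back pressure.
                let _ = uploader.blocking_send(file);
            }
        }
    });
    tx
}
```

With both queues bounded, a slow uploader eventually stalls the indexer, exactly like the limited stack of plates in the dish-washing analogy.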
[09:04.880 --> 09:09.880] And yeah, everything is fine and dandy. [09:09.880 --> 09:13.880] It's quite natural. [09:13.880 --> 09:16.880] So we just reinvented actors. [09:16.880 --> 09:19.880] That's basically what actors are. [09:19.880 --> 09:31.880] So actors are a programming paradigm that was invented in the 70s by a researcher called Carl Hewitt. [09:31.880 --> 09:35.880] It has been popularized more recently with Erlang. [09:35.880 --> 09:39.880] And the actual formal definition is here. [09:39.880 --> 09:42.880] It's from Carl Hewitt himself. [09:42.880 --> 09:47.880] And I'm going to read it, even if it's a little bit weird to read slides out loud. [09:47.880 --> 09:49.880] This one is important. [09:49.880 --> 09:55.880] So an actor is a computational entity that, in response to a message it receives, can concurrently: [09:55.880 --> 09:59.880] one, send a finite number of messages to other actors. [09:59.880 --> 10:01.880] We've done that. [10:01.880 --> 10:06.880] Two, create a finite number of new actors. [10:06.880 --> 10:09.880] We haven't been spawning any actors in our example yet, [10:09.880 --> 10:15.880] but we do that in Quickwit; it's something that we do especially for supervision or spawning a pipeline, stuff like that. [10:15.880 --> 10:18.880] So we do that. [10:18.880 --> 10:23.880] Three, designate the behavior to be used for the next message it receives. [10:23.880 --> 10:26.880] That one is a little bit fuzzy. [10:26.880 --> 10:27.880] Do we do this? [10:27.880 --> 10:29.880] No, we definitely don't. [10:29.880 --> 10:33.880] But if you water it down and you squint a little, [10:33.880 --> 10:39.880] actors actually have a state, [10:39.880 --> 10:45.880] and the whole point of having these actors running on a specific thread is that [10:45.880 --> 10:50.880] it is possible to handle a message and mutate that state. [10:50.880 --> 11:00.880] And mutating our state is a bit like designating the behavior that will be used for the next message. [11:00.880 --> 11:06.880] So we ended up building our own actor framework. [11:06.880 --> 11:12.880] To be honest, I'm not trying to advertise this framework. [11:12.880 --> 11:14.880] It's under the AGPL license, so you can use it. [11:14.880 --> 11:15.880] You're free to use it. [11:15.880 --> 11:19.880] If you want to take it over, fork it and make it better, [11:19.880 --> 11:21.880] I'd be happy for us to use it. [11:21.880 --> 11:25.880] If you want to use it as is and you would like to have it under an MIT license, [11:25.880 --> 11:31.880] I'm perfectly happy to put it under the MIT license as well, actually. [11:31.880 --> 11:35.880] But right now, it's not really designed to be reused by other people. [11:35.880 --> 11:39.880] But I think I might be able to tell you about our journey, [11:39.880 --> 11:45.880] and maybe it could inspire other people to write their own framework. [11:45.880 --> 11:49.880] There are actually a lot of actor frameworks in Rust. [11:49.880 --> 11:56.880] The best known one is Actix; there are many others. [11:56.880 --> 12:00.880] So in our actor framework, what does an actor look like [12:00.880 --> 12:03.880] compared to our original snippet? [12:03.880 --> 12:05.880] Yeah, it looks like this. [12:05.880 --> 12:11.880] So I implemented the uploader there.
[12:11.880 --> 12:14.880] So you first have to implement a trait called Actor, [12:14.880 --> 12:18.880] where you will define a bunch of small properties about your actor, [12:18.880 --> 12:20.880] especially the capacity that we have seen before. [12:20.880 --> 12:24.880] So it will be like the capacity of the channel that we described before. [12:24.880 --> 12:30.880] And then you will have to implement a Handler trait [12:30.880 --> 12:33.880] for each type of message that you deal with. [12:33.880 --> 12:38.880] So, contrary to our example, now we can deal with several types of messages. [12:38.880 --> 12:42.880] The same actor can receive different kinds of requests, if you will. [12:42.880 --> 12:48.880] Another difference is that most of the time you want to communicate with [12:48.880 --> 12:50.880] an actor in an asynchronous way. [12:50.880 --> 12:54.880] That's how the actor pattern usually works. [12:54.880 --> 12:58.880] But sometimes it's handy to actually have some kind of reply [12:58.880 --> 13:00.880] when you do a request. [13:00.880 --> 13:03.880] It's a bummer, because if you really want a reply, [13:03.880 --> 13:07.880] that means that you will be waiting for the actor to process [13:07.880 --> 13:12.880] the entire queue and execute your command and then return the result. [13:12.880 --> 13:15.880] But you don't have to use it. [13:15.880 --> 13:23.880] Most of our messages don't use it, but when we need it, it's handy. [13:23.880 --> 13:26.880] And our indexer, it's about the same. [13:26.880 --> 13:32.880] The one thing that I want to point out is this thing on the Actor trait [13:32.880 --> 13:39.880] where we specialize what should be returned by the runtime handle method. [13:39.880 --> 13:43.880] So, you remember that in our example, we said that an indexer [13:43.880 --> 13:44.880] spends a lot of CPU. [13:44.880 --> 13:47.880] We wanted to run it on a dedicated thread. [13:47.880 --> 13:49.880] We don't do that here. [13:49.880 --> 13:54.880] What we do is we target a specific Tokio runtime, which is weird. [13:54.880 --> 13:57.880] So, that's an implementation shortcut that we used: [13:57.880 --> 14:02.880] instead of running stuff on a thread pool, [14:02.880 --> 14:05.880] what we do is that we have several Tokio runtimes, [14:05.880 --> 14:13.880] and we have a Tokio runtime that is dedicated to acting as a thread pool. [14:13.880 --> 14:17.880] The benefit of this implementation shortcut is that it [14:17.880 --> 14:20.880] gives us the possibility to write exactly the same code [14:20.880 --> 14:24.880] for an async actor or a sync actor, which is neat. [14:24.880 --> 14:27.880] And by the way, we're not the only ones to use that trick. [14:27.880 --> 14:36.880] InfluxDB actually wrote a blog post about this a while ago. [14:36.880 --> 14:39.880] Now that we have a framework, you might have noticed [14:39.880 --> 14:41.880] that the code was not even shorter. [14:41.880 --> 14:45.880] We have seen a couple of features that we got from the framework, [14:45.880 --> 14:48.880] but what more can we get? [14:48.880 --> 14:51.880] Within Quickwit, we have 25 different actors. [14:51.880 --> 14:55.880] What's cool when you have a framework is that you code stuff once [14:55.880 --> 14:58.880] and the benefit is multiplied by 25. [14:58.880 --> 15:00.880] What could be the benefit? [15:00.880 --> 15:04.880] So, hopefully, we get better structure and code [15:04.880 --> 15:06.880] that is a little bit more readable.
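For readers following along without the slides, here is a rough sketch of the shape of these two traits. It is a simplified rendition, not the actual quickwit-actors API: the real signatures take a context argument and return results, among other differences, and the sketch assumes the async-trait crate.

```rust
// A simplified rendition of the two traits described above.
trait Actor: Sized + Send + 'static {
    /// A snapshot of the actor's internal state, used for
    /// observability and unit tests (discussed later in the talk).
    type ObservableState: Send + Clone;
    fn observable_state(&self) -> Self::ObservableState;

    /// Capacity of the actor's queue: the channel capacity
    /// from the hand-rolled version above.
    fn queue_capacity(&self) -> usize {
        3
    }

    /// Which Tokio runtime this actor runs on. A CPU-heavy actor
    /// like the indexer overrides this to return the handle of the
    /// runtime dedicated to acting as a thread pool.
    fn runtime_handle(&self) -> tokio::runtime::Handle {
        tokio::runtime::Handle::current()
    }
}

/// One Handler implementation per message type the actor accepts.
#[async_trait::async_trait]
trait Handler<M: Send + 'static>: Actor {
    /// The reply sent back when the caller chooses to wait for one;
    /// most messages use `()` and are effectively fire-and-forget.
    type Reply: Send;
    async fn handle(&mut self, message: M) -> Self::Reply;
}
```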
[15:06.880 --> 15:09.880] People open up files and they know what to expect. [15:09.880 --> 15:12.880] I want to know what is the capacity associated with the queue of this actor. [15:12.880 --> 15:14.880] Where should I look? [15:14.880 --> 15:17.880] That's something that you get from having a framework. [15:17.880 --> 15:21.880] So, we will talk a little bit about that, [15:21.880 --> 15:24.880] but we get supervision from the framework. [15:24.880 --> 15:28.880] We get a neat solution to deal with time, [15:28.880 --> 15:33.880] which is probably the main reason why we don't use Actix today. [15:33.880 --> 15:37.880] And then we have a bunch of recipes to write unit tests, [15:37.880 --> 15:39.880] which is very important. [15:39.880 --> 15:46.880] We also have some stuff to be able to see what our actors are doing, [15:46.880 --> 15:50.880] and we have a solution for discoverability also, [15:50.880 --> 15:54.880] but we won't talk about this in this talk. [15:54.880 --> 15:57.880] So, supervision. [15:57.880 --> 16:00.880] Of course, I would like to tell you our code is perfect, [16:00.880 --> 16:05.880] and perfect code, especially when it's written in Rust, never fails. [16:05.880 --> 16:11.880] And in a sense, we don't experience panics or stuff like that, [16:11.880 --> 16:17.880] but we have to run, besides our code, third-party code [16:17.880 --> 16:23.880] and user-defined code: users can write a VRL script [16:23.880 --> 16:27.880] to transform their documents in the pipeline, [16:27.880 --> 16:30.880] and that's running in our indexing pipeline. [16:30.880 --> 16:32.880] We have to do IO. [16:32.880 --> 16:35.880] We have to interact with different systems. [16:35.880 --> 16:38.880] For instance, we get our documents from a source. [16:38.880 --> 16:42.880] We're running the pipeline. We send them to a storage. [16:42.880 --> 16:44.880] We have different storages implemented. [16:44.880 --> 16:48.880] That's a lot of components, and any one of them can fail, [16:48.880 --> 16:50.880] and we want to be up a very large amount of the time. [16:50.880 --> 16:54.880] So, one solution to this, [16:54.880 --> 16:56.880] and it's not like it works all of the time, [16:56.880 --> 17:00.880] is to just try to turn it off and on again. [17:00.880 --> 17:03.880] It feels a little bit stupid, [17:03.880 --> 17:08.880] but you just restart everything from a blank state, [17:08.880 --> 17:15.880] or blank slate, and sometimes things work fine that way. [17:15.880 --> 17:18.880] So, the supervision works as follows. [17:18.880 --> 17:21.880] We have an actor that is in charge of supervising [17:21.880 --> 17:24.880] all of the actors that are in our pipeline. [17:24.880 --> 17:26.880] What it's doing is that it's polling actors, [17:26.880 --> 17:28.880] and when it detects that they failed, [17:28.880 --> 17:32.880] it will kill everyone and restart everyone. [17:32.880 --> 17:36.880] The definition of failure is a little bit sophisticated [17:36.880 --> 17:40.880] in our case, so it could be an actor that has returned [17:40.880 --> 17:43.880] an error from a handler, or it panicked, [17:43.880 --> 17:48.880] or we have some system to detect if an actor [17:48.880 --> 17:51.880] has not been progressing for three seconds. [17:51.880 --> 17:54.880] So, we have a notion of progression in our framework. [17:54.880 --> 17:57.880] That's an original way to do stuff.
[17:57.880 --> 18:00.880] And we use one-for-all supervision, [18:00.880 --> 18:04.880] which means that if one actor failed, [18:04.880 --> 18:10.880] we restart everyone. [18:10.880 --> 18:12.880] Okay, that was for supervision. [18:12.880 --> 18:14.880] Now, about handling time, [18:14.880 --> 18:18.880] which is probably the most interesting part of our framework. [18:18.880 --> 18:24.880] So, we need to be able to deal with the idea [18:24.880 --> 18:27.880] that, for instance, in our indexer, [18:27.880 --> 18:30.880] we want to emit a file after 30 seconds have passed. [18:30.880 --> 18:32.880] So, we have a condition like this: [18:32.880 --> 18:38.880] 30 seconds after the first document was added to that batch. [18:38.880 --> 18:41.880] And we cannot do time.sleep in the handler, [18:41.880 --> 18:44.880] because it would block the entire processing of documents. [18:44.880 --> 18:46.880] So, the solution for this is rather simple. [18:46.880 --> 18:51.880] We have a method so that actors can ask the framework [18:51.880 --> 18:53.880] to send a message back to themselves [18:53.880 --> 18:57.880] after a given amount of time, 30 seconds here. [18:57.880 --> 19:01.880] And it seems like a very simple solution, [19:01.880 --> 19:02.880] but it has a problem. [19:02.880 --> 19:05.880] So, here I showed how it works. [19:05.880 --> 19:07.880] The actor is sending a message to the scheduler. [19:07.880 --> 19:09.880] This is what is happening under the hood. [19:09.880 --> 19:11.880] It's sending a message to an actor [19:11.880 --> 19:15.880] that is run by the framework, called the scheduler actor. [19:15.880 --> 19:19.880] And 30 seconds later, the scheduler will push a message [19:19.880 --> 19:22.880] into the queue of the actor. [19:22.880 --> 19:25.880] The trouble there is: imagine that you already had a lot [19:25.880 --> 19:27.880] of messages in the queue of the actor. [19:27.880 --> 19:29.880] Then, where are your 30 seconds? [19:29.880 --> 19:33.880] Maybe we have one minute's worth of messages in that queue. [19:33.880 --> 19:37.880] And then, our entire contract of [19:37.880 --> 19:41.880] "I want to emit a file every 30 seconds" is broken. [19:41.880 --> 19:43.880] We cannot do that. [19:43.880 --> 19:47.880] So, the solution we went for is that mailboxes [19:47.880 --> 19:49.880] are actually a tiny bit more complicated. [19:49.880 --> 19:50.880] They have two queues. [19:50.880 --> 19:52.880] One is a low-priority queue, the usual one, [19:52.880 --> 19:54.880] and we have a high-priority queue. [19:54.880 --> 19:57.880] And the scheduler will put that message in the high-priority queue. [19:57.880 --> 20:01.880] So, as soon as the actor has finished dealing with the message [20:01.880 --> 20:03.880] that it was processing at the time, [20:03.880 --> 20:06.880] it will jump on this scheduled message. [20:06.880 --> 20:14.880] And we will get our nice 30-second callback. [20:14.880 --> 20:19.880] Testability. Let me check the time to know [20:19.880 --> 20:23.880] if I'm good or not. 20 minutes, perfect. [20:23.880 --> 20:29.880] Testability: we have a bunch of solutions to write tests. [20:29.880 --> 20:32.880] Let's go through code to see, like, [20:32.880 --> 20:35.880] actual Rust code, to see how we can implement [20:35.880 --> 20:40.880] complicated real-life stuff and unit test it. [20:40.880 --> 20:45.880] So, the code that we will look at is a batch builder [20:45.880 --> 20:49.880] that is mimicking pretty much what we do in indexing.
[20:49.880 --> 20:52.880] So, we have two possible conditions. [20:52.880 --> 20:57.880] We emit a file either because we have enough data, [20:57.880 --> 21:01.880] enough to cut a file. [21:01.880 --> 21:03.880] So, let's say if you have 100 messages, [21:03.880 --> 21:05.880] it's not 100 in reality, [21:05.880 --> 21:08.880] but if you have 100 messages, then you emit a file. [21:08.880 --> 21:12.880] Or, if 30 seconds have elapsed since the reception [21:12.880 --> 21:19.880] of the first document of the batch. [21:19.880 --> 21:21.880] So, let's start slow and easy. [21:21.880 --> 21:27.880] So, we have our actor here. [21:27.880 --> 21:30.880] So, this is the state of the actor, [21:30.880 --> 21:33.880] but this is the actor as well. [21:33.880 --> 21:38.880] So, something that is obvious is it will have to have some [21:38.880 --> 21:41.880] mailbox to push the batches that it produces to. [21:41.880 --> 21:45.880] It will have a document batch, which will be a Vec of Strings. [21:45.880 --> 21:47.880] So, a document will be just a String. [21:47.880 --> 21:49.880] It will append documents to that. [21:49.880 --> 21:51.880] When it's big enough, it will flush it [21:51.880 --> 21:54.880] and send it to the mailbox of the consumer. [21:54.880 --> 21:59.880] And one thing that is new here is we added some counters, [21:59.880 --> 22:04.880] and we will be able, in the unit test, to do some asserts [22:04.880 --> 22:10.880] on this internal state. [22:10.880 --> 22:13.880] I didn't talk about it, [22:13.880 --> 22:17.880] but the Actor trait actually has an associated type [22:17.880 --> 22:19.880] which is called ObservableState. [22:19.880 --> 22:23.880] Of course, the whole idea of actors is to encapsulate your [22:23.880 --> 22:26.880] state into your thread or your Tokio task, [22:26.880 --> 22:28.880] and you're not supposed to be able to mutate it, [22:28.880 --> 22:31.880] or even read it, from the external world. [22:31.880 --> 22:37.880] But we have something that makes it possible to ask, [22:37.880 --> 22:42.880] from outside the actor, "what is your observable state?", [22:42.880 --> 22:45.880] and it will send a message to the actor, [22:45.880 --> 22:48.880] and the actor will send back the result [22:48.880 --> 22:50.880] of the observable state method here, [22:50.880 --> 22:55.880] which is nifty for observability and for unit tests. [22:55.880 --> 22:58.880] And then there is our handler. [22:58.880 --> 23:00.880] So, we will have two messages. [23:00.880 --> 23:03.880] One message will be receiving a document here. [23:03.880 --> 23:05.880] It was just a String, as I told you. [23:05.880 --> 23:07.880] I wanted to keep stuff simple. [23:07.880 --> 23:11.880] And we will do several things. [23:11.880 --> 23:15.880] The first thing that we do is, if this was the first document [23:15.880 --> 23:19.880] in the batch, we need to register our callback message [23:19.880 --> 23:23.880] using the schedule-self-message method. [23:23.880 --> 23:26.880] We will append this document to our batch, [23:26.880 --> 23:29.880] and then we check for the conditions: [23:29.880 --> 23:31.880] do we have enough documents in the batch [23:31.880 --> 23:39.880] to actually emit one, per our size-based batch emission [23:39.880 --> 23:41.880] condition? [23:41.880 --> 23:43.880] And in that case, we call emit batch. [23:43.880 --> 23:47.880] I didn't put the code of emit batch because it was too easy. [23:47.880 --> 23:49.880] Not very interesting.
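Here is a rough sketch of what such a batch builder can look like. This is illustrative code in the spirit of the slides, not the code from them: the Mailbox alias is a stand-in, and schedule_self is a stand-in for the framework's schedule-self-message facility.

```rust
use std::time::Duration;
use tokio::sync::mpsc;

// Stand-in for the framework's mailbox type, for this sketch only.
type Mailbox<T> = mpsc::Sender<T>;

struct Batch(Vec<String>);
struct Timeout; // the message the scheduler sends back to us

struct BatchBuilder {
    consumer: Mailbox<Batch>, // where finished batches are pushed
    batch: Vec<String>,       // documents accumulated so far
    emitted_batches: usize,   // counter exposed as observable state
}

impl BatchBuilder {
    const MAX_DOCS: usize = 100;

    // Handler for a document message.
    async fn handle_doc(
        &mut self,
        doc: String,
        // Stand-in for the framework's schedule-self-message method.
        schedule_self: impl FnOnce(Duration, Timeout),
    ) {
        if self.batch.is_empty() {
            // First document of the batch: ask the scheduler to send
            // us a Timeout message in 30 seconds. It arrives in the
            // high-priority queue, so a backlog cannot delay it.
            schedule_self(Duration::from_secs(30), Timeout);
        }
        self.batch.push(doc);
        if self.batch.len() >= Self::MAX_DOCS {
            // Size-based emission condition.
            self.emit_batch().await;
        }
    }

    // Handler for the scheduled Timeout message.
    async fn handle_timeout(&mut self, _msg: Timeout) {
        // Time-based emission condition. Real code would also need
        // to ignore stale timeouts from batches already emitted.
        if !self.batch.is_empty() {
            self.emit_batch().await;
        }
    }

    async fn emit_batch(&mut self) {
        let docs = std::mem::take(&mut self.batch);
        // Sending on a bounded mailbox is where back pressure applies.
        let _ = self.consumer.send(Batch(docs)).await;
        self.emitted_batches += 1;
    }
}
```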
[23:49.880 --> 24:00.880] And then I didn't put the handler [24:00.880 --> 24:03.880] of the timeout message, but you can guess it: basically, [24:03.880 --> 24:07.880] it's emitting the batch. [24:07.880 --> 24:10.880] And then, when we want to unit test that stuff, [24:10.880 --> 24:13.880] we write things like this. [24:13.880 --> 24:17.880] So, we have a universe in our unit test. [24:17.880 --> 24:19.880] It's a very important thing. [24:19.880 --> 24:23.880] We want to isolate our unit tests from each other, [24:23.880 --> 24:25.880] and the universe is in charge of this. [24:25.880 --> 24:28.880] So, all of the actors of your program [24:28.880 --> 24:30.880] have to belong to the same universe. [24:30.880 --> 24:33.880] Otherwise, they're not supposed to communicate together. [24:33.880 --> 24:37.880] And we will see that this isolation will make it possible [24:37.880 --> 24:40.880] to do something really cool in the next slide. [24:40.880 --> 24:44.880] And so, this universe makes it possible to create a fake mailbox [24:44.880 --> 24:49.880] that acts as the consumer side of things. [24:49.880 --> 24:52.880] We can create our batch builder on its own [24:52.880 --> 24:54.880] and send messages to it. [24:54.880 --> 24:56.880] That's what we do there. [24:56.880 --> 24:59.880] So, yeah, I usually like to jump and point at the screen, [24:59.880 --> 25:04.880] but I've been told that I cannot cross the white line. [25:04.880 --> 25:11.880] So, yeah, and then what we do when we want to write an assert [25:11.880 --> 25:14.880] is that we call this function called process pending and observe, [25:14.880 --> 25:19.880] which just means that we wait for all of the messages [25:19.880 --> 25:22.880] that are currently in the queue of the actor [25:22.880 --> 25:25.880] to all be processed, and then we call observe, [25:25.880 --> 25:30.880] and we get a snapshot of the observable state of the actor. [25:30.880 --> 25:33.880] And here, the observable state was a counter, [25:33.880 --> 25:37.880] so we check that it's equal to the number of documents that we wanted. [25:37.880 --> 25:43.880] And then we check that the consumer mailbox does contain [25:43.880 --> 25:50.880] our two batches, because 250 documents is two batches of 100, plus 50 left over. [25:50.880 --> 25:53.880] And we also want to be able to check the timeout, [25:53.880 --> 25:56.880] that the timeout behavior is working well. [25:56.880 --> 26:00.880] So, here, what is interesting is we created our universe, [26:00.880 --> 26:04.880] but with the method with accelerated time, [26:04.880 --> 26:07.880] and we will be mocking time at this point. [26:07.880 --> 26:10.880] So, you won't have to wait 30 seconds for your unit test [26:10.880 --> 26:12.880] to run for 30 seconds. [26:12.880 --> 26:17.880] It will do magic, and the result will be exactly the same [26:17.880 --> 26:20.880] as if you were not accelerating time, [26:20.880 --> 26:23.880] but it will just be faster. [26:23.880 --> 26:26.880] And I will explain a little bit how it works. [26:26.880 --> 26:30.880] And so, for the unit test, obviously, [26:30.880 --> 26:34.880] we have to call some way to wait, [26:34.880 --> 26:37.880] and we call universe.sleep to do that. [26:37.880 --> 26:42.880] And it's important to use universe.sleep and not time.sleep, [26:42.880 --> 26:46.880] because we are mocking stuff, obviously. [26:46.880 --> 26:49.880] We cannot use the mocking facilities that we have in Tokio, [26:49.880 --> 26:51.880] because we use several runtimes.
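For illustration, a test in this style might look roughly like the sketch below. The method names (with_accelerated_time, create_test_mailbox, process_pending_and_observe, and so on) follow the talk, but the exact signatures and helpers here are assumptions, not the verbatim quickwit-actors API.

```rust
use std::time::Duration;

#[tokio::test]
async fn test_batch_builder_timeout() {
    // Each test gets its own universe, isolating it from other tests.
    let universe = Universe::with_accelerated_time();
    let (consumer_mailbox, consumer_inbox) = universe.create_test_mailbox();
    let batch_builder = BatchBuilder::new(consumer_mailbox);
    let (mailbox, handle) = universe.spawn(batch_builder);

    // Send fewer documents than the 100-document threshold...
    for doc in ["doc1", "doc2"] {
        mailbox.send_message(doc.to_string()).await.unwrap();
    }

    // ...then let (accelerated) time pass the 30-second timeout.
    // Time only jumps forward when every actor is idle, so this
    // behaves exactly like a real 31-second wait, only faster.
    universe.sleep(Duration::from_secs(31)).await;

    // Drain the actor's queue, then snapshot its observable state.
    let state = handle.process_pending_and_observe().await;
    assert_eq!(state.emitted_batches, 1);
    assert_eq!(consumer_inbox.drain().len(), 1);
}
```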
[26:51.880 --> 26:55.880] And also, what we do is similar in semantics [26:55.880 --> 26:59.880] to pause and auto-advance, if you're familiar with them, [26:59.880 --> 27:01.880] except we never freeze time. [27:01.880 --> 27:03.880] We keep time flowing, [27:03.880 --> 27:05.880] but what we do is a tiny bit different. [27:05.880 --> 27:10.880] So, you can imagine that if you were not accelerating time, [27:10.880 --> 27:12.880] your actor execution would look like this. [27:12.880 --> 27:14.880] So, actors are processing stuff, [27:14.880 --> 27:17.880] and sometimes you don't have any message in any queue, [27:17.880 --> 27:19.880] all actors are idle, [27:19.880 --> 27:23.880] and the only thing that will resume the processing [27:23.880 --> 27:26.880] is some timeout happening and the scheduler saying, [27:26.880 --> 27:28.880] okay, I have a message for you, [27:28.880 --> 27:32.880] you asked for a self-scheduled message. [27:32.880 --> 27:34.880] It's happening now. [27:34.880 --> 27:36.880] So, our framework detects that we are in a phase [27:36.880 --> 27:39.880] where no one is working and everyone is waiting for the scheduler, [27:39.880 --> 27:42.880] and in that case, and only in that case, we accelerate time. [27:42.880 --> 27:46.880] And that's why we get a result that is exactly the same [27:46.880 --> 27:50.880] as if we didn't accelerate time, but just faster. [27:50.880 --> 27:55.880] So, we compress our execution: before, after. [27:55.880 --> 27:57.880] That's how it works. [28:02.880 --> 28:06.880] I wanted to show you the actual indexing pipeline [28:06.880 --> 28:08.880] in its full complexity. [28:08.880 --> 28:10.880] I said 25 actors; it's not 25 actors here, [28:10.880 --> 28:13.880] but we have other actors in other parts of the code, [28:13.880 --> 28:15.880] because the pattern got quite popular. [28:15.880 --> 28:20.880] It's quite complex, but it makes me feel extremely good [28:20.880 --> 28:24.880] to be able to show this [28:24.880 --> 28:27.880] when we have to explain to a new developer [28:27.880 --> 28:32.880] what the indexing is doing. [28:32.880 --> 28:34.880] We can point at things. [28:34.880 --> 28:38.880] Every single one of these boxes is doing one very simple thing. [28:38.880 --> 28:41.880] It has its own file, it has its own unit tests. [28:41.880 --> 28:46.880] It makes me happy to have this very simple figure [28:46.880 --> 28:49.880] that we can discuss around. [28:49.880 --> 28:53.880] One thing that I need to talk to you about is one problem [28:53.880 --> 29:04.880] with actors: if you have cycles in your actor network, [29:04.880 --> 29:06.880] you might experience deadlocks. [29:06.880 --> 29:08.880] And it's a pretty terrible kind of deadlock, [29:08.880 --> 29:11.880] because it can happen at any time in production. [29:11.880 --> 29:13.880] Like, it can work for one week, [29:13.880 --> 29:19.880] and then you experience a deadlock, and it's a scary thing. [29:19.880 --> 29:23.880] So there's a sufficient condition to avoid deadlocks: [29:23.880 --> 29:25.880] you don't have any cycles, right? [29:25.880 --> 29:29.880] And usually that's the case when you are writing a pipeline. [29:29.880 --> 29:32.880] In the graph before, there was actually a cycle. [29:32.880 --> 29:35.880] We will have a look at it in a second. [29:35.880 --> 29:39.880] There is another, nicer sufficient condition to avoid deadlocks.
[29:39.880 --> 29:46.880] It's: take the graph of your actors and remove all of the edges [29:46.880 --> 29:48.880] whose queues have an infinite capacity; [29:48.880 --> 29:51.880] if that one is a DAG, then you won't have deadlocks. [29:51.880 --> 29:54.880] And that's what we are doing here. [29:54.880 --> 29:59.880] So the loop, the cycle that we had, was due to the fact [29:59.880 --> 30:03.880] that we have, like, an auxiliary pipeline [30:03.880 --> 30:05.880] that is merging the files together, [30:05.880 --> 30:08.880] and there is an arrow over there. [30:08.880 --> 30:11.880] Sorry, I'm going to cross the line. [30:11.880 --> 30:15.880] I did it, I apologize. [30:15.880 --> 30:18.880] If you remove this arrow, then it's a DAG, [30:18.880 --> 30:22.880] and that's a sufficient condition to never experience deadlocks. [30:22.880 --> 30:25.880] It helps me sleep at night. [30:25.880 --> 30:28.880] And yeah, we have a bunch of other features. [30:28.880 --> 30:31.880] The most important one I think I want to tell you about [30:31.880 --> 30:34.880] is that we measure back pressure. [30:34.880 --> 30:37.880] So the framework is automatically measuring back pressure [30:37.880 --> 30:41.880] and exposes it as a Prometheus counter. [30:41.880 --> 30:43.880] That's really neat. [30:43.880 --> 30:45.880] Very useful for us. [30:45.880 --> 30:50.880] And let's use the rest of the time for questions. [30:50.880 --> 30:52.880] So... [30:52.880 --> 31:01.880] APPLAUSE [31:01.880 --> 31:04.880] So are there any questions in the room? [31:04.880 --> 31:06.880] Yes. [31:06.880 --> 31:08.880] On the last slide, the previous slide, it said [31:08.880 --> 31:10.880] you didn't need parallel actors. [31:10.880 --> 31:13.880] Did you never need, like, fan-in and fan-out, [31:13.880 --> 31:16.880] for having any parallel work? [31:16.880 --> 31:19.880] Oh, yes. [31:19.880 --> 31:22.880] So there is something that I didn't read, [31:22.880 --> 31:25.880] but Sylvain was very fast [31:25.880 --> 31:28.880] and noticed that we don't have anything [31:28.880 --> 31:32.880] to be able to have several actors work on the same queue, [31:32.880 --> 31:37.880] or work concurrently to process stuff faster. [31:37.880 --> 31:42.880] So yeah, we haven't needed that [31:42.880 --> 31:44.880] strongly enough to actually implement it. [31:44.880 --> 31:47.880] I wrote an implementation and never merged it, [31:47.880 --> 31:49.880] because we didn't really need it. [31:49.880 --> 31:54.880] So for indexing, we just spawn several pipelines [31:54.880 --> 31:56.880] on the same machine, not too many. [31:56.880 --> 31:59.880] So that part is not parallelized. [31:59.880 --> 32:05.880] But yeah, we just never need... [32:05.880 --> 32:07.880] We haven't needed it yet. [32:07.880 --> 32:09.880] I can't really tell why. [32:09.880 --> 32:14.880] Yes, exactly. [32:14.880 --> 32:19.880] So the parallel behavior. [32:19.880 --> 32:22.880] Sorry, you want me to repeat. [32:22.880 --> 32:24.880] So Sylvain was saying, [32:24.880 --> 32:27.880] we use more than one core [32:27.880 --> 32:29.880] just because within the pipeline [32:29.880 --> 32:30.880] we do the streamlining thing. [32:30.880 --> 32:32.880] So we may have different actors [32:32.880 --> 32:35.880] that are consuming the CPU at the same time, [32:35.880 --> 32:37.880] but we don't have one actor going, [32:37.880 --> 32:39.880] oh, there are actually five instances of the actor, [32:39.880 --> 32:43.880] and we are doing the work five times faster.
[32:43.880 --> 32:47.880] So we didn't need that. [32:47.880 --> 32:49.880] Any more questions? [32:49.880 --> 32:50.880] Yes? [32:50.880 --> 32:52.880] Do you have a fairness system, [32:52.880 --> 32:54.880] so that one actor doesn't keep [32:54.880 --> 32:56.880] on processing and starve the [32:56.880 --> 33:01.880] other actors in the process? [33:01.880 --> 33:03.880] So no, we don't have that. [33:03.880 --> 33:05.880] So one thing that we have, [33:05.880 --> 33:07.880] actually we don't want... [33:07.880 --> 33:09.880] We have the opposite problem. [33:09.880 --> 33:11.880] We don't want fairness. [33:11.880 --> 33:16.880] So if you look at our pipeline, [33:16.880 --> 33:18.880] the stuff that is taking a lot of CPU [33:18.880 --> 33:21.880] would be the indexer. [33:21.880 --> 33:23.880] It will also take a lot of IO, [33:23.880 --> 33:26.880] and we want to give it priority, [33:26.880 --> 33:30.880] because it's the thing that we want to use [33:30.880 --> 33:32.880] as much CPU as it can. [33:32.880 --> 33:34.880] So we want to give it one core, [33:34.880 --> 33:38.880] and we want it to use as much IO as it needs. [33:38.880 --> 33:41.880] And we would like to give it priority. [33:41.880 --> 33:43.880] So the way we do that is that we run it [33:43.880 --> 33:45.880] on a specific runtime, [33:45.880 --> 33:48.880] and over there it has its dedicated core. [33:48.880 --> 33:51.880] For IO, the framework is actually not really helping. [33:51.880 --> 33:54.880] So what we do is that we have some IO throttling [33:54.880 --> 33:57.880] that makes it so that the other actors [33:57.880 --> 34:02.880] are not able to write to disk too fast, [34:02.880 --> 34:04.880] and you can configure that, [34:04.880 --> 34:07.880] and there's some back-of-the-envelope computation [34:07.880 --> 34:09.880] to compute what you should set. [34:09.880 --> 34:12.880] But yeah, other actors will not be able to write to disk [34:12.880 --> 34:16.880] faster than, let's say, 80 megabytes per second. [34:16.880 --> 34:20.880] And the merging that you have below, [34:20.880 --> 34:22.880] it's okay if it lags a little bit. [34:22.880 --> 34:26.880] That's the part that we want to be low priority, [34:26.880 --> 34:29.880] and the part on the top we want to be high priority. [34:29.880 --> 34:31.880] So we don't have any fairness, [34:31.880 --> 34:36.880] and we don't want any fairness. [34:36.880 --> 34:37.880] Yes? [34:37.880 --> 34:39.880] So I guess you're supervising that, [34:39.880 --> 34:44.880] because otherwise the timeouts may also be delayed. [34:44.880 --> 34:47.880] So the supervisor is running on... [34:47.880 --> 34:49.880] it's very fast, it doesn't do much. [34:49.880 --> 34:51.880] So it's running on a Tokio runtime [34:51.880 --> 34:53.880] that has a dedicated core [34:53.880 --> 34:56.880] and runs one bazillion actors, [34:56.880 --> 34:58.880] but they're all very light. [34:58.880 --> 35:00.880] So it doesn't matter at all. [35:00.880 --> 35:02.880] Okay, yeah. [35:02.880 --> 35:05.880] Yeah. [35:05.880 --> 35:08.880] Because, I mean, if your actors are very heavy, [35:08.880 --> 35:11.880] of course, the supervisor will catch it. [35:11.880 --> 35:12.880] Yeah. [35:12.880 --> 35:14.880] At some point, because otherwise your timeouts [35:14.880 --> 35:15.880] will still be delayed. [35:15.880 --> 35:16.880] Yeah, absolutely.
[35:16.880 --> 35:17.880] You're absolutely right, [35:17.880 --> 35:21.880] but the thing is, it's running on its specific runtime, [35:21.880 --> 35:23.880] and it's not CPU-heavy, [35:23.880 --> 35:28.880] so there are plenty of cores to work with. [35:28.880 --> 35:29.880] Yes? [35:29.880 --> 35:31.880] When you accelerate time in the testing universe, [35:31.880 --> 35:36.880] do you have to specify the steps in time you take? [35:36.880 --> 35:38.880] You mean the number of steps? [35:38.880 --> 35:40.880] I assume that when you accelerate time, [35:40.880 --> 35:42.880] basically when nothing is happening, [35:42.880 --> 35:44.880] you take a time step and then see if something would have happened [35:44.880 --> 35:46.880] at that time point. [35:46.880 --> 35:48.880] Does that mean that you have to specify, [35:48.880 --> 35:52.880] we take steps of 100 milliseconds and then test every time [35:52.880 --> 35:54.880] if something would be happening now? [35:54.880 --> 35:57.880] No, it's not like that. [35:57.880 --> 36:00.880] We don't need to say how many steps we take. [36:00.880 --> 36:04.880] We don't need to say what is the resolution of the steps [36:04.880 --> 36:05.880] that we take. [36:05.880 --> 36:07.880] The only thing that we do is that the scheduler, [36:07.880 --> 36:10.880] when it detects that it needs to accelerate time, [36:10.880 --> 36:13.880] it has some kind of heap that says, [36:13.880 --> 36:17.880] OK, the next event is actually in 70 milliseconds. [36:17.880 --> 36:20.880] So let's jump 70 milliseconds into the future, [36:20.880 --> 36:22.880] and it triggers that event. [36:22.880 --> 36:26.880] And then the execution of the actor [36:26.880 --> 36:30.880] that was supposed to receive this message will go on, [36:30.880 --> 36:33.880] and if no actor is working anymore, [36:33.880 --> 36:35.880] then we accelerate time again. [36:35.880 --> 36:42.880] So there are no steps, no resolution, nothing. [36:42.880 --> 36:43.880] Yes? [36:43.880 --> 36:46.880] How about reliability? If you want to be sure [36:46.880 --> 36:48.880] that the log lines will make it to the index, [36:48.880 --> 36:52.880] do you count them, or how do you know they made it through? [36:52.880 --> 36:59.880] So, yeah, that could be the subject of another talk, [36:59.880 --> 37:02.880] because that's a super interesting question. [37:02.880 --> 37:07.880] So, for the pipeline, you want to know, [37:07.880 --> 37:09.880] to have an idea, what kind of semantics, [37:09.880 --> 37:12.880] delivery semantics, you want to have. [37:12.880 --> 37:15.880] We actually offer exactly-once semantics, [37:15.880 --> 37:18.880] and the way we deal with that is, [37:18.880 --> 37:20.880] so we didn't talk about that, [37:20.880 --> 37:24.880] we have the documents that are coming from a source actor, [37:24.880 --> 37:27.880] and the source actor, when we spawn it, [37:27.880 --> 37:32.880] we tell it, OK, you need to stream messages [37:32.880 --> 37:35.880] from these specific checkpoints. [37:35.880 --> 37:39.880] And when we publish stuff downstream, [37:39.880 --> 37:43.880] we publish it by running a transaction [37:43.880 --> 37:47.880] on our metadata store backend, [37:47.880 --> 37:50.880] and that transaction updates the checkpoints [37:50.880 --> 37:53.880] of the stuff that we have published, [37:53.880 --> 37:55.880] and it publishes the split as well.
[37:55.880 --> 37:58.880] So that when we restart everything, [37:58.880 --> 38:01.880] we can check in the metadata store [38:01.880 --> 38:03.880] what is the last checkpoint, and it starts from there. [38:03.880 --> 38:05.880] And if there is an error, [38:05.880 --> 38:09.880] the metadata store will just yell at us and return an error. [38:09.880 --> 38:11.880] It will say, OK, no, something weird has been happening, [38:11.880 --> 38:14.880] maybe we had two pipelines working at the same time, [38:14.880 --> 38:20.880] and your checkpoints are overlapping, and you have a problem. [38:20.880 --> 38:23.880] That's the way we deal with this problem. [38:23.880 --> 38:24.880] Yes? [38:24.880 --> 38:27.880] The universe, is that a special crate, [38:27.880 --> 38:29.880] or is it a standard crate? [38:29.880 --> 38:33.880] No, the universe thing is something within our framework, [38:33.880 --> 38:35.880] and that's what we use to be able to isolate [38:35.880 --> 38:38.880] typically different programs or different unit tests [38:38.880 --> 38:40.880] or different systems. [38:40.880 --> 38:44.880] Yeah, it's within our actor framework. [38:46.880 --> 38:47.880] It is. [38:50.880 --> 38:52.880] Do we still have time? [38:52.880 --> 38:56.880] I think we have one more question or one more minute. [38:56.880 --> 38:58.880] I think there was a... [38:58.880 --> 38:59.880] Yes. [38:59.880 --> 39:01.880] As I understand, on this graph, [39:01.880 --> 39:07.880] the numbers on the arrows indicate the capacity of the queue, right? [39:07.880 --> 39:10.880] The capacity of the channel between the actors, right? [39:10.880 --> 39:11.880] The numbers, yes. [39:11.880 --> 39:12.880] Yes. [39:12.880 --> 39:16.880] So we have a lot of tuning points in this system, right? [39:16.880 --> 39:17.880] Yes. [39:17.880 --> 39:19.880] In relation to the back pressure. [39:19.880 --> 39:21.880] So the question is, from your experience, [39:21.880 --> 39:24.880] how sensitive is the performance of the system [39:24.880 --> 39:28.880] to the tuning of back pressure on the channels? [39:28.880 --> 39:31.880] And maybe you have some kind of advice [39:31.880 --> 39:34.880] or a rule of thumb on what to choose [39:34.880 --> 39:37.880] for the best performance. [39:37.880 --> 39:38.880] Yes. [39:38.880 --> 39:40.880] So the question was, on this slide, [39:40.880 --> 39:43.880] all of the little numbers that we have on the arrows [39:43.880 --> 39:47.880] are the capacities of the different queues between actors; [39:47.880 --> 39:49.880] that's a lot of parameters to tune. [39:49.880 --> 39:51.880] They probably have an impact on performance. [39:51.880 --> 39:53.880] Is there a cool recipe to... [39:53.880 --> 39:55.880] So the first question was, [39:55.880 --> 39:57.880] how much do they impact performance? [39:57.880 --> 39:59.880] And the second one is, [39:59.880 --> 40:03.880] do we have a nice recipe to be able to tune them, maybe automatically? [40:03.880 --> 40:06.880] I'll go fast because there is no more time. [40:06.880 --> 40:11.880] So they don't impact performance all that much, [40:11.880 --> 40:16.880] as long as you got them a little bit correct. [40:16.880 --> 40:20.880] So you usually need to identify the spots [40:20.880 --> 40:23.880] that should be at one, [40:23.880 --> 40:25.880] and then you put them at one, [40:25.880 --> 40:27.880] and the spots where you want a little bit of capacity. [40:27.880 --> 40:31.880] It should be quite obvious if you know your system.
[40:31.880 --> 40:34.880] And I'm sure that there is a nice recipe [40:34.880 --> 40:36.880] to auto-detect that. [40:36.880 --> 40:38.880] I haven't found it. [40:38.880 --> 40:40.880] So if you have ideas, I'd love to... [40:40.880 --> 40:42.880] Usually that kind of question comes from someone [40:42.880 --> 40:44.880] who is thinking about something. [40:44.880 --> 40:47.880] So please come to me after the talk. [40:47.880 --> 40:50.880] I'd love to hear your thoughts. [40:50.880 --> 40:52.880] Thank you, everyone. Time is up. [40:52.880 --> 41:02.880] Thank you very much.