[00:00.000 --> 00:13.320] Hello everyone, let's, all right, thank you. [00:13.320 --> 00:19.240] So I'm Harshad Angal, I work for PlanetSkill, and I'm a maintainer of Wittes, and today [00:19.240 --> 00:21.960] I'm going to talk about life of a query. [00:21.960 --> 00:22.960] Why not? [00:22.960 --> 00:27.040] Because I work in query serving team in Wittes. [00:27.040 --> 00:29.840] So what is Wittes basically? [00:29.840 --> 00:35.040] Wittes is nothing but it's used to scale out your MySQL, and it also manages your MySQL [00:35.040 --> 00:36.280] for you. [00:36.280 --> 00:43.760] It's a CNCF gadget project, and it is open source with a partial license 2.0. [00:43.760 --> 00:49.200] So this is the architecture of Wittes, which is basically, there are so many components, [00:49.200 --> 00:53.640] but today we'll just focus on few of them, which is one is VTGate, where your application [00:53.640 --> 00:58.920] connects to, and then we have VT Tablet, where once the application connects to VTGate, then [00:58.920 --> 01:02.800] the queries are sent over there, and then you have the MySQL, where actually your data [01:02.800 --> 01:06.320] are getting stored. [01:06.320 --> 01:10.640] Let's talk about few terminologies, which I'll be using across my presentation, which [01:10.640 --> 01:12.560] is one is key space. [01:12.560 --> 01:17.040] Key space is a logical representation of a database, and it's basically a collection [01:17.040 --> 01:19.000] of your physical databases. [01:19.000 --> 01:27.200] Shard, it's basically a subset of your data, which resides in a key space. [01:27.200 --> 01:29.760] And there's some term called Windex. [01:29.760 --> 01:36.960] Windex is similar to your MySQL indexes, which is basically, it's maintained by the Wittes, [01:36.960 --> 01:41.440] and there's a thing called primary Windexes, which means it will decide where your row [01:41.440 --> 01:46.040] actually live in a particular shard, or in particular table. [01:46.040 --> 01:50.560] So once your query comes in, then it decides, okay, this row, while inserting, where it [01:50.560 --> 01:56.600] should actually, which shard it should actually go to. [01:56.600 --> 02:00.680] So let's take some query, right, then we'll go further. [02:00.680 --> 02:06.000] So let's say we have two tables, customer and order table, and what we want to do is [02:06.000 --> 02:14.680] we want to find a customer who has at least a spend of 1000 bucks basically in their whole [02:14.680 --> 02:15.680] order history. [02:15.680 --> 02:20.280] So we have to take two tables, we take a join, and do a grouping on them, and take a filter [02:20.280 --> 02:23.960] on top of it. [02:23.960 --> 02:24.960] So first thing first. [02:24.960 --> 02:28.920] So the client wants to send a query to you, how they will do it. [02:28.920 --> 02:34.760] So first, the client has to connect to something called VVol VT gates, and so they can use [02:34.760 --> 02:35.760] MySQL protocol. [02:35.760 --> 02:44.720] You have all the MySQL drivers, like in different languages you have, and you can use the already [02:44.720 --> 02:47.640] available MySQL driver to connect directly to VT gate. [02:47.640 --> 02:51.960] VT gate supports the MySQL protocol, so you don't have to do anything on that front. [02:51.960 --> 02:59.880] But it also supports GRPC, and it was supported earlier before we implemented the MySQL protocol, [02:59.880 --> 03:03.960] but it still stays here for its own benefits. [03:03.960 --> 03:08.920] And the reason is like, in MySQL protocol, once you connect, it's like your session is [03:08.920 --> 03:13.480] tied, which means if you open a transaction, you have to commit transaction using that. [03:13.480 --> 03:18.560] But in GRPC, what the initial benefit you get is you can connect to any VT gate, and [03:18.560 --> 03:21.480] you start a transaction over there, and then you can connect to any other VT gate, and [03:21.480 --> 03:26.560] you can commit your transaction using that VT gate. [03:26.560 --> 03:30.240] So let's talk about the different phases that we have to go through. [03:30.240 --> 03:35.320] Once the query is now received by the VT gate, it has to go through parsing, rewriting, planning, [03:35.320 --> 03:36.400] and execution. [03:36.400 --> 03:40.520] And we'll talk in details about each of the phases, but this is what the VT gate does [03:40.520 --> 03:44.080] once it receives the query on its end. [03:44.080 --> 03:48.240] So in the parsing phase, now you receive the query. [03:48.240 --> 03:53.080] Now basically, it will parse it to know whether the query is tactically correct or not. [03:53.080 --> 03:57.560] And once it is tactically correct, then it constructs the abstract syntax tree of it. [03:57.560 --> 04:01.360] So here it will have the select expression from clauses, group by, and the having for [04:01.360 --> 04:05.480] the query we mentioned before. [04:05.480 --> 04:09.160] Now once it's parsed, it goes into the rewriting phase. [04:09.160 --> 04:15.280] And it's very important to have this rewriting phase because we are trying to mimic a single [04:15.280 --> 04:19.640] data store, though your data is distributed across multiple charts. [04:19.640 --> 04:24.520] And this rewriting phase, what we try to do is basically, we first try to see, oh, is [04:24.520 --> 04:28.120] there any literals, is there anything which we can parameterize? [04:28.120 --> 04:32.880] So once we are done with the planning phase, we don't have to plan again and again similar [04:32.880 --> 04:34.320] kind of query. [04:34.320 --> 04:39.240] So what we do is we say, okay, here's IC 1000, but I don't need to plan specifically [04:39.240 --> 04:40.240] for 1000. [04:40.240 --> 04:45.000] So I just make it a parameter, and then I plan for this kind of query rather than planning [04:45.000 --> 04:48.960] with the 1000. [04:48.960 --> 04:52.840] The other thing we do is we do some replacement functions. [04:52.840 --> 04:57.480] So like if you want to get last insert ID, you cannot just send it to any MySQL and get [04:57.480 --> 04:58.720] a last insert ID. [04:58.720 --> 05:04.440] So we do a session management at the VTGate level where for each session, we have to know [05:04.440 --> 05:07.640] what was the last insert ID inserted, so you cannot send directly. [05:07.640 --> 05:10.840] So we need a rewriting phase for these kind of functions. [05:10.840 --> 05:14.080] There are multiple, but I'm just talking about one of them, which is last insert ID. [05:14.080 --> 05:16.880] So you have to do it at the VTGate level. [05:16.880 --> 05:22.080] So then you are replacing that with function with a value, and you know what that value [05:22.080 --> 05:24.920] is for that session. [05:24.920 --> 05:30.960] And the third is you might have to add another SQL node after you construct the AST. [05:30.960 --> 05:36.160] So this is like if on the session you said, I cannot select more than 100 rows. [05:36.160 --> 05:41.080] So after AST, we have in the rewriting phase, we'll add a limit clause as well. [05:41.080 --> 05:47.160] So the planning happens with the limit in place in it. [05:47.160 --> 05:53.040] So after rewriting and the AST generation, before we go into the planning, the planner [05:53.040 --> 05:57.040] needs some additional information, which is it should know what are the key spaces that [05:57.040 --> 06:02.200] exist and what are the shards that the key space map to. [06:02.200 --> 06:10.160] And in the key spaces, what are the tables that exist and what are the indexes that you [06:10.160 --> 06:12.840] use for those tables. [06:12.840 --> 06:18.360] And these information is basically cached in the VTGate, and you have an event watcher [06:18.360 --> 06:19.360] which watches. [06:19.360 --> 06:22.400] And this information comes from, actually, we have something called Topo Server, which [06:22.400 --> 06:28.800] is ZooKeeper at CDSIM, which is where this metadata kind of information is stored. [06:28.800 --> 06:33.880] And let's see what the sharding information looks like that we get from that ZooKeeper. [06:33.880 --> 06:36.360] So what we call is called vschema. [06:36.360 --> 06:41.360] And here it's saying that there's one thing called commerce, and it's sharded. [06:41.360 --> 06:43.320] This is the index that exists. [06:43.320 --> 06:47.960] And these are the two tables, customer and the order table, which use, and they use [06:47.960 --> 06:59.120] the CID column, and they use this index function to shard their table. [06:59.120 --> 07:04.720] So now the VTGate knows about everything, that now it can go into the planning phase. [07:04.720 --> 07:12.200] And the first thing that it does is does the semantic analysis, which is it does basically [07:12.200 --> 07:17.480] scoping, binding, and typing, which is like it's validating, not validated your syntactic [07:17.480 --> 07:21.320] thing before, and now it's trying to validate whether you are using the right columns in [07:21.320 --> 07:25.520] the right places or not, whether they are actually scoped correctly or not, is the visibility [07:25.520 --> 07:30.160] correct or not for those operations, the operations you're using. [07:30.160 --> 07:34.320] And then once it validates that, then it goes into the binding phase, where it binds those [07:34.320 --> 07:38.680] columns to the table that it belongs to, and then it also does the typing, which means [07:38.680 --> 07:43.840] it tries to understand that, okay, once the results come back, we'll know that what type [07:43.840 --> 07:47.160] it will be of. [07:47.160 --> 07:52.240] The second is once the semantic analysis happens, then your AST is converted into an operator [07:52.240 --> 07:59.800] tree, which is basically a logical operator, like you have joints, and then you have tables [07:59.800 --> 08:02.880] and stuff, so it convert into those logical operators. [08:02.880 --> 08:08.640] And then once those logical operator are converted, it then goes into the optimization phase. [08:08.640 --> 08:14.760] So in the optimization phase, we basically make a decision that how the plan should look [08:14.760 --> 08:21.400] like, and in the innovators, basically what we do is we get the SQL, and ultimately after [08:21.400 --> 08:25.840] the planning phase, we'll get another SQL, because the data does not reside on the VTGit, [08:25.840 --> 08:31.760] it resides on the MySQL, and we want to optimize on the way that we should get less and less [08:31.760 --> 08:35.080] information back on the VTGit to process your data. [08:35.080 --> 08:42.080] So we try to push as much as down to the query to the MySQL so that that can resolve much [08:42.080 --> 08:45.000] faster for us than we do it at the VTGit. [08:45.000 --> 08:51.440] So in the optimization phase, basically after the optimization completes, we call something [08:51.440 --> 08:57.920] like routes, which tell us that, okay, this query will go to which all shards, so that's [08:57.920 --> 09:04.400] what the ultimate step becomes, and that's where we call it as a physical operator. [09:04.400 --> 09:12.400] And then once we have the physical operators, we transform the plan into an execution plan. [09:12.400 --> 09:17.400] And the execution plan is nothing but a collection of engine primitives that we call. [09:17.400 --> 09:22.360] So let's look at how the execution plan looks like. [09:22.360 --> 09:31.160] So if both the tables which I showed in the V schema were using same sharding function, [09:31.160 --> 09:35.240] so that's why we were able to basically make it as a single route, which is telling you [09:35.240 --> 09:42.080] that, okay, just go scatter to all the shards and just gather the result at the VTGit level [09:42.080 --> 09:46.120] and send it back to the user. [09:46.120 --> 09:52.840] But if both the tables, the customer table and the order table were using some different [09:52.840 --> 09:56.520] sharding functions, then you cannot merge it like that. [09:56.520 --> 10:01.200] Then it says basically, it will look something like that, like you'll have, now, because [10:01.200 --> 10:05.480] it gets very complex enough to show everything over here, so I'm just showing that there's [10:05.480 --> 10:09.320] a, there'll be two routes that will happen, and then ultimately the join will also happen [10:09.320 --> 10:13.480] at the VTGit level, sorting and projecting and the aggregation, and then finally the [10:13.480 --> 10:18.120] filtering based on how much customers have spent those thousand bucks. [10:18.120 --> 10:23.200] So everything happens on the VTGit. [10:23.200 --> 10:27.120] Let's just look at the two routes that I showed on the left. [10:27.120 --> 10:31.560] So first one we'll do is basically doing a scatter, and it's trying to do on the order [10:31.560 --> 10:38.840] table that, okay, give me for all the customer IDs what is the spend they have done, and [10:38.840 --> 10:41.320] then on the right-hand side, we need the customer emails. [10:41.320 --> 10:47.240] So for every customer ID, I need to get their email IDs, and because the customer ID was [10:47.240 --> 10:53.680] the sharding key, the primary index for that table, we are able to send the second query [10:53.680 --> 11:04.240] as a, only to a specific shard, we don't have to do again, go to all the shards. [11:04.240 --> 11:07.400] So what all steps happened in the execution? [11:07.400 --> 11:12.640] So first thing is, once we get the route, we resolve it using those indexes, and so [11:12.640 --> 11:16.560] that it can go to the specific correct shard. [11:16.560 --> 11:20.720] Another is then, it basically then we take a decide, okay, now we know which shard we [11:20.720 --> 11:25.920] have to go to, then it will also decide, okay, this is a query that we have to send it to [11:25.920 --> 11:30.920] something that we call the VT tablets, which basically in the shard we have VT tablets [11:30.920 --> 11:37.480] and MySQL, so it's sent to the VT tablet, and then once you receive the result back, [11:37.480 --> 11:44.640] you gather the information, but sometimes you also have to do transactions, so if transaction [11:44.640 --> 11:48.920] is needed, then the VT gate will also tell VT tablets, go and also open the transaction [11:48.920 --> 11:56.400] when you execute the query, and then the session, the transactions will be managed by the VT [11:56.400 --> 12:00.120] gate that will know in which shard what is the transaction being opened, so that when [12:00.120 --> 12:03.480] you do the commit, it knows which shard it has to send the commit information as well [12:03.480 --> 12:06.680] too. [12:06.680 --> 12:12.720] And the last thing that execution also handles is basically, if you have a select query and [12:12.720 --> 12:16.600] you want to read it on the replicas, then it also does the load balancing for you, so [12:16.600 --> 12:28.640] it doesn't overload a single replica. [12:28.640 --> 12:33.920] So now the query is being sent to VT tablets, and now the VT job is to basically get the [12:33.920 --> 12:35.840] result back to the VT gate. [12:35.840 --> 12:37.800] So what are the things that goes in the VT tablet? [12:37.800 --> 12:43.240] It's exactly similar thing that goes in the VT tablets as well, but what are different [12:43.240 --> 12:44.240] reasons? [12:44.240 --> 12:48.320] So first thing is, VT gate has some view of the VT tablet of what this state is, but the [12:48.320 --> 12:52.200] first thing that events the query received by the VT tablet, it does this, okay, let [12:52.200 --> 12:56.560] me validate whether that's current, am I able to serve this query or not, if any view [12:56.560 --> 13:03.800] has changed before, after sending the query and before it reads the VT tablet, so it validates [13:03.800 --> 13:07.560] if everything is okay, okay, then let's go and do the parsing. [13:07.560 --> 13:13.120] So it will parse the query, and the reason is it has to parse the query is because VT [13:13.120 --> 13:19.840] tablet has its checks and balances over here where it tries to put some limitation, like [13:19.840 --> 13:25.560] it says that don't overload the MySQL resource, so it will basically add its own limits onto [13:25.560 --> 13:32.240] the query so that it doesn't overload your MySQL, that you can configure and do things, [13:32.240 --> 13:35.880] and then it goes into the planning phase, and in the planning field, it basically tries [13:35.880 --> 13:42.360] to see whether did you have put any query rules, which means like any bad query, if [13:42.360 --> 13:47.880] you put like, okay, somebody does select star and without a wear condition and stuff, so [13:47.880 --> 13:51.800] it basically says, okay, this query cannot be executed because it's a bad query, so [13:51.800 --> 13:57.760] those kind of query rules you can put, and another thing that we check in the planning [13:57.760 --> 14:01.680] phase is basically it will say, okay, this is the user that sent this query, but is the [14:01.680 --> 14:03.880] user allowed to even access the tables or not? [14:03.880 --> 14:09.480] So if it's not allowed, then we throw the error back. [14:09.480 --> 14:14.320] After the planning phase, also VT tablet does this, again, and all these are there to just [14:14.320 --> 14:19.160] not to load your MySQL, so another thing that it does is query consolidation, which is basically [14:19.160 --> 14:25.960] it checks whether any same query is running on the MySQL or not. [14:25.960 --> 14:30.880] If it's already running, then it just waits for that query to return the result, and all [14:30.880 --> 14:35.040] the threads which are waiting for the same query will get the result and it will return [14:35.040 --> 14:40.560] back, so only one query ultimately gets executed on the MySQL and not all the queries get executed [14:40.560 --> 14:45.400] with the same exact same query, yes. [14:45.400 --> 14:50.360] So once it thinks, okay, nothing can be done, like it has to finally send it to MySQL, then [14:50.360 --> 14:56.600] it will use one of the connection from the connection pool to send the query to MySQL, [14:56.600 --> 15:00.080] and once the results are written back, it will send it to VTGate, VTGate, if it has to [15:00.080 --> 15:08.480] do some operations, it will do it, and finally the client will receive the query. [15:08.480 --> 15:14.160] So yeah, so that's what the life of query is, but then there are some custom operations [15:14.160 --> 15:19.880] which also affect your query path, and which is like when there's a plan maintenance going, [15:19.880 --> 15:27.600] like you're promoting one replica to primary for some reason, and there is basically if [15:27.600 --> 15:32.160] you are splitting your data, like you have some N charts and you want to go to two N [15:32.160 --> 15:39.320] charts or N plus one charts or such thing like that, so while doing those operations, [15:39.320 --> 15:45.360] what VTGate does is it notifies the topo that some operation is going, VTGate understands [15:45.360 --> 15:50.520] it, so it doesn't send query to the VTGate tablet, it basically buffers a certain duration, [15:50.520 --> 15:56.440] and once everything comes back right, then the queries start going to the VTGate tablet, [15:56.440 --> 16:07.880] or it times out and VTGate does the time out from itself rather than sending it down. [16:07.880 --> 16:10.800] So yeah, thank you. [16:10.800 --> 16:11.800] Questions? [16:11.800 --> 16:31.240] So currently we don't do cross-shared transactions, we do it, but in the best effort way, but [16:31.240 --> 16:35.760] it's currently on the application to know that they are doing cross-shared transactions, [16:35.760 --> 16:40.160] but we allow it, but the application should know that they are going cross-shared. [16:40.160 --> 16:41.160] Yes? [16:41.160 --> 17:05.720] So join I already showed in how we have the two routes, and then from one we get the [17:05.720 --> 17:16.440] result and the other one, this is join. [17:16.440 --> 17:24.240] So we have hash joins available, but then it will consume your memory, and otherwise [17:24.240 --> 17:29.280] it's like from one you'll get the result and then you send it to the other one, so [17:29.280 --> 17:47.560] it's just sequential. [17:47.560 --> 17:53.480] Aggregation happens at the VTGate level, he has a sorting layer, so before aggregation [17:53.480 --> 17:58.920] we have to sort, yes, that's what it was there in the diagram as well, that after joint [17:58.920 --> 18:03.800] there was sort layer, because you have to sort, so we have an in-memory sort, so there [18:03.800 --> 18:11.080] are multiple again sort based on what we can do the best, so there's a merge sort also [18:11.080 --> 18:16.560] and then you have a complete sort as well, based on what can we do best. [18:16.560 --> 18:24.800] And then sorting, then it goes to the aggregation. [18:24.800 --> 18:39.920] So that's why we said we try to push as much as with MySQL, so we actually push the sort [18:39.920 --> 18:44.360] as well to MySQL if possible for the merge sort, but if you do it in the memory then [18:44.360 --> 18:50.240] we still sort it, so it depends whether we can push it down or not, if you are able to [18:50.240 --> 19:12.400] push it down we push as much as to the MySQL, yes, thank you, all right I think we have [19:12.400 --> 19:28.780] a question, thank you very much, thank you. [19:28.780 --> 19:30.780] You