Welcome, good morning everybody. Today I want to talk a little bit about change data capture, CDC, and stream processing with Apache Flink. This talk is split into three parts. The first part is for people that have never heard of Flink before. The second part goes maybe a little bit deeper, but I think it's not really, really deep. And then in the third part we dive really, really deep under the hood, just to make you a bit more interested in the software and what we are doing here.

I already got an introduction, but just to summarize it: on the open source side, I have been part of Flink even before it became part of the Apache Software Foundation in 2014. I'm a member of the project management committee of Apache Flink. Over the years I also made it into the top contributors; according to additions I'm the top one, though I don't know which refactoring made me the top-one contributor. I'm among the core people that design Flink SQL every day. Career-wise, I went through a couple of companies. The latest one, where I was a co-founder, was Immerok. Immerok got acquired by Confluent at the beginning of this year, and now I'm a principal software engineer at Confluent.

So let's talk about it. Before I start with an introduction to Flink, I would actually like to talk about stream processing in general, because when you do stream processing, you can basically always identify roughly four building blocks. Let's talk about those building blocks first.

First of all, you need streams, right? You have data, and you maybe want to create some pipeline from source to sink. You might want to distribute your streams because you have a lot of data, so maybe you want to scale out and scale in depending on the load. You want to join streams together, you want to enrich streams. Maybe there is a control stream and a main stream, so you want to dynamically modify the behavior of the application while it is running. And sometimes there is a bug in your application, or you just want to trace certain behavior, and then you also want to be able to replay streams.

Time, working with time, is also a very, very important concept. On one side, you want to make progress in your pipeline, but at some points you also want to synchronize. For example, if you have two streams, maybe you want to wait for the other stream, maybe you want to block or buffer one of the streams, and maybe, if the second event doesn't come in, you want to time out after some time.
Maybe you also want to replay historical data, so you want to fast-forward time. You don't want to wait another hour to fire an hour-long window; this should be quicker.

Then, when we talk about buffering, what I just said, or about storing data in general: state is a very important component for stream processing. State can be, for example, a machine learning model that is updated from time to time to classify your incoming streams. It can be a cache, if you don't want to look up the database for every record that comes in. In general, state can take many forms. And state also, at some point, needs to be expired.

If you have a stateful streaming application, a very useful capability is snapshots: making sure that I can create a snapshot of my streaming topology. I want a state backup of my streaming application, and I want to version it, so every night I want to create a snapshot version. Maybe I want to fork my streaming application into a staging cluster or a development cluster and play around with the data in the state. Maybe I want to do some testing, or I just want to time-travel through the progress of my application.

So let's talk a little bit about what makes Flink unique compared to other competitors. First of all, what I just showed: Flink is one of the best stream processors for all of these use cases and building blocks. When you design a streaming application, you can start with a whiteboard and just draw some circles: what do you actually want to do? Maybe you want to read from some sources, maybe you want to normalize your data, you want to filter some data out, you want to join the data, and in the end you want to sink it somewhere else. This is how it starts, and this is also how you reason about it when you're creating a pipeline.

What Flink does under the hood is that it has parallelism and scalability built in, so you don't have to think of threading or network transfers or anything like that. Under the hood there are shards, there are partitions, depending on the connectors, and there are subtasks that execute operations in parallel. Some of these tasks can be stateful and can have some storage local to the operator. Very important: the state basically scales out and scales in with the operator. You don't need to query an external database, which would increase latency. And then the data travels through and the whole pipeline runs.

And now comes the important part. What makes Flink really unique is this possibility of creating a consistent snapshot of your entire streaming topology.
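As a rough idea of how this is switched on in a job, here is a minimal sketch, assuming a recent Flink release; the checkpoint interval and the storage URI are placeholders, not values from the talk:

```java
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a consistent snapshot of all operator state every 60 seconds.
        env.enableCheckpointing(Duration.ofSeconds(60).toMillis());

        // Where snapshots are persisted (S3, HDFS, local FS, ...) is configured via
        // the checkpoint storage; the bucket below is just a placeholder.
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

        // Placeholder pipeline, only here so the sketch is a complete program.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpointing-sketch");
    }
}
```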
So there are what we call checkpoint barriers, which travel through the topology and make a backup of each operator's state. This snapshot is then persisted on long-term storage like S3 or HDFS or some other distributed file system.

When we talk about use cases, there are plenty of them. People process transactions, logs, IoT events, any kind of events, user interactions. People use it for fraud detection, for machine learning, for event-driven applications, for ETL, for data integration, for analytics. Over the last 10 years, Flink has become a very large platform. You can use various connectors to event streaming systems, you can read and write files, databases, key-value stores. And, as I said, there are also event-driven applications where maybe you want to send out an email and don't need a connector at all; you can also implement something custom that talks to some external API. So let's dive in.

I also want to quickly talk about Flink's APIs. This is the API stack. The two main APIs are the DataStream API and the Table API and SQL. There is also Stateful Functions, a sub-project that tries to execute an actor model on top of Apache Flink, but I will not go into detail here.

First of all, all the APIs are built on a dataflow runtime, so there is no batching or micro-batching involved under the hood. It's really a dataflow runtime: whenever a result is ready, it will be streamed to the next operator. On top of that, there is a lower-level stream operator API, which you can use, but that is for experts, I would say. And then we have the main APIs on top. The specialty about the Table API and SQL is that there is an optimizer and a planning stage in between, so it helps you when you're not creating the most efficient pipeline; the optimizer will make sure the pipeline is executed more efficiently.

Let's also quickly look at the APIs. This is a basic example of creating a stream of just three elements. You execute this on a cluster or in your IDE, then you retrieve the result back, you have an iterator locally, and you can just print it locally. It's not very useful, but this is a minimal example of the Java API. The important thing is that the DataStream API basically exposes all the building blocks that I mentioned on my previous slide. You can build fairly arbitrary operator topologies, and you can use built-in operations like map, process, and connect, each of which takes user-defined functions, and then you can really define your business logic in those.
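A minimal sketch of such a DataStream program, assuming a current Flink version; the doubling map is just an illustration, not necessarily the exact example from the slides:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamHelloWorld {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A bounded stream of three elements, transformed with a user-defined function.
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           // executeAndCollect() runs the pipeline (in the IDE or on a cluster)
           // and hands the results back as a local iterator.
           .executeAndCollect()
           .forEachRemaining(System.out::println);
    }
}
```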
These take user-defined functions, and you can use completely arbitrary Java records, for example POJOs, that flow between the operators. And conceptually, which is interesting when we talk about change data capture, the DataStream API does not know about changes. It only knows about records, so there is no change flag or anything like that attached. Conceptually, the DataStream API is append-only or insert-only, and you also see that when you look at the output: just the plain values.

So let's take a look at the Table API and SQL. Usually we just say Table API and SQL, because it's a unified API: you can decide whether you want to define your pipeline programmatically or whether you want to use standard SQL for defining your topology. In the end, you also execute it, and you can also print locally in your IDE. Here, the API abstracts away all the building blocks, so you have no access to timers or state or anything like this; that is all handled under the hood. Also, the operator topology is determined by the planner, not by you. The nice thing is that you can focus on your business logic: you declare it, and the optimizer takes your declarations and makes something efficient out of them. Internally, it uses highly efficient binary records; that is also up to the engine, not to you. What you will maybe see is a Row type: if you really want to go out of the Table API, then you see a Row type, which you can work with in a regular program.

And the interesting thing here is that conceptually we are working with tables, tables and views, like in databases. But under the hood there is actually a changelog, and that's what I want to show in the following slides. You can already see it: if you run this query here, you get this output in the IDE, and there is, of course, an f0 column with the 1, 2, 3 output. But there is an additional first column, the op column, which already shows that there is a change flag attached to every record. In this case, it's just insert.

The nice thing about Flink's APIs is that you can mix and match them. You can, for example, start with the DataStream API and then go to the Table API, or the other way around. If you have SQL, you can do the ETL part in SQL first.
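A rough sketch of what this mixing can look like; this is a hedged example rather than the code from the talk, and it assumes that converting a raw Integer stream yields the default column name f0:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MixingApisSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Start in the DataStream API ...
        DataStream<Integer> numbers = env.fromElements(1, 2, 3);
        tableEnv.createTemporaryView("numbers", numbers);

        // ... switch to SQL / Table API ...
        Table doubled = tableEnv.sqlQuery("SELECT f0 * 2 AS doubled FROM numbers");

        // ... and go back to a DataStream, keeping the change flag (RowKind) per record.
        DataStream<Row> changelog = tableEnv.toChangelogStream(doubled);
        changelog.print();

        env.execute("mixing-apis-sketch");
    }
}
```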
And then, if you have some more complex logic, like timer services or very complex state or whatever, you can go to the DataStream API, do it there, and then switch back to SQL; or you can just use SQL for the connectors and define the entire pipeline in the DataStream API. That is up to you. The APIs are there to go back and forth between those two.

So now let's really talk about changelog processing. If you think about data processing, in most cases it is actually consuming a stream of changes, because if there were no continuous input stream, your company, your project, whatever, would be done, right? So it is very common that data flows in continuously. And the Flink APIs, the Flink runtime, see everything basically as a stream; they just distinguish between bounded streams and unbounded streams. Bounded means you define a start and an end, and the end is already known. Unbounded means you start somewhere, which can be now or somewhere in the past, and then you keep processing into the future. That is up to you.

If you think a bit about this, batch processing is actually just a special case of stream processing. Batch processing means that, through the bounded nature of the stream, the runtime can use some more specialized operators, like sorting, which is easier in such a setting, and it can use different algorithms, for example a sort-merge join or something like that. So the runtime has special operators and special handling for bounded streams, but in general you can process everything as a stream, both bounded and unbounded data.

So what does this actually look like? How can I work with streams here? The first answer, as I mentioned before: you actually don't work with streams. What you work with is dynamic tables. This is a concept we call dynamic tables, and it's similar to materialized views and materialized view maintenance. What you do as a user is define your tables: on the left side we have transactions, on the right side we have maybe revenue. And in the middle you define a standing, continuously running SQL query, which gets translated into a pipeline, a topology, that runs in the Flink cluster.
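A hedged sketch of such a standing query, with the datagen and print connectors standing in for real source and sink systems (the schema and rate are made up):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StandingQuerySketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source table: a stream of transactions (generated data instead of a real system).
        tableEnv.executeSql(
            "CREATE TABLE transactions (name STRING, amount INT) " +
            "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

        // Sink table: the continuously maintained revenue per name.
        tableEnv.executeSql(
            "CREATE TABLE revenue (name STRING, total INT) " +
            "WITH ('connector' = 'print')");

        // The standing query: runs until cancelled and keeps the sink up to date.
        tableEnv.executeSql(
            "INSERT INTO revenue SELECT name, SUM(amount) FROM transactions GROUP BY name")
            .await();
    }
}
```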
So then the question is: OK, if we have tables and SQL, is this a database? And the answer is no, it's not a database, because we are not in charge of the data. You bring your own data and your own systems. It's more like a processor: it reads from all different kinds of systems.

So if a table is not a stream, or if I don't work with streams directly, how do the two actually relate to each other? An interesting term here is the stream-table duality. You can basically see a stream as the changelog of a continuously changing table. So it is possible, and I will also show an example shortly, to convert from a table into a stream and from a stream into a table; a back-and-forth mapping is possible. Usually, as a user, you don't see that. Under the hood, the runtime, all the sources, all the sinks, all the operators work with changelogs.

In Flink, we have four different kinds of change flags for each record. We have insertions; insertions are also the default input and output for bounded batch queries. Then we have update before, which basically retracts a previously emitted result, and update after, to update it with the new value. And then we have delete, to remove one of the last results.

When a log contains only insertions, we call it an append-only or insert-only log. If it contains some kind of deletion or update before, we call it an updating table, or an updating stream. And if it never contains an update before, but only update afters, then there is a primary key involved, and we call it an upserting table.

So let me make a quick example. Again, we have transactions on the left side and revenue on the right side, and in the middle we are summing the amount, grouped by name. What happens now is that, in the logical table, there is a new record coming in for Alice. This is how it will be represented in the changelog under the hood, and this is what comes out: we are summing here, so 256 is the first result that we can already emit. Now the next record comes in; again, it will also be added to the result table. But now another Alice comes in, and since we group by name, the sum is not correct anymore; we need to update the sum to the newest number. That means, first, we have to remove the old record.
And if we want to materialize our changelog into a table, we also have to remove the row in the table, and then we can finally add the updated row. This is what the changelog looks like, and if you applied this changelog to MySQL, or to some key-value store, or to Elasticsearch or so, then the result would be in there.

And if we define a primary key on the sink table, we actually don't need this update before, because then it becomes an upserting operation, and we can basically save 50% of the traffic if we don't have to retract the rows in the sink table.

I already mentioned that each sink and each source declares a changelog mode: which changes it can produce and which it can consume. Let me give a quick example with various connectors. When we read from a file system, for example, this is usually a scan operation, so it is very common that reading from a file is just insert-only. There are no updates coming through the file system; sometimes there are, but in the general case you just scan through the file.

OK, so Kafka. In the early days, Kafka was actually just used as a log, so every record that came in through Kafka was also considered an insert-only record. Later, Kafka also added some upsert functionality, so we also have a connector called upsert-kafka for that. There, when a value in Kafka is null, it means a deletion. So the upsert-kafka connector, for example, would produce upserts and deletions.

If you define a JDBC table, JDBC as a scan source also doesn't have this concept of updates, so in this case we would scan the entire table once and only produce insertions; so we have insert-only for JDBC as well. But then comes the most complex case. What happens if you use, for example, Debezium? You connect it to the database to consume the changelog from the database, you put this into Kafka, and then you consume from Kafka. In this case the source can produce all kinds of changes, which then have to be handled by the engine. The optimizer basically tracks the changes through the entire topology, and the sink declares what it can digest. The optimizer sometimes reacts with an error message, but often there is more it can do.

So let's quickly also talk about these two different modes.
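Before that, to make the connector part concrete, here is a rough sketch (not from the talk) of how two such tables could be declared in Flink SQL; topic names, brokers, schemas, and formats are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogConnectorsSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Upsert table: a primary key is mandatory; a null value in Kafka means deletion.
        tableEnv.executeSql(
            "CREATE TABLE accounts (" +
            "  name STRING, balance INT, PRIMARY KEY (name) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'accounts'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'key.format' = 'json'," +
            "  'value.format' = 'json')");

        // CDC source: a Debezium changelog written to Kafka, read as inserts, updates, deletes.
        tableEnv.executeSql(
            "CREATE TABLE transactions_cdc (" +
            "  id BIGINT, name STRING, amount INT" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'db.transactions'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'debezium-json')");
    }
}
```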
So I already said that sometimes you can do upserts, where there is no update before, and sometimes you need all four kinds of changes. This is called retract versus upsert mode. Retract has the nice property that no primary key is required; this works for almost all external systems, which is great, and you can also support duplicate rows, which you cannot support in an upsert table. And interestingly, retraction, this retracting of a previously emitted record, is actually often required in distributed systems. I also have a little example on the right side, which I will show shortly.

Let me briefly explain it first. This is a count of a count, so we are creating a histogram. The query itself is not so important; what is important is what is actually flowing in the cluster between the operators. Whatever record comes in, the first operator will identify it: OK, this is the first time, so the count is one. And since we want the count of a count, the next operator will also count this as one, and it will keep some state: how many records have I seen for this particular count?

Now the second record comes in, and we have to update the count. So now the count is two, not one anymore. And interestingly, since we hash-partition by the count, it might be that the new count ends up at a completely different operator. But what happens with the old count? Now you have two parallel instances of the operator that both hold a count, and we need to remove the count in the other operator. This is why retraction is required: the update before needs to go to the other subtask and remove the outdated record there.

In general, upsert is an optimization: it reduces traffic and reduces computation. If it's possible, it's great, but usually there are a lot of retractions flowing around in the topology.

I also have some examples here. If you do an EXPLAIN on some SQL query in Flink SQL, the bottom part is what you would see. Let's assume we have a table of transactions, a table of payments, and a table for the result. The result table can consume all kinds of changes; I just mocked the table here. And you join transactions and payments.
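As a rough idea of how you can ask Flink for this information, here is a hedged sketch, not the exact query from the slides; the table definitions are stand-ins using the datagen connector:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogExplainSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tableEnv.executeSql(
            "CREATE TABLE transactions (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
            "CREATE TABLE payments (id BIGINT, amount INT) WITH ('connector' = 'datagen')");

        // Printing the plan with CHANGELOG_MODE shows which change flags
        // (insert, update_before, update_after, delete) each operator produces.
        System.out.println(
            tableEnv.sqlQuery(
                "SELECT t.id, t.name, p.amount " +
                "FROM transactions t LEFT JOIN payments p ON t.id = p.id")
            .explain(ExplainDetail.CHANGELOG_MODE));
    }
}
```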
And in the EXPLAIN you also see that you get information about the changelog mode. For example, if the inputs here are insert-only and insert-only, then the join will also produce an insert-only result. And if we do an outer join, in this case a left outer join, then things become a bit more complex. You have insert-only, insert-only, but the outer join will emit a null first: if one record comes in and there is no matching record so far, you have to emit a null for the other side, and when the other side comes in, you have to remove that null again and emit the final result. That's why, for example, here you have all kinds of changes coming out of the join.

And then we can make it even more complicated. What happens if we define a primary key on transactions and payments? Then the optimizer will recognize: OK, the left input and the right input now contain a unique key. That is great, so I can remove the update before; you can see that this flag is not necessary anymore, because we can do upserts on the result. So this query is obviously more efficient than the other one.

And the optimizer can also bridge between those different modes. I don't want to go into details here, but if it's necessary, it can go, for example, from updating to retracting mode; that also happens under the hood, this normalization. And depending on the operators, you can also switch between these modes. For example, if you have a regular join of two append-only tables, then the resulting table will also be append-only. I showed already that if one of the tables is updating, the result will be updating, and if there is some outer join, then the result is always updating.

And now comes the interesting part. If you have an append-only table and you join it with an updating table, there is a special kind of join, which we call a temporal join.
A temporal join will actually produce an append-only table, because it looks at the table at a specific point in time. That's a very interesting operator. Unfortunately, we don't have enough time, but I just want to show you an example of this very, very useful join operator.

Let's assume we have an orders table, and orders have a currency, and there is a currency rates table. Obviously, you don't always want to join those two tables on the latest currency rate; you actually want to know what the currency rate was at the time the order was created. And this syntax here, with FOR SYSTEM_TIME AS OF, allows you to consume all the changes from the rates table and join them with the orders at the point in time of the order. This is just one example of a very sophisticated join operation. And by the way, the FOR SYSTEM_TIME AS OF syntax comes from the SQL standard.

So, I have also prepared a demo. I think we still have seven minutes left, so that should be enough. I want to show you some of the CDC capabilities. I will run everything in my IDE, and I will use Java for this example. I have a MySQL container, and I'm starting it now. This container will create a MySQL instance, and it will already be filled with, I think, three or four rows. I have a few examples here; I can simply run them via the main method in the IDE.

What I'm doing here is creating different tables that connect to MySQL. One is a JDBC one, which fully scans the table once, and the other one is a CDC one, which continuously monitors the table's binlog. So let's just run this. Here we see the first three results. As you can see, the application has not stopped; it is waiting for more records to come. Now I want to insert more values into MySQL. I could have used the MySQL command line for that, but I can also use Flink SQL for it.
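For reference, the two source tables in a demo like this might be declared roughly as follows. This is a hedged sketch rather than the actual demo code: hostnames, credentials, and schemas are placeholders, and it assumes the flink-connector-jdbc and flink-cdc (mysql-cdc) dependencies are on the classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlDemoTablesSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // JDBC table: scanned once as a bounded, insert-only snapshot (also usable as a sink).
        tableEnv.executeSql(
            "CREATE TABLE transactions_jdbc (id BIGINT, name STRING, amount INT) WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/demo'," +
            "  'table-name' = 'transactions'," +
            "  'username' = 'demo', 'password' = 'demo')");

        // CDC table: reads the initial snapshot and then keeps following the MySQL binlog.
        tableEnv.executeSql(
            "CREATE TABLE transactions_cdc (id BIGINT, name STRING, amount INT) WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost', 'port' = '3306'," +
            "  'database-name' = 'demo', 'table-name' = 'transactions'," +
            "  'username' = 'demo', 'password' = 'demo')");

        // The CDC query keeps running and picks up new rows as they are inserted into MySQL.
        tableEnv.executeSql("SELECT * FROM transactions_cdc").print();
    }
}
```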
So I have a regular INSERT INTO the JDBC transactions table, VALUES and so on, and I can run this main method here. It's a bit of overkill to use Flink for that, for just one value, but it shows that it's flexible enough to also run a batch query that just inserts one record into the database. And as you can see, the record went into MySQL, and from MySQL, via CDC, into Flink, and then out to the sink. We also have more sophisticated examples here, and I don't think we have more time for that, but you can do a lot of things with Flink SQL. I think I could spend a whole day talking about the way this works. So, back to the slides.

Yeah, Flink SQL is very powerful. It has been crafted over years and years and years, by many, many companies, many teams. It's very flexible for integration, for integrating various systems with different semantics, and there is way more. I just showed some operators, but we have a large coverage of the SQL standard: OVER windows for aggregations, MATCH_RECOGNIZE for pattern matching and complex event processing, and windows, for example tumbling and session windows, for cutting your stream into pieces. Then there is a huge CDC connector ecosystem, not part of Apache Flink itself, but also quite useful, with a lot of GitHub stars already. Then there is something new, the Flink Table Store, which tries to be the first streaming data warehouse kind of thing. It's a very early version, but it's very promising. So I would recommend maybe looking into one of these sub-projects as well. Not only Flink itself is big, but also the ecosystem around Flink grows and grows and grows.

I'm happy to take questions. I think we have three minutes left, but otherwise I will also be outside for any questions. Thank you very much.

Yeah, so, can you please repeat the question? The question is how Flink handles transactions that take a lot of time before the transaction has ended.
So, in general, I think we are not very good at transaction handling in Flink, but with the DataStream API you have a lot of possibilities. For example, you can buffer all the data from this transaction in state for an arbitrary amount of time, wait until the transaction closes, and then trigger the processing for that transaction. That is possible. So, personally, I would maybe do some buffering in the DataStream API first, until the transaction has ended, and then push it further. Thank you.

Yeah, thank you. So, the question was about the runtime, whether Flink builds on top of something else. No, I think it's creating its own runtime. ... But in general, yeah, it's rather a platform.

Now, the next logical question: how far does it scale? The question was how far it scales. I can tell you that there are single applications with terabytes of state, processing billions of records a day or so. And there is Apple, which processes data with it, and all the big banks use it for credit card fraud detection and stuff like that. So I don't think most companies in this room will reach the scalability limits of Flink, unless some Apple or Alibaba people are in this room, then maybe, I don't know.
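Coming back to the earlier question about long-running transactions: a rough sketch of the buffering idea in the DataStream API could look like this. The event encoding, the commit marker, and keying by a transaction id are invented for illustration, not taken from the talk:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers all events of a database transaction in keyed state (keyed by transaction id)
// and only emits them once the commit marker arrives. The String event type is a placeholder.
public class TransactionBuffer extends KeyedProcessFunction<String, String, String> {

    private transient ListState<String> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffered-events", String.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        if (event.endsWith("COMMIT")) {            // made-up commit marker
            for (String e : buffered.get()) {
                out.collect(e);                    // flush the whole transaction at once
            }
            buffered.clear();
        } else {
            buffered.add(event);                   // keep buffering until the transaction ends
        }
    }
}
```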
You said that Flink does not own the data, but then there is this Table Store project, so is it a move towards, you know, more ownership?

Yeah, the Table Store is a very, very interesting approach. It started last year, or two years ago; it's rather new, I think early last year. It doesn't really fit into Apache Flink, but it's still very useful, and we will see, maybe it will leave, not the Apache Software Foundation, but the Flink project itself; we will see, because it doesn't fit in really well. In general, though, we still have this vision of Flink as a database. Yeah, we will see. Thank you.

A question about big state: you mentioned that you can have terabytes of state, but when you create a checkpoint and this checkpoint is very big, storing it can take long. Is it a huge I/O cost to write it into the store?

Yeah, so the question was how we can actually snapshot large state in general. And this is exactly where Flink distinguishes itself, where it differs from competitors, because there is a lot of engineering involved to make this as efficient as possible. I'm sure there is even more to do, it's still not perfectly efficient, there are more optimizations you can make, but for example there are incremental, differential snapshots involved, there is local recovery involved; there are many, many algorithms under the hood to make it as quick as possible. But of course, if you have terabytes of state and a machine has died completely, then you obviously need to restore these terabytes of state from S3 into your task managers again, and this can take time. So it tries its best, but of course you need to do benchmarks for your use case.

One last question.
Yeah, so we guarantee exactly-once end-to-end if the connectors, source and sink, support that. Especially for state, there are no duplicates in state; we might need to reprocess data during a failure, but end-to-end exactly-once semantics are possible.

Okay, then yeah, thank you very much, and I'll be waiting outside.