Okay, you're good to go. Thank you. I think the technical issues are now solved. Thanks again, everyone, for being here.

I'm going to talk today about Apache Beam. Apache Beam is a framework for data processing that runs on top of several platforms, and it's especially meant for doing streaming analytics. Javier already introduced me: my name is Israel, and I work as a cloud data engineer at Google Cloud, helping customers do data engineering on top of Google Cloud. A lot of the work that I do is actually helping customers with Apache Beam, and particularly Dataflow, which is our runner for Apache Beam.

So what is Apache Beam? Apache Beam is a framework for data processing. It lets you run data pipelines, like Flink, like Spark, like so many other big data systems. It has two main features. The first is that it's a unified computing model for batch and streaming: any pipeline you have written in Apache Beam for a batch use case can easily be used in streaming as well. You reuse the same code and add some small additions that we are going to talk about in a bit. The other main feature is that it runs everywhere, for some definition of everywhere, and you can write your pipeline in many different languages. You can write your pipeline in Java, in Python, in Go. You can also use other languages of the Java Virtual Machine. I've highlighted Scala here because there's a framework called Scio, developed by Spotify, on top of Apache Beam for, let's say, Scala-native development of pipelines, so you don't have to write Java-looking code in Scala; it's functional code. There are also lots of people using Kotlin, for instance, on top of the Java Virtual Machine with the Beam Java SDK. So that's about the programming languages you can use.
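To make that concrete, here is a minimal sketch of what a Beam pipeline looks like in the Python SDK. This is an illustration, not code from the slides, and the element values are made up:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The runner is just a pipeline option: DirectRunner for local testing, or
# e.g. DataflowRunner, FlinkRunner, SparkRunner, without changing the code.
opts = PipelineOptions(runner="DirectRunner")

with beam.Pipeline(options=opts) as pipeline:
    (pipeline
     | "Create" >> beam.Create(["hello", "streaming", "world"])
     | "Upper" >> beam.Map(str.upper)
     | "Print" >> beam.Map(print))
```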
You run a Beam pipeline on top of a runner. There's a direct runner for running and testing pipelines locally; it's not meant to be used for real-world use cases. But then you can run your pipeline on top of Dataflow, on top of Flink, on top of Hazelcast, Spark, many different runners. So basically, when you write a pipeline in Apache Beam, you are not tied to the platform where it runs; you can move it to different platforms. A couple of minor comments on that. Apache Beam is a theoretical model for computing, and not all the runners implement the model to the same extent. As of now, I would say Dataflow and Flink are probably the ones with the fullest coverage, and other runners may have some gaps. Take Hadoop as an example: you can run an Apache Beam pipeline on top of Hadoop too, but you cannot run streaming on top of Hadoop, because it doesn't support streaming. So what you are able to do with Beam depends on the capabilities of the runner; there is no magic. Beam is batch and streaming, but if your platform doesn't support streaming, you cannot do streaming.

So let's talk about streaming. What's the problem with streaming? It's extremely interesting. In streaming you are getting a lot of data, continuously. There's no beginning and there's no end, so you cannot know in advance where the boundaries of your data are. It's coming continuously from many different places. Think, I don't know, of a mobile application for a game or whatever: people are using it and sending events to your systems every once in a while. Because the application is deployed out in the wild, data will come in who knows what shape. It will come out of order. Some users will be on the underground, without phone coverage; they will still attempt to send events, and those events will arrive late. For instance, here, let's see if I can use the pointer: this is data that was supposed to be produced around 8 in the morning. Maybe there are network latencies and so on, but more or less you get it around 8 in the morning in your system; this axis is the time when you see the data in your system. But for whatever reason, you may also get very late data.
And depending on what you want to do, you may want to process this data according to when it was produced, not when you received it. This is actually the problem with micro-batching. I remember when I first started hearing about streaming, many years ago, a lot of people said: oh, Spark, I don't like it because it does micro-batching, it doesn't do real streaming. I had no clue what they meant. You have to group things somehow to process them; if the data is infinite, you need to chop it up. The problem with micro-batching, which doesn't happen in Spark anymore, so this was really ancient times, is that you create the batches as you see the data. Then data that belongs together may end up in separate buckets. For instance, this was data produced at 8 in the morning. If you are doing buckets of one hour, you may capture this message here, but if you have late data you will capture it in a bucket where that element doesn't belong with the rest of the elements, if you want to process them together. So you need to solve this problem of lack of order in streaming, and this is what you can easily solve with Apache Beam.

Let's talk about the watermark. There are many ways of doing stream processing, and one of the most popular is using the concept of a watermark. There isn't one dimension of time; there are at least two dimensions of time. There is the event time, the moment in which the data was produced, and there is the processing time, the moment in which you see the data. They will never be the same. They can be really close sometimes, but you cannot guarantee how close or how far apart they are going to be. So we put time in two dimensions. The ideal is, for instance, this straight line in blue. This zone here is impossible: you cannot see data before it's produced, not yet at least, according to the laws of physics. But most likely what will happen is that you will have some delay. Sometimes it will be closer to the ideal, sometimes it will be farther from it. And you need to take this into account.
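In Beam this distinction shows up as the timestamp attached to each element. As a small illustration, not from the slides, and assuming each event carries the moment it was produced as a field in its payload, you can re-timestamp elements to their event time:

```python
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class UseEventTime(beam.DoFn):
    """Attach each element's own event time (when it was produced on the
    device) instead of the processing time at which it arrived."""
    def process(self, event):
        # 'event_time' is a hypothetical field: Unix seconds set by the app.
        yield TimestampedValue(event, event["event_time"])

# usage:  ... | beam.ParDo(UseEventTime()) | ...
```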
Let's look at an example that might be a little more telling: Star Wars. Have you ever watched a Star Wars movie? The Star Wars movies were released out of order; the first movie released was Episode 4. This is purely streaming, this is what happens in streaming. You are expecting events from the beginning of the story, the middle of the story, the end of the story, and instead you get the end, then the middle, then the beginning, and you need to reorder things. Depending on what you want to do, you may need to reorder. If you don't care, look, I just want to count how many movies were released per year, then you don't need the event time. But if you want to reconstruct the story, who did what before or after, what happened, then you need to be able to reconstruct that timeline. The time when the movies were released is the processing time; event time is the time at which the events in the story actually happen. And this is the kind of problem we can solve using Apache Beam, or Flink, or many other streaming systems.

Let's see how we can deal with this. The classical way is using windowing. Windowing is grouping things based on temporal properties. When we write a data pipeline, we need to answer one question: what are we going to compute? But if you want to do this in streaming and group things based on temporal properties, you need to answer three additional questions: where, when, and how. Let's see some details.

What? This is easy: we are going to aggregate. This is Java code, shown here as an example of the Apache Beam API. I won't go into the details; there's a link at the end with more. So we are aggregating things together. We are not doing any kind of temporal logic yet, we are just aggregating stuff: we are summing up all the numbers here. This is the operation we are doing, the same as in batch. So imagine that we are getting this batch.
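The slide shows the Java API; an equivalent sketch of the "what" in Python, with made-up scores, is just a sum per key, the same transform you would use in batch:

```python
import apache_beam as beam

# The "what": sum the values per key. No temporal logic involved yet.
with beam.Pipeline() as p:
    (p
     | beam.Create([("team_a", 5), ("team_b", 9), ("team_a", 3)])
     | beam.CombinePerKey(sum)
     | beam.Map(print))  # ('team_a', 8), ('team_b', 9)
```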
The problem is that when we are working in streaming, we don't see the full data at once, and we need to produce output at some point; we cannot wait forever. So we need to decide how to group things together. For instance, here we are going to group things in windows of two minutes. But these windows of two minutes are not in processing time; they are in event time. For instance, this message here, we see it around 12:00 and we put it in the window starting at 12:00. But this other message over here was received between 12:08 and 12:09, and we are still able to attribute it, to assign it to the window between 12:00 and 12:02, in event time. We can wait for late data and put it in the right window, despite the message being quite late in processing time. And the same goes for the rest of the windows.

Now the question is: okay, good, so you are waiting until 12:08. What if your message shows up at 8 p.m., eight hours later? What do you do? We need to make another decision. We have already decided how we are going to group things together, here with fixed windows; there are more kinds of windows in Apache Beam, but I won't go into details right now. Now we need to decide how long we wait. We are going to wait until the watermark. The watermark is this relationship between processing time and event time that, in the case of Beam, and depending on the runner, is calculated and estimated on the fly as data goes through the pipeline. This curve is estimated, and when the watermark passes, you have a certain degree of guarantee that your data is complete. A certain degree; it cannot be fully guaranteed because the future cannot be known, we cannot travel in time.
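A sketch of the "where" in Python, again with made-up data: the same sum, but assigned to two-minute windows in event time, so a late element still lands in the window where it belongs:

```python
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

# (key, value, event-time seconds). The third element could arrive much
# later in processing time and would still fall into the first window.
raw = [("team_a", 5, 0), ("team_a", 3, 130), ("team_a", 9, 30)]

with beam.Pipeline() as p:
    (p
     | beam.Create(raw)
     | beam.MapTuple(lambda k, v, ts: TimestampedValue((k, v), ts))
     | beam.WindowInto(FixedWindows(120))   # two-minute event-time windows
     | beam.CombinePerKey(sum)              # one sum per key and per window
     | beam.Map(print))
```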
Here, for instance, once we pass the watermark, the nine, that number we were processing before, is now left out of the window. What does that mean? We were summing up numbers, and that nine, we are not counting it. As soon as we see it in our pipeline, it will be dropped, lost; the pipeline will ignore it. And that may make sense: you cannot wait forever, at some point you have to stop and move on. But maybe you want to take it into account. Maybe, I don't know, this is a billing or invoicing thing and every penny counts, so you need to process it. Well, then you have to make yet another decision: how long we are going to wait for late data, and how we are going to actually update the output. Here I'm summing numbers: easy, commutative, associative, really no big deal. I can treat it like a monoid in big data processing, so I can just take the previous aggregation and keep aggregating; I don't need to keep all the numbers I have seen so far. In other cases, for any non-associative, non-commutative operation, you may need the full data to produce an update. And if you are working in streaming, maybe you don't want to accumulate all the data, because that increases the amount of resources your pipeline needs and has an impact on performance, latency and so on. So here we are accumulating, because the operation allows it, and we are actually waiting for late data. Now, we are waiting for late data, but we don't want to wait forever, we want to have some numbers, so we are producing several outputs per window. For instance here, continuing with the first window: when the watermark passes, we produce an output, and then when we see the new number, we produce the output again. We produce it really late, but we cannot do magic: this is when we see the data, so we cannot process it earlier than this. We may also decide to produce some output even before the watermark, because the watermark can be really slow; it depends on the pace at which the data arrives.
If, for whatever reason, users are sending your data with a lot of lateness, the watermark can progress really slowly. How you produce output is always a trade-off in streaming between completeness and latency; you need to make a decision. So here we put an early trigger: we are producing output soon, with low latency, but it's incomplete, because later on we are going to keep seeing numbers until the watermark. Good. So basically, this is streaming in Apache Beam in 10 minutes. It's a lot of information, explained very quickly. If you want to go deeper, there's this example here, in Java and in Python, available in both languages, and you can see everything from the previous slides in full detail. You can run it locally if you want; you don't need a cloud environment, a cluster, a stream processor or anything like that. It runs locally with some synthetic, made-up data.
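A condensed sketch, in Python, of the kind of configuration those examples walk through: the "when" and "how" added on top of the two-minute windows from before. Here `events` is assumed to be a keyed PCollection that already carries event-time timestamps, as in the earlier sketches:

```python
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AfterCount, AccumulationMode)

sums = (
    events  # assumed: (key, value) pairs with event-time timestamps
    | beam.WindowInto(
        FixedWindows(120),
        trigger=AfterWatermark(
            early=AfterProcessingTime(30),   # early, incomplete firings
            late=AfterCount(1)),             # fire again for each late element
        allowed_lateness=8 * 60 * 60,        # wait up to 8 hours for late data
        accumulation_mode=AccumulationMode.ACCUMULATING)  # refine the sum
    | beam.CombinePerKey(sum))
```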
Now, this is the classic way of doing streaming in Apache Beam; it has been around for years already. It's the same model that is implemented in Spark, the same model that is implemented in Flink, so they are all kind of similar. There are other things you can also do in Apache Beam in streaming; anything you can do in Apache Beam, you can also do in streaming, and I'm going to highlight a couple of those. I'm leaving out a lot of stuff, because time is limited. I'm leaving out SQL, for instance: there was a great talk by Timo focusing on SQL, and you can also do SQL in Apache Beam in streaming if you want, with examples similar to what Timo showed, and you can actually run that on Flink. It may make sense if at some point you want to move away from Flink to Dataflow, or away from Dataflow to Spark, in order to have this portability. One thing you can do in streaming is stateful functions, and stateful functions are very interesting for "windowing", in quotes, that doesn't depend on time.

Very typically, I work with customers who say: all these windowing and trigger things are super interesting, but look, whenever I see a message of this type, I want to take all the messages I have seen so far as a group and do these calculations, and I don't care about time. I don't care about grouping things in time; I want to group things by some logic. I'm going to give you a predicate; you pass a message, and if the message fulfills the condition, I want to close the previous window and start a new one. How can you do that in Apache Beam? You can do that with stateful functions. So here we have some input, here we have a map, which is called a ParDo in Apache Beam, and we do some transformation, and we want to accumulate state here. Depending on what we see, at some point we do something else, and this is mutable state. In a system like Dataflow, like Flink, like all the systems where Apache Beam runs, having mutable state in streaming that is computed in a consistent way is extremely difficult. One way to shoot yourself in the foot with systems like this in streaming is trying to keep accumulating state in some kind of external system. Because runners will sometimes have issues: there will be errors, there will be retries, infrastructure will die, you will have auto-scaling. There are all kinds of situations in which the runner may retract a computation and recompute it again. In these situations, keeping mutable state in any kind of external system is complex. It's doable, but complex. With Apache Beam, on any of the runners you can use, you will have end-to-end exactly-once processing, but end-to-end exactly-once processing doesn't mean that your code is going to be executed exactly once. It may be executed more than once. This is what makes maintaining state external to the pipeline complex. But if the state is internal to the pipeline, then the system itself can take care of the problems of reprocessing and maintain mutable state in a consistent way.
So this is what a stateful function is in Apache Beam, and you can use it for use cases like this. For instance, say that I want to produce "windows", in quotes, based on some kind of property. I keep seeing messages, which I keep processing, and I keep accumulating them in some state: I maintain a buffer, keeping every single message that I see, and a count. The buffer must have some boundaries, because this is local state maintained in the machine, in the worker, in the executor where you are running, and the executor has limited resources; they might be very large resources, but limited anyway. So you keep accumulating, and you keep processing, here, for instance. Typically you can use this for batching calls to an external service, but you can also say: whenever I see a specific type of message, I emit some output. I emit some output, and with that I have applied a "window": all the messages that I have in the buffer, I tag with a new session ID, a new window ID, and then I emit them. I hold them for a while until I see the right message that I need, and then I emit them.

There are two problems here. Customers always think that streaming is complex, and they want to get away from all the time-based calculations: it's so complex, so messy; look, my algorithm is much simpler. But it is not. You are in streaming, so you cannot ignore time. You will have situations where you see the messages out of order, and you will have situations where you don't see messages for a while, and you need to decide what to do in these two cases, even if you don't want to. What happens when I see messages out of order? You may say, I don't care; unlikely, but in some situations it might be true. Or you may have to wait, with some timer, in order to give room for late data to arrive into your code before actually producing the output. That would be an event-time timer: in event time, you are going to see the messages in order, so wait 30 seconds, two minutes, and so on.
And then, when you have seen all the messages, that's the moment at which you apply the session logic. That's called an event-time timer. You may also have problems of staleness: I'm waiting for the end of my session, but I haven't seen messages in the last five minutes, in processing time. The problem with event time is that it depends on the progress of the watermark, and if you stop seeing messages, the watermark stops advancing. In Beam runners, the watermark is normally estimated as the timestamp of the oldest message waiting to be processed, so you may literally stall your pipeline waiting forever for data that may never arrive. A processing-time timer solves this problem: after 10 minutes, measured with a wall clock, if nothing comes, I don't care about the watermark, keep going. The data has been lost for whatever reason, and we cannot wait forever. So this is a stateful function, and it's also very useful in streaming, because it allows you to apply logic that goes beyond the temporal properties we saw in the previous slides. Here you have some examples and links; the slides are already available through the conference website, so I encourage you to have a look at these examples.
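As a condensed illustration of the pattern just described, and not the exact code from the slides, a stateful DoFn in the Python SDK can buffer messages per key, emit and clear the buffer when a closing message arrives, and use a processing-time timer as a staleness guard. The "END" marker and the 10-minute timeout below are made-up choices:

```python
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.utils.timestamp import Duration, Timestamp

class GroupUntilMarker(beam.DoFn):
    """Buffers events per key; emits the buffer when a closing event arrives
    or when nothing has arrived for 10 minutes of processing time."""

    BUFFER = BagStateSpec("buffer", StrUtf8Coder())
    STALE = TimerSpec("stale", TimeDomain.REAL_TIME)  # processing-time timer

    def process(self,
                element,                      # (key, event): input must be keyed
                buffer=beam.DoFn.StateParam(BUFFER),
                stale=beam.DoFn.TimerParam(STALE)):
        _, event = element
        buffer.add(event)
        if event.endswith("END"):             # hypothetical closing predicate
            yield list(buffer.read())
            buffer.clear()
        else:
            # Staleness guard: flush whatever we have if nothing else shows up.
            stale.set(Timestamp.now() + Duration(seconds=600))

    @on_timer(STALE)
    def on_stale(self, buffer=beam.DoFn.StateParam(BUFFER)):
        yield list(buffer.read())
        buffer.clear()

# usage:  keyed_events | beam.ParDo(GroupUntilMarker())
```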
What else can you do in streaming? Machine learning inference. There are many ways to do machine learning inference in streaming at scale, many of them quite expensive: you can deploy endpoints in cloud platforms, with GPUs, with a lot of stuff. Those solve a lot of functionality for you, but they are expensive. So what if you want to apply machine learning inference inside a pipeline, in Apache Beam? You could be thinking: well, I can do that myself; I can import TensorFlow, load the model, apply it, and so on. But this is already solved for you in Apache Beam: you can run machine learning inference with the so-called RunInference transform.

We see it here. Right now it has out-of-the-box support for PyTorch, TensorFlow and scikit-learn, with more coming. When you run a distributed system and you want to apply a model, each one of the workers in the distributed system has its own memory. Apache Beam runs on top of shared-nothing architectures, like Flink, Dataflow, Spark. Workers are independent of each other; they don't share any common state. The state we saw before is actually maintained per key, and per window if windowing is applied; it's completely local to the worker, and two workers cannot share state. But for the model, we don't want to instantiate it 100 times if we have 100 workers, because the model is going to be the same for every worker, right? The model doesn't change; it is actually read-only. RunInference solves this by having some state that is shared across workers, transparently to you. This is something you could always implement yourself in a distributed system, but it's complex, and that's the problem RunInference solves: there is only one copy of the model per machine where you are running, in memory, because you always need an instance in memory to be able to apply it. If you have, say, 100 threads, 100 sub-workers, 100 CPUs inside the same machine, you will not have 100 copies of the model, regardless of how the runner's computation model is implemented on top of the machines. There will be only one copy in the memory of the machine. So if you want to apply streaming inference, this is a very convenient way of doing it with very little code.
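A small sketch of what that looks like in the Python SDK, using the scikit-learn model handler; the model path is made up:

```python
import numpy
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# The handler loads the pickled model and lets RunInference share it across
# threads on the same worker instead of creating one copy per thread.
model_handler = SklearnModelHandlerNumpy(model_uri="gs://some-bucket/model.pkl")

with beam.Pipeline() as p:
    (p
     | beam.Create([numpy.array([1.0, 2.0]), numpy.array([3.0, 4.0])])
     | RunInference(model_handler)
     | beam.Map(print))   # each output is a PredictionResult(example, inference)
```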
And depending on the runner, this is a possibility in Dataflow, and also if you are running on top of Kubernetes with a runner that supports it, like Flink, you can provide hints about the resources that your transform is going to need. A hint could be: look, this transform needs at least this amount of memory. But a hint could also be: this step is running ML inference, so use a GPU for this step. The runner then takes care of making sure that the virtual machine running that step matches the infrastructure hints you provide in the code, and you end up with different types of nodes for different types of transforms. One of the problems of shared-nothing architectures is that all the workers are alike, all the workers are the same. With this, you can have different types of workers for different kinds of transforms, which in terms of cost is better; I won't say optimal, but a better alternative. Basically, you use GPUs in the workers where you need GPUs, you don't use them in the workers where you don't, and you don't have to worry about assigning work to the different workers; that is done automatically by the runner, based on these hints. If you want to know more, here you have some links.
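A sketch of what such a per-transform hint can look like in Python. The `with_resource_hints` call and the accelerator string are illustrative; the exact accelerator syntax, and which hints are honored at all, depend on the runner:

```python
# Attach resource hints to the inference step only: more memory and a GPU,
# while the rest of the pipeline keeps running on ordinary workers.
predictions = (
    examples  # assumed upstream PCollection of model inputs
    | "Infer" >> RunInference(model_handler).with_resource_hints(
        min_ram="8GB",
        accelerator="type:nvidia-tesla-t4;count:1;install-nvidia-driver"))
```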
I have only five minutes left, so I'm leaving the best for the end of the presentation. You might say: great, Israel, you showed us Java at the beginning, and now you tell me ML inference is so cool, but it's Python, right? PyTorch, TensorFlow, scikit-learn, it's all Python. Well, one of the things you can do in Apache Beam is cross-language transforms. Anything that is available in any of the SDKs, in any language, you can use from any other language, as long as the runner supports this; it's not supported by all the runners, but it is supported by the main ones. So basically, RunInference can be used from Java. If you want to use a transform from another language, you have to add some boilerplate code, not much, but a little bit. This is already done for the main transforms, the most popular ones, the ones where it makes more sense to use them from different languages. Using a map from Java in Python doesn't really make sense; using RunInference does. Using connectors, input and output connectors, from another SDK, for instance in Python, also makes sense, because the set of input-output connectors you have per SDK is not the same. Maybe you can write to certain databases from Java and to certain message queues from Python, but you don't have the same functionality in all the SDKs; with this, you can use any connector from any SDK, which makes it quite flexible. These are the so-called multi-language pipelines, and basically they mean you can run any transform from any SDK. This is possible because the runner environment is containerized: there is a container per language, and there is some machinery that makes the containers communicate between themselves, plus the serialization and deserialization between programming languages. That is part of the boilerplate you need to take care of. If you use things like Apache Beam schemas, which I haven't talked about in this talk, it becomes transparent for you: anything you have as a schema in one language, you will be able to serialize and deserialize to any other language. So if you follow the Apache Beam conventions, it's quite straightforward to use these kinds of things.
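As an example of that last point, an illustration rather than code from the slides: reading from Kafka in the Python SDK uses the Java KafkaIO connector underneath, wrapped as a cross-language transform, so a Java expansion service and container are started for that step behind the scenes. The broker address and topic below are made up:

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    (p
     | ReadFromKafka(
         consumer_config={"bootstrap.servers": "localhost:9092"},
         topics=["game-events"])
     | beam.Map(print))   # elements are (key, value) records from Kafka
```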
Well, thanks everyone so far for your attention. Here at the end there are some links that I recommend you have a look at if you want to learn more about Apache Beam. I have covered a lot of stuff in a very short time, and there is a lot behind everything I have explained here. If you want to know more about windowing in streaming, triggers, watermarks and so on, I strongly recommend this book. It was released some time ago, and you may think it's outdated, but it's not: this is the same model applied in many different streaming systems. It's not a book about Apache Beam; it's a book about streaming systems, with lots of examples from Apache Beam, but also examples from Flink, Kafka and many other systems. It's actually very interesting; it's one of my favorite books, the other one being Martin Kleppmann's book on data-intensive applications.

If you want to know more about Beam, I recommend Beam College: there are lots of videos on YouTube with lots of detail about the things I have explained here, and some of them are linked in the slides. There is, for sure, the main Apache Beam site, with the programming guide and all the documentation there. And there are also the videos of Beam Summit from previous editions. If you want to participate, and if you are here today you may be interested in streaming, the call for papers is open until March 20th, I think. Beam Summit will be in June in New York, and I encourage you to submit talks. Well, this is all. Thanks all for your attention; it's time for questions now. Thank you.

[Question from the audience] So what's the advantage of using Beam if you are already using Flink?

It's portability, mainly. If tomorrow you want to move away from Flink, for whatever reason, you should be able to move to other runners that have the same level of functionality, like, for instance, Dataflow. I don't know; say one of the main committers of Apache Flink, who is here, gets hit by a bus. We don't want that to happen, but it may happen. Everything may happen; the world is really very uncertain. So basically you get portability.

[Host] Thank you very much. Unfortunately, we don't have time for more questions right now, but I'm sure Israel will be happy to answer any questions afterwards. Yes, anytime. Thanks. Thank you.