YURI: Hi, everyone. My name is Yuri. And today, I'm going to be talking to you about tf.data, which is TensorFlow's input pipeline. As a disclaimer, this presentation assumes familiarity with basic TensorFlow concepts such as ops and kernels. And it contains a lot of code examples that are not necessarily 100% accurate. There may be some details that have been removed because they're either unnecessary or distracting for the purpose of the presentation. So with that, let's get started.

In this talk, we're going to cover a couple of topics. We're going to peel the two main layers of tf.data's implementation one by one, first focusing on the Python view and then on the C++ view of tf.data. And then I'm going to cover three areas of tf.data that might be of interest to the broader audience: support for non-tensor types, and both static and dynamic optimizations in tf.data.

So let's get started with the Python view. Throughout the course of the presentation, I'm going to be using the following example, which is a pretty standard example of an input pipeline. What this input pipeline does is read files that are in TFRecord format-- so these contain records-- then shuffle those records, apply a map transformation that allows you to transform the records-- parse them, pre-process them-- and finally, batch the pre-processed data so that it's amenable to machine learning computation. And the idiomatic way to iterate through elements of an input pipeline in TF 2.0 is with a simple for loop. And that's because in TF 2.0, data sets are Python iterables. Besides this approach, you can also use explicit iter and next calls. Now, as the comment at the bottom mentions, the user-defined function that you can pass into the map transformation can be either a graph or a non-graph computation, where the non-graph computation is enabled by AutoGraph. And I'll talk a little more about that later on.

Just to contrast the simplifications that happened in the transition between 1.x and 2.0, let's take a look at what an input pipeline-- or idiomatic iteration of an input pipeline-- in 1.x would look like. And you can see that the definition of the input pipeline, that is, the top part, the data set, remains the same. But the iteration is much more verbose. So hopefully, this illustrates that the simplest way to iterate through a data set has been made much simpler in the 2.0 release of TensorFlow.

So let's talk a bit more about what's actually going on when you run the Python program. And what we're going to do is go through different lines of the Python program and talk about what actually happens under the hood in terms of what types of TensorFlow ops these invocations correspond to. And I'm using a diagram to visualize the different types of ops. The gray box is the actual op-- so in this case, TFRecordDataset-- the yellow boxes are the different inputs of the op, and the blue box is the output of the op. So in the case of the TFRecordDataset, we have a couple of inputs-- file names, compression type, buffer size. And an important thing that I want to highlight here is that this op produces a variant tensor, which is a representation of the data set object that can be passed between different ops. And we will see how that's used right away when we're looking at the map transformation.
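For reference, the running example looks roughly like this. The file names and the feature spec inside the parse function are placeholders rather than the exact code from the slides:

```python
import tensorflow as tf

# Placeholder file names; replace with real TFRecord files before iterating.
filenames = ["train-00000.tfrecord", "train-00001.tfrecord"]

def parse_fn(example_proto):
    # Hypothetical feature spec; parses one serialized tf.train.Example.
    features = {"image": tf.io.FixedLenFeature([], tf.string),
                "label": tf.io.FixedLenFeature([], tf.int64)}
    return tf.io.parse_single_example(example_proto, features)

dataset = tf.data.TFRecordDataset(filenames)   # read records from files
dataset = dataset.shuffle(buffer_size=10000)   # shuffle the records
dataset = dataset.map(parse_fn)                # parse / pre-process each record
dataset = dataset.batch(32)                    # batch for training

# TF 2.0: data sets are Python iterables, so a plain for loop works.
for element in dataset:
    pass  # train_step(element)
```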
So for the MapDataset op, you can see that one of its inputs is actually a variant, which is the input data set that produces the elements that the map transformation transforms. The other inputs are called "other arguments," and these are actually the captured inputs for the function. In this particular case, that input would be empty, because the function doesn't have any captured inputs, at least not as outlined in the example. And the round boxes are not inputs; they are attributes. The difference between inputs and attributes is that the attribute values do not change across different executions of the op-- they are constant. And the attributes here are the function, which identifies the function parse-- it is stored separately in the TensorFlow runtime, but this allows the op to look it up when it executes-- as well as the types of the arguments that the function takes as inputs. And like the TFRecordDataset, it produces an output variant.

So, a little more about the support for user-defined functions in tf.data. A number of tf.data transformations are operations that actually allow users to specify their own functions. Examples of those are filter, flat_map, interleave, map, or reduce. And irrespective of the mode of execution, tf.data will convert the user-defined function into a graph. And as illustrated on the previous slide, a handle to the function graph is passed to the respective op through an attr. A little more detail on the tracing implementation: it was originally based on framework.function.Defun and recently switched to the same tracing implementation that's used for tf.functions in 2.0. This provided a number of benefits, including control flow version 2, support for resource variables, TensorArrayV2, and also the ability for users to specify user-defined functions that are not necessarily graph-compatible, as long as they're supported by AutoGraph. And it's marked as work in progress here because this functionality is actually temporarily disabled, and we're working on turning it back on very soon.

So to tie it together, if we look at the input pipeline definition, these four lines will roughly correspond to the following ops, inputs, and attributes.

Now, up to this point, we've only talked about how to define the input pipeline. But naturally, the thing that you would want to do with the input pipeline is to enumerate the elements inside of it. And that's where the iterator ops come into play. An iterator can be thought of as an instance of a data set that has state and allows you to enumerate the elements in sequential order. So what are the iterator lifecycle ops? The op in the top left corner, called Iterator, takes no inputs and produces a single output called handle; it creates an empty iterator resource, which is a way to pass the iterator state between different operations. The MakeIterator op takes two inputs: an iterator resource, which is what the Iterator op created, and a data set variant. And what the MakeIterator op does is initialize the iterator resource with that particular data set. So at that point, you have an iterator resource that has been initialized to start producing elements for that particular data set, as defined by the data set variant.
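As an aside, to make the "captured inputs" mentioned above concrete: if the mapped function closes over a tensor defined outside of it, that tensor is what gets passed to the op as one of the "other arguments." A small hypothetical example:

```python
import tensorflow as tf

offset = tf.constant(10, dtype=tf.int64)  # defined outside the function

def add_offset(x):
    # `offset` is not a function argument; it is captured from the enclosing
    # scope and becomes a captured input of the traced function.
    return x + offset

dataset = tf.data.Dataset.range(3).map(add_offset)
for x in dataset:
    print(x.numpy())  # 10, 11, 12
```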
Now, the actual iteration happens by means of the IteratorGetNext op, which takes an iterator resource handle and produces the actual elements, which can be a tensor, or a nest of tensors, or possibly also non-tensor types. And later in the presentation, I'll talk about what exactly is supported in tf.data in terms of types. And finally, there's also a DeleteIterator op that takes the iterator resource and makes sure that the iterator state is properly disposed of when the iterator is no longer needed. This final op, as you can imagine, is very important to make sure that iterator resources are not being left behind. Because it is not uncommon for the iterator resource state to amass hundreds of megabytes or gigabytes of memory, leaving these around can result in your computation running out of memory.

As a side note, when you're profiling the performance of your program or input pipeline, you will see the IteratorGetNext op in something like a timeline, or an [INAUDIBLE], or a CPU profile trace. And this is the op that indicates the output latency of your input pipeline. So if that op is very small in its runtime-- I would say on the order of microseconds-- it means that your input pipeline is not a bottleneck. And if it's larger than that, chances are you are bottlenecked by input, at least to some extent.

So now that we've talked about the different ops, let's actually see how the execution of the Python program maps to the creation and execution of the different ops. And what I'm going to do is contrast the TF 2.0 eager mode style of execution with the TF 1.x graph mode style of execution, to help folks understand the differences between the two modes as far as tf.data is concerned.

So let's start with the 2.0 eager mode. In eager mode, ops are created and executed as the program runs. So when the Python line that creates the TFRecordDataset runs, we end up both creating and executing the TFRecordDataset op. And similarly, with the next line, we create and execute the ShuffleDataset op, feeding the output of the previous op into it as the input variant. That way, we're starting to build the input pipeline, connecting the two stages together. When the .map transformation is executed, the user-defined function is traced and stored in the TensorFlow runtime, and a handle to it is passed, as an attribute, to the MapDataset op along with the input variant representing the input pipeline up to that point. And finally, the batch() op is created and executed, creating the final stage of the input pipeline.

Now, when the idiomatic way of iterating through tf.data is used-- that is, the "for element in dataset" loop-- what happens under the hood is that the __iter__ method is called on the data set object. And that actually triggers the creation and execution of two ops. We first create the iterator resource through an op called AnonymousIterator-- and I'm going to point out the difference between that and the Iterator op when I talk about graph mode execution. And then we associate the iterator resource with the input pipeline that we've created via the MakeIterator op. And as the Python for loop iterates, we end up invoking next() on the Python iterator object. And this translates to the IteratorGetNext op being created and subsequently executed.
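Written out explicitly, the idiomatic loop corresponds to roughly the following; the comments map each Python step to the ops just described:

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(3)

it = iter(dataset)      # creates the AnonymousIterator resource and runs MakeIterator
elem = next(it)         # each next() call executes the IteratorGetNext op
print(elem.numpy())     # 0
del it                  # iterator goes out of scope -> DeleteIterator disposes the state
```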
It's only created once, and it's executed as many times as there are elements in the data set. And finally, when the Python iterator object goes out of scope, the DeleteIterator op is invoked, which makes sure that the iterator resource state is properly disposed of.

So let's contrast that with how this would work in 1.x graph mode. In graph mode, the execution happens lazily, which means we create the ops as the Python lines are invoked, but they're not executed-- the execution is postponed until the particular ops are run through the session mechanism. So just stepping through the program, we see that we are building a graph but not executing it. There is a particular mechanism for creating the iterator resource op and the MakeIterator op, as well as creating the op that is later used for iteration. And it's only within the run part of your program that ops are executed. When the iterator initializer op is executed, we actually end up executing the entire graph of the input pipeline, including the Iterator op.

Now, the difference between the Iterator op and the AnonymousIterator op that was used in eager mode is that the AnonymousIterator op creates a new resource every time it's executed, while the Iterator op creates a resource only the first time it's executed, and any subsequent execution returns a handle to that resource. And the reason for that is that when we run the get_next op, that get_next op will actually execute the Iterator op as well, by the nature of the difference between graph mode and eager mode execution. So that's kind of an artifact of graph mode execution, and thus we need different types of resource creation ops for eager mode and graph mode inside of tf.data. And there is no explicit DeleteIterator op. That's because the iterator resource lifetime is tied to the lifetime of the surrounding session, and so when the session is destroyed, so is the iterator resource. So far, so good? OK.

So now that we've peeled off the Python layer and talked about the op-level view of tf.data, let's dive a level deeper and talk about what actually happens inside of the kernels that implement these ops. And like most other TensorFlow op kernels, these are implemented in C++.

So before we retrace our steps and take a look at the example program from a C++ point of view, let's talk about the important tf.data C++ abstractions. The top-level one is DatasetOpKernel, which implements the OpKernel API. This provides a mechanism for implementing different types of data set ops through a single interface, where the different implementations of the DatasetOpKernel interface just need to override or implement the MakeDataset() method. What MakeDataset() does is return a DatasetBase object. The purpose of the DatasetOpKernel is to provide a translation between the graph representation of the op and a C++ representation of the op. Now the DatasetBase object, in turn, has a method for creating an iterator for that particular data set, as well as a method called AsGraphDef(), which provides the reverse of what I just talked about: it allows you to go from a C++ representation of a data set back to a graph representation of a data set, which will come in handy when I talk about static optimizations of tf.data. The MakeIterator() method of the DatasetBase returns an IteratorBase, which is an interface representing the different types of iterators we have.
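Very roughly, these C++ interfaces have the following shape. This is a simplified Python rendering for orientation only; the names come from the talk, but the signatures are not the real ones:

```python
class DatasetBase:
    """A defined data set: knows how to create iterators and serialize itself."""

    def make_iterator(self):
        """Create an IteratorBase positioned at the start of this data set."""
        raise NotImplementedError

    def as_graph_def(self):
        """Go from the C++ representation back to a graph representation."""
        raise NotImplementedError


class IteratorBase:
    """Sequential, stateful view over a data set."""

    def get_next(self):
        """Return the next element (and whether the end of sequence was reached)."""
        raise NotImplementedError
```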
And the single most important method in that interface is GetNext(), which is the actual C++ method used for iterating through the elements of the input pipeline. And coupled with that is the IteratorResource, which holds the state. And so as we will see, the IteratorResource is actually the entry point into the connected structure of different C++ iterators through which ops like IteratorGetNext receive data. And the SetIteratorFromDataset() method corresponds to the MakeIterator op, as we'll shortly see.

Last but not least, tf.data has two C++ abstractions for representing functions: CapturedFunction and InstantiatedCapturedFunction. The CapturedFunction provides a mechanism for bundling a function with its captured inputs and later instantiating it. And the InstantiatedCapturedFunction provides tf.data with a mechanism to actually run the user-defined functions. And so you can perhaps see how there is a simple relationship between DatasetBase and IteratorBase, and between CapturedFunction and InstantiatedCapturedFunction, where the latter is an instance of the former in both of those cases.

All right, so let's go back to our example. And now we're going to take a look at what happens when we execute the different lines of the input pipeline, but this time in the C++ world. And unlike in the previous Python view section, in this section the diagrams at the bottom will not be graph objects; they will be C++ objects. So in this case, the TFRecordDataset down below is actually an instance of the TFRecordDataset class, which is of type DatasetBase. And for context, we are, again, in TF 2.0 eager mode.

So when the Python program executes tf.data.TFRecordDataset with the files argument, we end up creating the data set variant through the following C++ code. And just for illustration, I'm showing here how that op kernel fetches the set of file names. The set of file names can be either a single string or a list of strings, so there is some subsequent string parsing that's elided here. But the important bit is that we're then storing the TFRecordDatasetOp's data set in the output, where the data set object itself is a variant, which allows us to pass it on as an input to another op. And that other op is the ShuffleDataset, which gets executed immediately after. And so here I'm illustrating how the op extracts the variant tensor input from the op kernel context and then passes it into the ShuffleDatasetOp's data set, so that the ShuffleDataset now understands what stage is producing elements for it to consume.

Next, it's the map transformation. What I want to illustrate here is how the CapturedFunction mechanism works. We use the CapturedFunction::Create factory, which takes the identifier of the function (from the attributes), as well as any captured inputs stored in the op kernel context. And similar to the ShuffleDatasetOp, we end up passing the captured function, as well as the input data set, into the constructor. And finally, there is not much new to see for the BatchDatasetOp, so I pretty much elided all the details from this slide.

OK, so now for the interesting stuff, because this is where we are going to start iterating through the data set. So when you write "for element in dataset", under the hood this gets translated to a Python iter() invocation on the data set object. And the first thing that happens is that we create the anonymous IteratorResource.
And here is just an illustration of the actual mechanism that does this, as well as the code that then produces the handle to the iterator. And this handle, along with the variant tensor representing the BatchDataset, is then passed to the MakeIterator op. So here you can see how we extract both the data set variant as well as the resource handle and pass them into the SetIteratorFromDataset() method, which, as we will shortly see, will, in a cascading fashion, create a sequence of connected iterator objects.

So let's take a closer look at what SetIteratorFromDataset() does. It takes a look at the outermost data set, because that's the variant that it received, and it invokes MakeIterator on that particular data set. And this will prompt the creation of a BatchIterator using the MakeIterator() method on the BatchDataset, but also trigger a recursive MakeIterator() invocation on the input of the BatchDataset, which is the MapDataset. In a similar fashion, the MapIterator is created, and its creation will also instantiate the captured function. So now we'll see that we have an instantiated parse_fn in addition to just the parse_fn CapturedFunction object. And similarly, we create a ShuffleIterator, and finally, the TFRecordIterator. And because the TFRecordDataset has no inputs, this is where the recursive creation of the input pipeline state stops. And control bubbles back up to the IteratorResource, which returns a resource handle-- actually, MakeIterator() doesn't return a resource handle; we already have the resource handle.

AUDIENCE: Question.

YURI: Uh-huh?

AUDIENCE: Does the input pipeline have to be a line? Or can it be a DAG?

YURI: So at the data set level, the input pipeline can be a DAG. At the iterator level, it will always be a tree, if that makes sense. OK.

So next, let's take a look at next(). When the IteratorGetNextOp is invoked, because we're starting to iterate through the elements of the input pipeline, we again look up the resource using the LookupResource() method and then call the GetNext() method on the resource. And as I mentioned earlier, the iterator resource is thus the entry point to the state of the iterator. And what happens is this recursively calls GetNext() on the BatchIterator. And the BatchIterator says, well, I need a batch size's worth of elements, so let me get them one by one. So it calls the MapIterator to say, please give me an element. The MapIterator says, well, I need an element to apply the user-defined function on, so it's going to ask Shuffle for one. And Shuffle says, well, I need a buffer's worth of elements to do reasonable shuffling, so it's going to call the TFRecordIterator. And the TFRecordIterator will say, OK, well, I have these files, and I'm going to open them and start reading elements out of them. So at this point, we start returning data back up. This round trip between the Shuffle and TFRecord iterators might happen multiple times initially. At some point, Shuffle has filled its buffer of elements used for shuffling and produces a random element back up to the MapIterator, which then applies the user-defined function, takes its output, and returns it back to the BatchIterator. And this would be repeated batch size number of times. Then the BatchIterator would take all those elements, create one higher-dimensional element out of the individual slices of the batch, and pass it on to the IteratorResource.
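Schematically, the pull-based recursion just described looks something like this toy sketch. It is plain Python meant only to show the shape of the control flow, not the real C++ implementation:

```python
import random

class TFRecordIteratorSketch:
    def __init__(self, records):
        self._records = iter(records)
    def get_next(self):
        return next(self._records)                 # read the next record

class ShuffleIteratorSketch:
    def __init__(self, input_iter, buffer_size):
        self._input, self._buffer, self._size = input_iter, [], buffer_size
    def get_next(self):
        while len(self._buffer) < self._size:      # fill the shuffle buffer first
            self._buffer.append(self._input.get_next())
        return self._buffer.pop(random.randrange(len(self._buffer)))

class MapIteratorSketch:
    def __init__(self, input_iter, fn):
        self._input, self._fn = input_iter, fn
    def get_next(self):
        return self._fn(self._input.get_next())    # apply the user-defined function

class BatchIteratorSketch:
    def __init__(self, input_iter, batch_size):
        self._input, self._batch_size = input_iter, batch_size
    def get_next(self):
        # Pull batch_size elements, one by one, from the input iterator.
        return [self._input.get_next() for _ in range(self._batch_size)]

# Wiring mirrors the example: TFRecord -> shuffle -> map -> batch.
it = BatchIteratorSketch(
    MapIteratorSketch(
        ShuffleIteratorSketch(TFRecordIteratorSketch(range(100)), buffer_size=10),
        fn=lambda x: x * 2),
    batch_size=4)
print(it.get_next())  # one "batch" of four transformed, shuffled records
```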
And that batch would then get returned out to the Python program. Now finally, when the Python iterator goes out of scope, the IteratorResource is deleted. And in a cascading fashion, the other iterators get destroyed because their ref count goes to zero-- we actually use smart pointers that connect the different iterator objects. So far, so good? Any questions? OK.

So up to this point, I talked primarily about an input pipeline that wasn't trying to be performance-savvy. And if it wasn't obvious from the walkthrough of the stages of the iterator, there are a lot of steps that need to happen to produce a single batch. And if all these steps lie on the critical path of your computation, then you're probably not executing at the best possible performance, or at the peak of your performance. So we have a tf.data performance guide that talks about different mechanisms to make sure that your input pipeline is performant. The three main ones are software pipelining, processing parallelization, and I/O parallelization. And they all use various performance artifacts, either buffers or parallelism, to allow users to specify how their input pipeline should be executed. And in the context of parallelism-- and actually, prefetching as well-- they all map down to asynchronous threads being started by the data set op kernels. These run in the background, disconnected from the GetNext() calls, generating values into an internal buffer. And when a GetNext() call arrives, it just waits until there is something in that buffer and returns it. So in the ideal case, there is data in the buffer, and you don't need to wait at all. But in case your consumer of the data is faster than your producer, then you might wait some of the time, but hopefully not all of the time.

So let's take a look at how this would change the diagram that we talked about. What I did, for the sake of illustration, is add the num_parallel_calls argument to the map transformation, as well as a prefetch transformation at the very end of the input pipeline.

AUDIENCE: So you can prefetch anywhere [INAUDIBLE]?

YURI: You can. Yes. The rule of thumb is that the one at the very end is usually the one that you get the most mileage out of, because it allows you to overlap the computation of the entire input pipeline with the computation that might be happening in the model, either on [INAUDIBLE]. But yes, in theory-- I have a pertinent quiz later on-- we'll see that prefetching, or decoupling the producer and consumer, anywhere throughout your input pipeline might be a good idea.

So the changes in the pipeline code map to the following changes in the diagram. We now have a PrefetchIterator and PrefetchDataset at the end of the pipeline. And we also have a ParallelMapIterator and ParallelMapDataset instead of just the regular MapIterator and MapDataset. It turns out that in tf.data, the version that uses parallelism is a different op kernel.

AUDIENCE: Do you support ragged tensors yet?

YURI: No. But the CL that introduces that support-- actually, a couple of CLs-- is in progress. We're very close.

AUDIENCE: Excellent. And do you see any performance decreases when using ragged tensors?

YURI: So because we are not supporting them yet, I don't have a good answer to that.

AUDIENCE: Yeah.
YURI: I think, through the course of the review process for bringing that support in, we've been cognizant of making sure that the implementation is efficient so that it works well out of the gate.

AUDIENCE: Yeah. But you support sparse tensors?

YURI: We do.

AUDIENCE: OK.

YURI: Yeah. So the hope is that programs that currently use sparse tensors where ragged tensors would be a better fit will see a performance boost by switching to ragged tensors once that support is rolled out.

AUDIENCE: Excellent. Thank you.

YURI: Mhm. And so the runner threads that are illustrated here are the background threads that decouple the producer and consumer in the PrefetchIterator and the ParallelMapIterator, respectively. And they're actually not started until the very first GetNext invocation. So before you call GetNext for the first time, the iterator is idle; there's no activity happening. But the moment you start fetching data out of the iterator, background threads might be started or thread pools might be created that start performing background activity.

An interesting consequence of this is that the traditional way to look at the performance of TensorFlow would be a timeline, which gives you a view into what happens in the context of a single step. And this particular abstraction doesn't match well with the asynchronous nature of tf.data execution. In addition to that, you will only see the IteratorGetNext op, which might not necessarily give you a good view into what's actually happening at the different stages of the tf.data input pipeline. And so at the recent Dev Summit, we announced that there is going to be an open source version of a tool that we've had available internally for some time, which provides you with all this detailed information so that you can debug the performance of your input pipeline using the same tools that we use internally.

OK. So that concludes the section where I talked about C++ and what happens in C++. And there's a little bit of a switch now, because we're going to go back to the Python level and talk about supporting non-tensor types. So tf.data supports more than just regular tensors. The different inputs or outputs of your data set transformations can actually be a number of different things-- sparse tensors, tensor arrays, optionals, nests of any of these, as well as nested data sets. And here, I just illustrate some of the transformations in terms of the types that they support either as an input or an output-- it's not an exhaustive list.

AUDIENCE: What about NumPy arrays?

YURI: NumPy arrays are supported as well. They kind of fall into the category of tensors by virtue of being trivially convertible to tensors.

AUDIENCE: Yeah.

YURI: Similar to NumPy, there's something called SparseTensorValue, which is really just a Python namedtuple return type. And that's kind of the NumPy equivalent for sparse tensors. And I think ragged tensors have the same-- they have a RaggedTensorValue.

AUDIENCE: With the caveat that in [INAUDIBLE] and in eager, you don't actually need the value types, because you can have a sparse tensor or a ragged tensor whose values are eager tensors, which are trivially convertible to NumPy arrays.

YURI: Yeah. Yeah. So the value type is an artifact of graph mode 1.x. So the mechanism that tf.data uses under the hood to provide support for these different types is the tf.data structure API, which is an interface that any type supported in tf.data is required to implement.
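Slightly simplified, and with method names that are only approximate, the interface looks something like this:

```python
class Structure:
    """Describes a (possibly non-tensor, possibly nested) tf.data value type."""

    @staticmethod
    def from_value(value):
        """Build the Structure describing `value` (tensor, SparseTensor, nest, ...)."""
        raise NotImplementedError

    def to_tensor_list(self, value):
        """Box `value` into the flat list of tensors the C++ layer deals with."""
        raise NotImplementedError

    def from_tensor_list(self, flat_list):
        """Unbox a flat list of tensors back into the original Python structure."""
        raise NotImplementedError

    def is_compatible_with(self, other):
        """Whether values described by `other` can be used where this type is expected."""
        raise NotImplementedError
```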
I'm not going to talk about each of these, but the list of methods is neither short nor long. For example, support for TensorArray was introduced less than a month ago, and it was one day of work. So I don't think the overhead of introducing support for a new type is very large, as long as it's reasonably natural how to implement these methods. Having said that, the support for ragged tensors has been in the works for some time, and part of it is because we actually want that implementation to be very performant. And so it prompted the creation of new C++ kernels to make sure that the performance is good from the get-go.

Instead of talking about the individual methods in the interface, what I want to do here is illustrate how this interface is actually used to provide polymorphism at the Python level for different types of tf.data transformations. So for instance, if we look at the tf.data Dataset.from_tensors transformation, which is a data source that just takes an in-memory value and uses it as a data set source, what the implementation at the Python level does is invoke the structure's from_value method to compute an instance of the structure object and store it internally in an attribute. And then it passes the output of the structure's to_tensor_list to the op kernel-- the TensorDataset op kernel. I forgot to mention earlier: at the C++ level, tf.data only deals with flat lists of tensors. And so we need a mechanism to go between that representation and the Python nested structure of arbitrary, possibly non-tensor types. And it's to_tensor_list and from_tensor_list that provide us with this boxing and unboxing, if you will, between the two representations.

from_tensor_slices is similar. The difference between from_tensors and from_tensor_slices is that instead of viewing the data as a single tensor, we end up slicing it. We assume that the value has a rank of at least 1, and we end up slicing it into however many slices it has in the first dimension. And these are the invocations of the structure API that allow us to do this agnostically to the actual type of the Python value.

I also want to illustrate how the structure API is used in the context of user-defined functions. In particular, the function that we end up tracing is actually a different function than just the function the user passes. We end up wrapping the function that gets passed in the Python program in invocations of from_tensor_list and then to_tensor_list. And this is kind of the glue that I talked about: the traced function receives a flat list of tensors, and we reconstruct the structure and the typing using the from_tensor_list invocation, because that's what the user-provided function expects. And then we again deconstruct the structure and box all the non-tensor types into tensors, because that's what the upstream tf.data transformation expects.

All right. So next up, let's talk about static optimizations, which illustrates that everything that I talked about up to this point about how tf.data works is only part of the truth. So in tf.data, we have a number of optimizations implemented. Here is a subset of the ones we have. The ones that are in italics are enabled by default, and the ones that are not in italics can be enabled through tf.data options. This is how you would, for example, go about enabling the map vectorization optimization if you wanted to.
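In code, that looks roughly like the following. The exact option attribute names have shifted between TF releases, so treat this as a sketch:

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10).map(lambda x: x + 1).batch(2)

options = tf.data.Options()
# Opt in to an optimization that is not enabled by default.
options.experimental_optimization.map_vectorization.enabled = True
dataset = dataset.with_options(options)
```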
The tf.data Options object has other features. It's not just for optimizations; it's also, for example, for specifying threading or statistics collection behavior. But in the context of static optimizations, I'm just illustrating how it's used for those. So what happens at the Python level when an iterator is created is that the data set Python object has an options object associated with it, and we use the information in the options object to possibly chain additional data set transformations onto the end of the data set. This is something that the user doesn't see in their code, doesn't write in their code. It's something that we do at the Python level to allow us to insert functionality that we would like to insert. And one of the main uses of this is the OptimizeDataset that's used as an entry point for any static optimizations that are to be applied to the input pipeline.

So if we take a look at the C++ level, what's happening inside of the OptimizeDataset kernel is that we again get the input data set and then invoke an optimize method on it. The code is actually quite long, so I just summarized it in high-level statements here. This is what happens inside of the optimize method. We first use the AsGraphDef functionality to go from the C++ representation of the input object to a GraphDef representation of the input object. We then use Grappler to apply the subset of tf.data optimizations that are either enabled by default or explicitly enabled by the user, which gives us a transformed GraphDef representing the data set. We then convert the rewritten GraphDef back to a C++ representation using GraphRunner. And finally, we update the input with the result. So because map and batch fusion is one of the optimizations enabled by default, the map and batch stages that were in our example would, in fact, be replaced with a single MapAndBatch data set, which is a more performant, fused version of the map and batch transformations.

And the last topic that I want to talk about is dynamic optimizations. So I mentioned before that users have a number of mechanisms to make their input pipelines more performant. They can insert prefetching with a buffer_size. They can use map with num_parallel_calls or interleave with num_parallel_calls to apply various performance optimization tools. But what are good values for these arguments, like buffer_size and num_parallel_calls? Well, in the context of this section, I'm only going to focus on the parallelism optimization. And to get you interested, I have a quiz for you.

[LAUGHTER]

So imagine that you have an input pipeline that looks like this. It reads from a set of files, and then it applies two transformations. And on the right-hand side, in the comments, I mention how much time is required to get a single element, assuming constant processing time through each particular stage of the input pipeline. So with this information, when you call GetNext, how long do you think it would take to get a single element out?

AUDIENCE: 200 milliseconds.

YURI: 320.

AUDIENCE: 320.

YURI: 320 is the answer, because everything executes sequentially. So when you call into the outermost map, that goes recursively into the inner map, which calls recursively into the TFRecord data set. We spend 20 milliseconds there, then we spend 100 milliseconds in the inner map executing f, and then 200 milliseconds in the outer map executing g. So it's 320.
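For reference, the quiz pipeline on the slide was roughly the following. f, g, and the file names are placeholders here, and the per-element costs in the comments are the ones quoted above:

```python
import tensorflow as tf

def f(x):  # stands in for the first transformation (~100 ms per element)
    return x

def g(x):  # stands in for the second transformation (~200 ms per element)
    return x

files = ["data-00000.tfrecord"]               # placeholder file names

dataset = tf.data.TFRecordDataset(files)      # ~20 ms per element
dataset = dataset.map(f)                      # ~100 ms per element
dataset = dataset.map(g)                      # ~200 ms per element
# Sequential execution: 20 + 100 + 200 = 320 ms per GetNext call.
```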
I think you kind of jumped one step ahead here.

[LAUGHTER]

And this was supposed to be a trick question, but you already got the answer. What happens here if we just add num_parallel_calls to the map transformations? It looks like nothing should change-- except something does. And the reason for this being 200 milliseconds is that num_parallel_calls uses a different op kernel, which has a background thread that performs activity independently of the consumer of the data. So the very first element will actually take 320 milliseconds. But then over time, the processing done for the three different stages will actually be overlapped, because there are now two background threads doing work in parallel. The parallelism is 1 at each stage, but that still gives you 200 milliseconds, in total, in the stable state, assuming--

AUDIENCE: Is that the correct mental model, to think that this implies prefetch?

YURI: Yes. That's a very good mental model. Parallel map does, in fact, prefetch by virtue of using a background thread. And so in a way, this is an answer to your earlier question: prefetching inside of the input pipeline, not just at the very end, might provide benefits.

AUDIENCE: Question. So does that imply that the map function has to be traced? Like, for example, if the map function is just a Python function, and if you have multi-threading on a Python function, then really [INAUDIBLE]?

YURI: So I think the answer is that you will get these benefits irrespective of what the constant cost of the function passed into the map transformation is. If it's implemented as a py_func, that function itself might be-- oh, I see what you're saying, that multiple functions would be escaping into Python. That's a good point. Possibly. I would want to convince myself that they actually are all contending for the same [INAUDIBLE].

AUDIENCE: Either way.

AUDIENCE: If you ever use Python, you need to make sure your Python is thread safe. It's very hard for the TensorFlow runtime to not accidentally run Python code from many threads at the same time.

YURI: Yeah. I think the bottom line here is that if you can avoid using py_func-- for instance, by using AutoGraph-- you should.

So it perhaps might not be surprising that if you increase the values of num_parallel_calls, because you know how much time you're going to spend in each of those stages, you can get to the optimal output latency of this input pipeline. You cannot run any faster than the slowest part of the input pipeline, which in this case is the sequential TFRecord reader. There might actually be a way to speed this up even further, by using interleave with num_parallel_calls over the different readers. But instead of exploring that avenue, what I want to ask is, what do you think happens here?

AUDIENCE: Dismal performance.

YURI: Yeah. I think the answer is, it depends. It might actually run well enough, because num_parallel_calls doesn't mean that you create that many threads, at least in the case of the map transformation anyhow. It means that you are allowed to schedule that many ops into the inter-op thread pool at the same time. And because you're allowed to schedule them at the same time, you need a place to store them. So if nothing else, the downside of specifying a very large value of num_parallel_calls is that you're going to use more memory to store these intermediate values than you would actually need for equal performance, which might hurt your temporal locality or thread locality.
So yes, the performance might actually become worse, but the reasons for why can be subtle and environment-specific.

AUDIENCE: You said earlier that ops create their own threads. Is that actually the case? Or do they use the shared thread pools in the executor?

YURI: They create their own threads. Parallel map and prefetch end up creating their own thread; parallel interleave creates its own thread pool. Under the hood, they end up using this abstraction of an unbounded thread pool, if you are familiar with that, which was introduced recently to combat memory fragmentation issues in the open source memory allocator resulting from excessive thread creation. So the unbounded thread pool that tf.data uses creates the illusion of logical threads that are mapped onto a smaller set of physical threads. But they're different from the inter-op thread pools or any of the core TensorFlow runtime thread pools. We do rely, in tf.data, on the inter-op thread pool for the execution of user-defined functions by default, but there is also an option to override that. And we also, by default, take advantage of, or inherit, the setting of inter_op_parallelism, and there is also a way to override that just for the tf.data ops. And as a matter of fact, our experience has been that-- and this is perhaps not a surprise to you-- disabling inter_op_parallelism altogether for CPU-bound input pipelines gives you a 10% to 20% speedup. Because you don't need to parallelize the individual ops; you get the parallelism by running multiple of them in parallel.

AUDIENCE: Do we have any guides internally about how you can do tricks with tf.data to get performance enhancements? Or not so much?

YURI: Great question. So instead of having guides, why not just have tf.data do the hard work for you?

AUDIENCE: Excellent.

YURI: And so this is something close to my heart, because I worked on tf.data performance in the early days in terms of exposing these knobs, and then I worked very hard on making sure that users don't need to use these knobs, because there is something that does a good enough job automatically. And you might think, well, this seems like a good fit for reinforcement learning, since we're in RMI. And that's something that I explored as an idea. I didn't get it to work, but that might be because of me, not because of reinforcement learning.

[LAUGHTER]

The issue with reinforcement learning-- so the idea is this. What if you just pick some values for the different knobs in your input pipeline, observe the behavior, and then try some different values, using a smart algorithm that hopefully converges to a global optimum? Well, it turns out that the convergence was very slow. And if you set abysmal parameters for something that could be heavily parallelized, then you would need a very long time to actually realize that it is slow because of poor parameters, as opposed to slow because that's how fast the input pipeline runs. So instead of exploring reinforcement learning that tries to modify the parameters, tf.data has an analytical model that models the performance of the input pipeline that's currently instantiated. And there's a little bit of math in my presentation because I'm originally a formal verification person. So here's some math for you. You can model the output latency of a node as a function of the output latencies of its inputs.
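Roughly, the model on the slide has the following shape; this is a reconstruction, not the exact notation that was used:

$$\mathrm{outputLatency}(n) \;=\; \lambda_{n}\bigl(\mathrm{processingTime}(n),\ \mathrm{outputLatency}(i_1), \ldots, \mathrm{outputLatency}(i_k)\bigr), \qquad i_1, \ldots, i_k \in \mathrm{inputs}(n)$$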
And the challenge was, how do you implement these two functions-- the processing time of a node, as well as the lambda function that captures what the node itself does? So the processing time is modeled through a lightweight instrumentation of the C++ implementation we have. By lightweight, I mean it imposes roughly 100 nanoseconds of overhead on the critical path. And we keep track of node creation and deletion, as well as the processing time spent within a node and within the user-defined function. And for the lambda, which is the transformation-specific functionality that maps the output latencies of the inputs to the output latency of the op, it turns out there is a fairly small number of categories of nodes, and for each of them there is a different type of lambda.

And the autotuning mechanism then takes advantage of these two implementation artifacts to tie it all together. We start a background thread that periodically performs the following optimization. We snapshot the state of the analytical model, which captures the processing times of the different iterators that are currently floating around. And then we perform a very simple hill-climbing algorithm that allocates threads, or cores, to the parallelism knobs that provide the greatest benefit. So it's really a greedy algorithm, assuming that this optimization is actually monotonic. And if you do that, you can specify AUTOTUNE for num_parallel_calls in this example. And as long as you have 16 or more cores, you get the same output latency that you would get out of the manually tuned one. And the key is that it will actually not over-provision by an order of magnitude. It might over-provision a little bit, but not by 1,024. And that's it.

[APPLAUSE]
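For reference, the autotuned version of the quiz pipeline would look roughly like this; f, g, and the file names are again placeholders:

```python
import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

def f(x):  # placeholder transformations, as in the quiz
    return x

def g(x):
    return x

files = ["data-00000.tfrecord"]               # placeholder file names

dataset = tf.data.TFRecordDataset(files)
dataset = dataset.map(f, num_parallel_calls=AUTOTUNE)  # parallelism chosen by tf.data
dataset = dataset.map(g, num_parallel_calls=AUTOTUNE)
```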