The Petabyte Project, the Most Valuable Idea

I'm setting a goal: I want to be able to iterate through 1 petabyte worth of data in a few minutes.

1 petabyte, 1,000,000 gigabytes, all iterated through in 5 minutes. That leaves a total allocated time of 0.0003 seconds per gigabyte to pull this stunt off... practically O(1) time! Although this is a ridiculous goal, I'm setting my optimism up to be crushed so we can have some fun and experiment with indexing and caching technologies. No caveats. This is my goal, but it doesn't have to succeed!
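
For the record, the budget arithmetic is simple enough to sanity-check in a few lines (assuming a 5-minute window):

PETABYTE_IN_GB = 1_000_000      # 1 PB expressed in gigabytes
TARGET_SECONDS = 5 * 60         # "a few minutes" -> 5 minutes

print(TARGET_SECONDS / PETABYTE_IN_GB)  # 0.0003 seconds per gigabyte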

Note that this article/idea is evolving, and I'll be editing/appending to the content of it over time. So please follow and watch out for new updates as I progress on this problem.

Where did this idea come from?


Nambucca Heads

As I was driving along the highway to the beautiful Nambucca Heads on the Mid-North Coast of Australia for a meeting, I had the windows down, sharp sunglasses on, and a riveting Databricks lecture playing. During this lecture, they talked about the "separation of compute and storage" (hilarious) - which is a wonderful way to sum up the cloud revolution: separation, and elasticity. At this point, I knew something I had not yet admitted: YDP would NOT be a storage company, because every other cloud platform under the sun had conquered that, and invested the billions to facilitate it. From there, I realized YDP could host users' data on S3, or import from S3 - which is no revolution.

BUT... what if our "data lake" (a term used a lot in that lecture) could use some ground-up innovations to make iterating through "big data" easier? Instead of trying to provide the "aggregator" and the "house" ourselves, we can plug into those and provide an incredibly powerful set of tools for developers (built by developers) to solve big-data problems. From there, I thought of...

PolyTrace, and Iterators

Don't get scared... These ideas aren't crazy or new on their own; what's unique is their application to this problem. To summarize before digging deeper, they can be described as:

  • PolyTrace: the ability to modify a few classes in Python, provide them to a user, and record their interaction with them in order to get an "access history".
  • Iterators: everyone in Python loves them, but this is an attempt to expand their use into structures not often associated with them.
  • PolyIndex: the ability to index a polymorphic sequence into a flat string.

Big Data and PolyTrace

What is PolyTrace? Well, for one thing, it's a concept name... But what problem does it solve? Well, in YDP, we're already in the business of providing the user with pseudo-elements. Users can think they're interacting with one thing, but in reality be interacting with something else. But, YDP makes it work.

PolyTrace is an excellent example: a user may want to import and iterate over a large, 15gb JSON file. Say they do:

# user wants big_data, and will save avg_age as a dataset

avg_age = []

for row in big_data:
    # row = [123, {"name": "Jack Hales", "age": 22}]
    id = row[0]                   # accessed, but never used in the computation
    user_data = row[1]
    name = user_data["name"]      # also accessed but unused
    age = user_data["age"]
    avg_age.append(age)

avg_age = sum(avg_age) / len(avg_age)
print(f"Average age of our user-base is {avg_age}")

(In reality, a "big data" approach would compute this with a streaming algorithm - a running sum and count - rather than accumulating every age in memory, if it were pulling it all into a single node to compute.)
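
For reference, the streaming version of that computation is tiny: a running sum and count, never materialising the list of ages. This is only a sketch, assuming the same big_data iterable as above.

total_age = 0
count = 0

for row in big_data:
    total_age += row[1]["age"]  # only the age is ever touched
    count += 1

print(f"Average age of our user-base is {total_age / count}")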

The user would run this, and think: "Cool! YDP has a huge chunky server with heaps of RAM (15gb JSON = around 45gb RAM in my experience), and pulled it all in, and iterated over it like I said". But, the reality is VERY different... In this system, the flow would look like this:

  1. User links their "users.json" file, and YDP indexes it using in-house indexing methods.
  2. User makes a dataset creator for "Average User Age" with varname "avg_age".
  3. YDP provides "big_data" for "users.json" as a PolyTrace object.
  4. For the first invocation, each iteration over "big_data" will provide a PolyRow object. The user will access this thinking it's a row of their real data, but the PolyRow will record each index accessed and build up an access history (see the sketch after this list).
  5. From there, YDP gets a list of accessed data in each iterable row. In the above example, the access history will be:
    1. id:   row[0]
    2. name: row[1]["name"]
    3. age:  row[1]["age"]
  6. From there, YDP can create an index of the data, and pull only what's necessary thanks to step 1's indexing. In this example, some accessed data (id and name) is never used in the computation - there may be a way to flag values as genuinely "used", but I've not worked on that yet.
  7. YDP can now begin to stream the data from the storage locations, and re-run this code by setting "big_data" as a generator which will provide rows of this indexed data which fits the schema so no KeyErrors occur.
  8. Now: the code is running properly, "avg_age" will be calculated, and the dataset will be saved.
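
To make step 4 concrete, here's a minimal sketch of how such a tracing proxy could work. The class and attribute names are mine, purely illustrative, and not YDP's actual implementation: every __getitem__ call appends the full access path to a shared history.

class PolyRow:
    """Proxy that records every index/key accessed on the wrapped value."""

    def __init__(self, value, path=(), history=None):
        self._value = value
        self._path = path
        # All proxies spawned from one root row share a single history list.
        self._history = history if history is not None else []

    def __getitem__(self, key):
        child = self._value[key]
        child_path = self._path + (key,)
        self._history.append(child_path)
        if isinstance(child, (dict, list)):
            return PolyRow(child, child_path, self._history)
        return child

    @property
    def access_history(self):
        return self._history

row = PolyRow([123, {"name": "Jack Hales", "age": 22}])
id = row[0]
user_data = row[1]
name = user_data["name"]
age = user_data["age"]
print(row.access_history)  # [(0,), (1,), (1, 'name'), (1, 'age')]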

You can quickly see how this would be a very attractive tool in a developer's belt if they're handed a big chunk of polymorphic JSON and don't want to do the work of cutting it up, indexing it, or streaming it using an ijson-like library (or a kv iterator). But, my plans go much further than just iterating over JSON efficiently: I want to make it a common format for CSV, XML, and so forth, and make it insanely fast (hence the petabyte project).
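
For comparison, the manual streaming route with ijson looks roughly like the sketch below. It assumes users.json is a top-level JSON array of [id, object] rows like the example above - a sketch, not a drop-in solution.

import ijson

total_age = 0
count = 0

with open("users.json", "rb") as f:
    # "item" is ijson's prefix for each element of a top-level array.
    for row in ijson.items(f, "item"):
        total_age += row[1]["age"]
        count += 1

print(f"Average age of our user-base is {total_age / count}")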

PolyIndexing

PolyIndexing is the concept of taking a polymorphic structure, converting it into an iterable series of chunks, then breaking those chunks up into groups depending on their indexing/polymorphic structure. If a big group of iterable chunks shares a certain index structure, it will be tremendously efficient to port them all into a group under a folder, and place all the item data in the subsequent files in that folder.
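
As a toy sketch of that grouping step (the signature function, folder layout, and file format here are purely illustrative, not the actual PolyIndex format):

import json
import os
from collections import defaultdict

def signature(record):
    """A crude structural fingerprint: top-level keys for dicts, length for lists."""
    if isinstance(record, dict):
        return tuple(sorted(record.keys()))
    if isinstance(record, list):
        return ("list", len(record))
    return (type(record).__name__,)

def poly_index(records, out_dir="polyindex"):
    # Bucket records by shape, then write each bucket under its own folder.
    groups = defaultdict(list)
    for record in records:
        groups[signature(record)].append(record)
    for i, items in enumerate(groups.values()):
        group_dir = os.path.join(out_dir, f"group_{i}")
        os.makedirs(group_dir, exist_ok=True)
        with open(os.path.join(group_dir, "items.jsonl"), "w") as f:
            for item in items:
                f.write(json.dumps(item) + "\n")

poly_index([{"name": "Jack", "age": 22}, {"name": "Jill"}, [1, {"age": 30}]])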

Let's have some fun: our "basic benchmarks" to smash

To start, I wanted to use a format which is typically not seen as polymorphic: CSV. CSV is a simple format which separates values with commas and can use quotation marks to delimit strings that themselves contain commas. I wanted to start the journey by creating a gigabyte of data, using the time-to-iterate-a-gigabyte as the benchmark to clock against, and then forecasting the time for a petabyte from it. I've opted for single-core performance first, then used the relevant multi-core performance metrics when possible/necessary.

The Data 

The data I've opted to iterate through is a CSV file with two headers, Name and Age, and the row "Jack,22" repeated until the file reaches 1gb (1,044,481 KB to be specific). I've then tested various methods to find their best speeds.
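
The generator for this dummy file is nothing special - roughly the sketch below (my actual script differs slightly, and the file name is illustrative):

TARGET_BYTES = 1_000_000_000      # roughly 1gb
ROW = b"Jack,22\n"

with open("benchmark.csv", "wb") as f:
    f.write(b"Name,Age\n")
    written = 0
    batch = ROW * 100_000         # write in big batches so generation is quick
    while written < TARGET_BYTES:
        f.write(batch)
        written += len(batch)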

First tests: setting the playing field

Using the csv.reader module

23.7 seconds


Slow, but it provides parsed data to work with... Spoiler: using PyTables reduces this iteration time (and provides the same data) on the same machine to 2 seconds. The only downside of that method is that the tables need a predefined type, and predefining the number of rows helps optimize call speed.
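
For the curious, the PyTables comparison was shaped roughly like the sketch below - the column sizes and row count here are illustrative, not my exact benchmark setup:

import tables

class User(tables.IsDescription):
    name = tables.StringCol(16)   # fixed-width string column
    age = tables.Int32Col()

# Load the data into a typed HDF5 table once...
with tables.open_file("benchmark.h5", mode="w") as h5:
    table = h5.create_table("/", "users", User)
    row = table.row
    for _ in range(1_000_000):
        row["name"] = b"Jack"
        row["age"] = 22
        row.append()
    table.flush()

# ...then iterate over it natively.
with tables.open_file("benchmark.h5", mode="r") as h5:
    total_age = sum(r["age"] for r in h5.root.users.iterrows())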

Iterating over raw lines using open

9.66 seconds

Iterating using sized chunks

3.46 seconds (best, at 46,000 byte chunks)
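
The sized-chunk reader (and the forecasting arithmetic used in the next paragraph) is roughly this sketch, with an illustrative file name:

import time

CHUNK_SIZE = 46_000   # the best-performing chunk size in my tests

def iterate_chunks(path, chunk_size=CHUNK_SIZE):
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

start = time.perf_counter()
total_bytes = sum(len(chunk) for chunk in iterate_chunks("benchmark.csv"))
per_gb = (time.perf_counter() - start) / (total_bytes / 1_000_000_000)

print(f"{per_gb:.2f} s/GB -> {per_gb * 1_000 / 60:.1f} min/TB "
      f"-> {per_gb * 1_000_000 / 86_400:.1f} days/PB")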

By forecasting out from the best result, this gives us a best-iter-time of 57.7 minutes per terabyte, and 40 days per petabyte, just to iterate and do no compute on the raw data. Holy... fucking... shit... I seriously don't think my optimism had any idea whatsoever of how large a scale a petabyte of data is, and it just sunk in... In order to get a petabyte down to a few minutes, I'd have to improve this iteration time by almost four orders of magnitude (9,611 times to be precise).

On the bright side: each small improvement brings this time down significantly. A 2x improvement in speed means a reduction of 20 entire theoretical days! To remind the reader, the goal time is 0.0003 seconds per gigabyte...

One night of this rattling around in my head...

I had a weird idea: why not test this on another machine? The only machine on-hand at the time was my Macbook Air 2020, with an M1 chip and 8gb of RAM: a perfect, basic candidate. Running the same sized-chunk iterator program, it clocked a whopping 0.14 seconds to complete the 1gb file! An insane 24.5x increase in speed over my Windows SATA SSD machine! With this increase, the petabyte iter-time comes down from 40 days to around 1.5 days, a beautiful reduction.

Now to benchmark using standard drive benchmarking tools... How does my SATA SSD compare to the M1's SOC SSD? Using CrystalDiskMark and AmorphousDiskMark, the details are as follows:

Windows SATA SSD Results

Windows Samsung SATA SSD CrystalDiskMark results

Mac Air M1 SOC Results

Macbook Air 2020 M1 AmorphousDiskMark results

What?! Although I know I've got an incredibly good and performant SSD, my first inclination was to blame its "immature" transfer speeds compared to Apple's SOC... but that doesn't seem to be the problem. When moving to a 3x8GiB test, though, the opposite is true and my Windows machine is 6x slower than my Mac in the basic tests (random is on-par). I really don't know who to trust now... Is it cache sizes? NAND flash? No idea.

Moving forward, test another OS

To further add to this knowledgebase, I tested on a micro Linux instance I forgot I had on OVH. I cloned the repo I'd used for testing, ran the dataset generator, ran the tests, and clocked nothing surprising on the reader tests, but 0.30 seconds on the best-performing test compared to Windows' 3.6-second time to beat (10x faster).

Concurrency, threading, processes, how do they work with IO?

From here, back on my development machine, I devised a new test: use smaller sets of files that add up to 1gb but are iterable segments of the same basic structure (still CSV). I split the large file into 69mb files, 15 in total, and ran some tests. My main question is: how does IO scale? Is it thread-separated, process-separated, or single-file separated?

Windows, 65536 byte chunk size, results were:

Threading with 1 thread: 3.6 seconds. 2 threads: 3.58 seconds. 15 threads: 3.45 seconds. So, threads don't do anything.

Process pools with 1 process: 3.6 seconds. 2 processes: 2.19 seconds. 4 processes: 1.33 seconds. 15 processes: 1.15 seconds. So, file access can be asynchronous to each process, great to know! 

It's helpful to know at this point that I have an Intel i7-6700K with 4 cores (hyperthreaded), 8 threads.
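
The harness for these tests was roughly the following sketch (file paths and worker counts are illustrative):

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time

FILES = [f"chunks/part_{i:02d}.csv" for i in range(15)]
CHUNK_SIZE = 65_536

def read_file(path):
    # Same sized-chunk read as before, but scoped to a single 69mb file.
    count = 0
    with open(path, "rb") as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            count += len(chunk)
    return count

def run(executor_cls, workers):
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as pool:
        sum(pool.map(read_file, FILES))
    return time.perf_counter() - start

if __name__ == "__main__":
    for workers in (1, 2, 4, 15):
        print("threads:  ", workers, run(ThreadPoolExecutor, workers))
        print("processes:", workers, run(ProcessPoolExecutor, workers))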

Now to test on my Macbook

With 15 processes: 0.23 seconds, slower than the iterative chunk method. With 15 threads: 0.17 seconds. Weird!

These results make me feel as if threading/processing on Linux/Mac is best saved for compute, whereas on Windows it's what brings the raw iteration time down the most. With this in mind, I feel as if a Linux machine works best for converting the given data into chunks, but it also wins out on chunk iteration since it's still 5x faster than Windows.

Idea: can compute out-run IO? What if I compress my data into chunks up-front, then decompress it at runtime - can that run faster than the raw loop? The idea is that "compression upfront, then decompression at runtime, will be faster than raw iteration". I doubt it, but I want to test it.

Alright, let's test that theory. In order to get a good idea of whether or not this idea will work, I will use Python's zlib on 15 chunks of compressed data. From there, when iteration time is needed, I will decompress the data and that will count as the full iteration. I will also tweak the level of compression. Let's see how it performs...
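
The experiment was shaped roughly like this sketch (file names illustrative; only the decompression pass is timed):

import time
import zlib

FILES = [f"chunks/part_{i:02d}.csv" for i in range(15)]
LEVEL = 1   # compression level to tweak (1 = fastest, 9 = smallest)

# Up-front compression - not counted towards the iteration time.
for path in FILES:
    with open(path, "rb") as f:
        compressed = zlib.compress(f.read(), LEVEL)
    with open(path + ".zlib", "wb") as f:
        f.write(compressed)

# The part that counts: read and decompress each chunk.
start = time.perf_counter()
for path in FILES:
    with open(path + ".zlib", "rb") as f:
        data = zlib.decompress(f.read())
        # text = data.decode()  # decoding to str is where the ~30% overhead comes from
print(f"Decompression pass: {time.perf_counter() - start:.2f} seconds")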

Using our dummy data (very dummy), the 69mb chunks compress down into 382kb each, so this is a good "best best best best case" example. Windows results:

Iterating through the 15 files 1-by-1 (no concurrency) and decompressing, the total time to iterate is 0.93 seconds; if you also decode into a string the time is +30% (1.23 seconds). This is a great improvement, and that's before any concurrency. With concurrency (15 threads), the time is 0.64 seconds. When switching to processes, the time is 0.53 seconds.

When using 8 processes (optimal, less volatile), increasing the compression level from 1 (0.285 seconds) to 9 brings the time to 0.293 seconds - no real benefit, and a potential negative effect.

Now... Moving to Mac... the best time (processes) is 0.15 seconds with string decoding, and 0.07/0.08 without the decoding - insane! At this speed (if I could make a byte-iterating algorithm), the time-per-petabyte would be 22.2 hours - we're under a day at this point!

For the record, doing this as a single large chunk (the whole compressed 1gb) takes around 1 second at all compression levels; on Mac, 0.28 seconds - no improvement there. Note also that it's possible to compress streams, which is a great possibility thanks to zlib's compressobj - and could be used to convert large datasets into compressed data during the indexing process.
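
A stream-compression pass with compressobj might look like this sketch, so the whole dataset never has to sit in memory during indexing (file names are illustrative):

import zlib

def compress_stream(src_path, dst_path, level=1, chunk_size=1_000_000):
    compressor = zlib.compressobj(level)
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            dst.write(compressor.compress(chunk))
        dst.write(compressor.flush())   # emit whatever is still buffered

compress_stream("benchmark.csv", "benchmark.csv.zlib")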

What's the next move?

Image by GR Stocks https://unsplash.com/@grstocks


I'm still trying to work that one out. So far, we've "learnt" a few things which can help us get a picture of what chess pieces we can move next. Below I'll list a few things learnt, and questions moving forward.

Learnings

  • I/O seems to be processor-bound as well as drive-bound.
  • Decompression+compute seems to be faster than raw reading.
  • I/O can be sped up using decompression, as-per this answer I wrote for fun.
  • The cost of around a petabyte of raw drive capacity is well over US$20,000 just for the drives, as per this website.

Questions, Tests, and Further Learnings

  • Can I speed up IO delivery using a buffered reader? Benchmarks could help establish the theoretical limit of my drives.
  • Can mmap be used somehow? Will this article help? Results: this worked very interestingly, and provided more ready-to-go data than the byte-chunking concepts. On Mac this brought the iter time down to around 0.02-0.03 seconds per-GB, which is an insane throughput. This speed boost only applies when we're able to deal with raw bytes, since mmap bypasses the usual read path and maps file data from the kernel straight into the application with fewer system calls (see the sketch after this list).
  • What is the iter time if the chunks are pre-loaded into memory and can be directly iterated over? Test both regular Python and Numba improvements. This will serve as a good theoretical limit to know whether it's possible or not. Results: for Windows, after loading all chunks into an array, this is 0.0014 seconds per-GB. On Mac, 0.0004 seconds (no concurrency) - exactly what we want!
  • Can this article comparing cat to the data help? Results: it benchmarked speeds well, and helped in encouraging me to experiment with mmap as above.
  • Test whole JSON parsing based on compressed/uncompressed data vs raw parsing of the file. Results: around 13% faster if there's a massive difference between compressed and uncompressed size, otherwise there's less of a margin, and potentially a negative margin compared to raw reading.
  • PyTables? Results: this is ridiculously fascinating, and applies some of the ideas presented above. It uses a bunch of C-bindings and other setups to preformat your data, automatically chunk it, and perform a bunch of optimizations by people much smarter than myself, and the iter time (with pre-formatted, column-providing rows) was 2 seconds on Windows and around the same on Mac. This is a big contender, as it sits one step above what we're attempting to iterate through, so indexing may be intelligent to do under this system since it has something they call a "kernel search", which is a fast search algorithm. Anyway, it's very compelling.
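
The mmap test mentioned above was roughly the following sketch (chunk size and file name are illustrative):

import mmap
import time

CHUNK_SIZE = 65_536

start = time.perf_counter()
with open("benchmark.csv", "rb") as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        total = 0
        for offset in range(0, len(mm), CHUNK_SIZE):
            total += len(mm[offset:offset + CHUNK_SIZE])
print(f"mmap pass: {time.perf_counter() - start:.2f} seconds for {total} bytes")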

Conclusion (for now)

For now, I've succeeded in building a compelling case for the ability to reduce iteration time using some I/O tricks and so forth. I'll take a break so I can continue development on the interface and experience of Your Data Place, and complete some other ideas which are in their infancy. Soon, one day, I'll return to this concept and apply some fascinating indexing technology to attempt the final haul.

Thanks for reading, and look forward to hearing your thoughts/ideas.

(I'll be updating this as I continue working on this idea.)

Notes:

Look at this: https://www.leeholmes.com/searching-for-content-in-base-64-strings/

I want to explore this idea for something sharp and new for YDP which may be an axis in bringing the productivity my platform provides to the large "big data" users. This idea I've coined (for the time being) as PolyTrace, a tool I'll test which aims to map out a user's desired data. The concept is what I *think* a graph database *may* look like, but I don't have much knowledge of graph databases, so I'll look into them to see if this has already been solved.

Anyway, the idea is to create "polymorphic indexes of data", to make data extraction easier and cheaper for complex jobs.

https://pypi.org/project/ijson/#usage

Use Numba to improve on the efficiency of executing these large runtime improvements.

A petabyte is 1,000 terabytes, which is 1,000,000 gigabytes, which is a ridiculously large amount of data to iterate through.

