Providing Metadata Discovery on Large-Volume Data Sets

Many big data systems collect petabytes of data on a daily basis. Such systems are often designed primarily to query raw data records for a given time range with multiple data filters. However, discovering or identifying unique attributes present in such large datasets can be difficult.

Performing runtime aggregations on large data sets, for example finding the unique hosts that logged for an application in a particular time range, needs high computational power and can be extremely slow. Sampling the raw data is an option for attribute discovery. However, such an approach would also mean that we would miss sparse or rare attributes within large volumes of data.

Introduction

The metadata store is our internal implementation to provide guaranteed real-time discovery of all the unique attributes (or metadata) within truly massive volumes of different monitoring signals. It primarily relies on Elasticsearch and RocksDB in the backend. Elasticsearch enables aggregations to find unique attributes over a time range, while RocksDB enables us to perform de-duplication of the same hash within a time window to avoid redundant writes.

Monitoring signal types

We provide discovery on three types of monitoring signals: metrics, logs, and events.

Metrics

Metrics are periodic time-series data that contain a metric name, a source timestamp, dimensions in the form of a map (K, V), and a long numeric value, e.g. http.hits 1234567890 34877 host=A

In the above example, http.hits is the metric name, 1234567890 is the epoch UTC timestamp, 34877 is the long value, and host=A is the dimension { K, V } pair. We allow discovery of metric names and namespaces along with their dimension map.
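To make the metric layout concrete, here is a minimal parsing sketch. It assumes a whitespace-delimited line of the form name, timestamp, value, followed by zero or more key=value dimensions, which is inferred from the single example above. The Metric class and parse_metric function are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Metric:
    name: str
    timestamp: int              # epoch seconds (UTC)
    value: int                  # long numeric value
    dimensions: Dict[str, str]  # dimension { K, V } map


def parse_metric(line: str) -> Metric:
    """Parse '<name> <timestamp> <value> [k=v ...]' into a Metric.

    The line layout is an assumption based on the example in the text.
    """
    name, timestamp, value, *dim_tokens = line.split()
    dimensions = dict(token.split("=", 1) for token in dim_tokens)
    return Metric(name, int(timestamp), int(value), dimensions)


metric = parse_metric("http.hits 1234567890 34877 host=A")
print(metric.name, metric.timestamp, metric.value, metric.dimensions)
```

The name and the dimension map are the parts that would be discoverable; the timestamp and value belong to the raw data record.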

Logs

Logs are log lines emitted by various applications or by software/hardware infrastructure.

We represent logs in the following sample structure format:

[Figure: sample log structure]

Logs are always discoverable against a use case (also known as a namespace). Each log line can be of a particular type, such as stdout or stderr.

Discovery is also provided on the type (also known as name) of log signal. As shown in the above example, the dimension key and value maps are also discoverable.

Events

Events resemble both log and metric signals. They can be considered sparse metrics that are recorded as events within the system, and they are aperiodic in nature. For example, a router switch becoming unavailable is logged as an event. Events can also be verbose, like a log signal with a lot of text that indicates what happened during the event.

A sample event looks like the following:

[Figure: sample event structure]

Similar to logs and metrics, events have a namespace and an event name, both of which are discoverable. In addition to the dimension keys and their values, the value keys are also discoverable for events. Making the value keys discoverable allows us to perform aggregations, such as MIN, MAX, and COUNT, on known fields.

On our product console, the discoverable attributes are highlighted in the following illustration:

Approach and design

All monitoring signals are received initially by our ingress service instances. These service nodes push the different input monitoring signals (logs, metrics, and events) onto a Kafka data bus topic using custom key partitioning logic. Metadata store ingress daemons consume monitoring signals from their assigned partitions and then write data onto a backend Elasticsearch cluster that is used exclusively for discovery purposes.

The different monitoring signals that we collect are pushed onto a Kafka bus, which acts as our source data stream. An advantage of using Kafka is that it offers persistence even if the downstream pipeline is under maintenance or not available. We also use a custom Kafka key partitioner on the ingress service to ensure that all keys with the same hash always go to the same Kafka partition. Different monitoring signals use different hashes internally. For example, we use a hash on "namespace + name" for metric signals, while our log signals use a hash on "namespace + dimension { K, V } map." This kind of grouping helps reduce the overall cardinality encountered on each downstream Kafka consumer, and thereby effectively reduces the total in-memory footprint.
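The key-based partitioning idea can be sketched as follows. This is only an illustration and not the partitioner used in production: the SHA-256 hash, the "|" separator, the partition count, and all function names are assumptions. The point it shows is that a stable hash over the grouping fields always maps the same key to the same partition.

```python
import hashlib
from typing import Dict

NUM_PARTITIONS = 12  # hypothetical partition count


def stable_hash(text: str) -> int:
    """Return a hash that is identical across processes and restarts."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def metric_key(namespace: str, name: str) -> str:
    # Metric signals are grouped on namespace + name.
    return f"{namespace}|{name}"


def log_key(namespace: str, dimensions: Dict[str, str]) -> str:
    # Log signals are grouped on namespace + dimension map.
    # Sorting the pairs keeps the key independent of map ordering.
    dims = ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
    return f"{namespace}|{dims}"


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    return stable_hash(key) % num_partitions


first = partition_for(metric_key("payments", "http.hits"))
second = partition_for(metric_key("payments", "http.hits"))
print(first == second)  # the same key always lands on the same partition
```

Because every record with a given key reaches the same consumer, each consumer only has to track the keys of its own partitions, which is what keeps the per-consumer cardinality low.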

Similar to our metadata store ingress daemon, there are other consumers that write the raw monitoring signal onto some backend stores such as Hadoop, HBase, Druid, etc. Maintaining a separate discovery pipeline provides an easy drill down and subsequent egress of these raw monitoring signals without the need to perform expensive runtime aggregations.

RocksDB is used as an embedded data cache in Metadata Store. This embedded cache avoids duplicate writes onto our backend Elasticsearch data sink. We chose RocksDB because of its impressive benchmark results and configuration flexibility.

When a record is processed by the metadata store ingress daemon, its hash key is checked against the existing cache. If the record was not already loaded in the cache, then the entry gets written onto the search database (Elasticsearch), and its hash key is added to the cache. This is the only time when the record would be written to the database for that debounce period. No action is taken if the record is already present in the cache.
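The check-then-write flow described above can be sketched in a few lines. This sketch swaps in a plain Python set for the embedded RocksDB cache and a callback for the Elasticsearch write, so the DebounceCache class, its method names, and the way the window start is computed are all hypothetical.

```python
from typing import Callable, Optional, Set


class DebounceCache:
    """In-memory stand-in for the embedded cache (a set, not RocksDB)."""

    def __init__(self, debounce_seconds: int, write: Callable[[str], None]):
        self.debounce_seconds = debounce_seconds
        self.write = write  # stand-in for the write to the search database
        self.window_start: Optional[int] = None
        self.seen: Set[str] = set()

    def process(self, hash_key: str, now: int) -> bool:
        """Return True if the record was written, False if it was skipped."""
        window_start = now - (now % self.debounce_seconds)
        if window_start != self.window_start:
            # A new debounce period has begun, so the cache is reset.
            self.window_start = window_start
            self.seen.clear()
        if hash_key in self.seen:
            return False  # already written in this debounce period
        self.write(hash_key)
        self.seen.add(hash_key)
        return True


written = []
cache = DebounceCache(debounce_seconds=12 * 3600, write=written.append)
cache.process("abc", now=0)          # written
cache.process("abc", now=60)         # skipped, still in the cache
cache.process("abc", now=12 * 3600)  # written again in the next window
print(written)  # ['abc', 'abc']
```

Each hash key reaches the backend at most once per debounce period, so the number of writes depends on the number of unique keys rather than on the raw signal volume.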

The RocksDB cache is designed to be read heavy, but we do see a burst of writes at the beginning of the debounce period (as the cache is reset). For the current load, we have seen reads go above 5 billion and writes in the tens of millions into the cache, with most of the writes happening in the first few minutes after the start of the debounce window. For this reason, there could be consumer lag at the onset of the debounce period. To lower read/write latencies, we made efforts to keep all the cache data in memory in RocksDB to avoid secondary disk storage lookups. Write-ahead logs (WAL) and compression are disabled. During our benchmarking, we found that a single memtable of 16 GB was sufficient to store the hashes.

The above graph represents the number of unique documents that were written onto the backend Elasticsearch. Spikes correspond to the beginning of the debounce period when the cache was reset.

For monitoring purposes, we report all the RocksDB statistics as metrics into our monitoring platform. The RocksDB Tuning Guide describes the different database options.

We use Elasticsearch version 6.x to power the backend aggregation to identify different attributes within the monitoring signal. We leverage a 30-node Elasticsearch cluster built on SSD and 64 GB RAM hosts that are managed on our internal cloud platform. 30 GB is allocated for the Elasticsearch JVM process, and the rest is kept aside for the operating system (OS). During the ingestion phase, documents are hashed based on different metadata on the monitoring signal to uniquely identify a document. For example, logs are hashed based on the namespace, name, and different dimension { K, V } pairs. The document is modeled in a parent-child format, and Elasticsearch indices are created per namespace and per month.

The root or parent document _ids are hashed based on the entire { K, V } dimension map, while child documents are hashed on namespace, name, and debounce timestamp. Child documents are uniquely created for a time window, also referred to as a debounce period. The debounce timestamp is the epoch time at which the debounce period begins. Each child document found within a debounce period means that a unique combination of the child document’s namespace and name occurred along with its parent document’s topology (or distinct dimension pairs). The shorter the debounce period, the better the time approximation for the discovery of a unique attribute. There is a 1:N association between a parent and a child document within an Elasticsearch index.
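The ID scheme described above can be sketched as follows. The fields that go into each hash and the 12-hour debounce window come from this article. The SHA-256 hash, the canonical string layout, and the function names are assumptions made for illustration.

```python
import hashlib
from typing import Dict

DEBOUNCE_SECONDS = 12 * 3600  # 12-hour debounce window


def debounce_timestamp(epoch_seconds: int, period: int = DEBOUNCE_SECONDS) -> int:
    """Return the epoch time at which the enclosing debounce period begins."""
    return epoch_seconds - (epoch_seconds % period)


def _digest(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def parent_id(dimensions: Dict[str, str]) -> str:
    # Parent _id: hash of the entire dimension map, in a canonical order.
    canonical = "&".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
    return _digest(canonical)


def child_id(namespace: str, name: str, epoch_seconds: int) -> str:
    # Child _id: hash of namespace, name, and debounce timestamp.
    return _digest(f"{namespace}|{name}|{debounce_timestamp(epoch_seconds)}")


print(debounce_timestamp(1234567890))  # 1234526400
same_window = child_id("ns", "http.hits", 1234526400) == child_id("ns", "http.hits", 1234567890)
print(same_window)  # True: both timestamps fall in the same debounce period
```

Since the child _id only changes when the debounce timestamp changes, re-sending the same namespace and name within one window produces the same document ID, which is what allows documents to be treated as write-once.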

 


We use the following dynamic template to model the parent document on Elasticsearch:

[Snippet: parent document template]

The child document has the following template:

[Snippet: child document template]

We maintain two load balancers (LB) on Elasticsearch clusters. A READ LB virtual IP (VIP) is maintained on client nodes for all read operations, while a WRITE LB VIP is maintained for data nodes. This lets us run aggregation-based compute operations on separate client nodes without overwhelming the data nodes.

Elasticsearch is not an optimal data sink if you have too many writes on the same document, because segment merges are expensive. At high traffic volumes, such background segment merges greatly affect indexing and search performance. We therefore designed our documents so that they are treated as immutable.

Every index is created on Elasticsearch clusters with the following nomenclature:

For example, consider the following indices maintained on our backend Elasticsearch server:

We maintain indices on a monthly basis and they are retained for a period of three months. Purging an index is as straightforward as deleting/dropping an index.
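As a small illustration of the monthly retention rule, the sketch below lists the months whose indices would be kept on a given date. It assumes that the current month counts as one of the three retained months, which the text does not state explicitly. The retained_months function is a hypothetical helper, and it deliberately returns (year, month) pairs rather than index names.

```python
from datetime import date
from typing import List, Tuple


def retained_months(today: date, keep: int = 3) -> List[Tuple[int, int]]:
    """Return the (year, month) pairs retained on `today`, newest first.

    Assumes the current month is counted among the `keep` months.
    """
    months = []
    year, month = today.year, today.month
    for _ in range(keep):
        months.append((year, month))
        month -= 1
        if month == 0:  # step back across a year boundary
            year, month = year - 1, 12
    return months


print(retained_months(date(2018, 11, 15)))  # [(2018, 11), (2018, 10), (2018, 9)]
```

Any monthly index outside the returned list would be a candidate for deletion as a whole, with no per-document cleanup needed.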

Our Discovery service is a web application that is deployed as a Docker image. This service exposes REST APIs to query the backend metadata store.

Key REST APIs on discovery services are:

  • Find namespaces (or use-cases) on different monitoring signals (logs/events/metrics)
  • Find all names within a namespace for a given time range
  • Discover dimension keys and their values for all monitoring signals for an input namespace, list of names, and for a given time range
  • Discover value keys for an input event namespace and for a given time range
  • Find all namespaces or names based on an input dimension {K, V} filter 
  • For a given namespace, name, and set of dimension filters, discover the other dimensions associated with that unique input combination
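A lookup such as "find all names for a given time range" maps naturally onto an Elasticsearch terms aggregation combined with a range filter. The sketch below only builds the request body using the standard query DSL. The field names debounce_timestamp and name are hypothetical, the namespace is assumed to be selected through the per-namespace index, and the sketch ignores the parent-child join, so it should not be read as the query the Discovery service actually issues.

```python
import json
from typing import Any, Dict


def names_query(start_epoch: int, end_epoch: int, max_names: int = 1000) -> Dict[str, Any]:
    """Build a request body that returns the unique names in a time range."""
    return {
        "size": 0,  # return only aggregation buckets, no document hits
        "query": {
            "bool": {
                "filter": [
                    # Hypothetical field holding the debounce timestamp.
                    {"range": {"debounce_timestamp": {"gte": start_epoch, "lte": end_epoch}}}
                ]
            }
        },
        "aggs": {
            # Hypothetical keyword field holding the signal name.
            "unique_names": {"terms": {"field": "name", "size": max_names}}
        },
    }


print(json.dumps(names_query(1234526400, 1234569600), indent=2))
```

Setting "size" to 0 keeps the response limited to the aggregation buckets, which is all a discovery call needs.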

Our metadata store ingress daemon is deployed and managed on an internal Kubernetes platform (also referred to as Tess.io). The application lifecycle for the metadata store ingress daemon is managed on Kubernetes as a stateless application. Our managed Kubernetes platform allows custom metric annotations during deployment by which we can expose health metrics on a known port in Prometheus format. Monitoring dashboards and alerts are set up based on these health metrics. We also expose similar metrics on the Discovery service to capture the error/success rate and average search latency. 

Performance numbers

  • We were able to handle 1.6 million metric signals per minute on 10 metadata ingress daemon hosts (downstream Kafka consumers) without any performance issues.
  • Any unique metadata attribute becomes discoverable within a few seconds of ingestion from the source.
  • Our debounce window interval is set to 12 hours in our production environment, and within each debounce period, we have a unique cardinality of ~40 million (seen up to 60 million).
  • We currently see an average latency of 100 ms for most of the discovery queries fired in our production environment. As expected, queries fired across namespaces are much slower than queries that target a specific namespace.
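To put the ingestion figure into per-host terms, the following back-of-the-envelope calculation converts it to signals per second. It assumes the load is spread evenly across the 10 hosts, which the numbers above do not guarantee.

```python
signals_per_minute = 1_600_000
hosts = 10

per_second_total = signals_per_minute / 60
per_second_per_host = per_second_total / hosts  # assumes an even spread

print(round(per_second_total), round(per_second_per_host))  # 26667 2667
```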

Conclusion

Separating discovery functions from the actual data pipeline has allowed us to quickly drill down on the raw monitoring data. The metadata store greatly limits the scope of data that needs to be queried and thereby significantly improves overall search throughput. The approach also shields the raw data stores from discovery calls, and thereby saves significant compute cycles on dependent backend stores.