The main purpose of CAL is to facilitate a centralized collection of application servers' local logs and provide reporting on the collected data. These reports provide great insight into important areas of the eBay site—invaluable for making decisions on all aspects of business at eBay.
As a main component of CAL, CAL Ingress is responsible for log collecting, processing, and dispatching. It accepts logs from thousands of CAL Clients, and dispatches those logs to several destinations after processing.
Three logs are collected and processed by CAL Ingress:
- Raw logs
- VID metrics
- Heartbeat metrics
Figure 1 shows the data flow. CAL Ingress receives data from the CAL client via the TCP connection and dispatches different types of data to different destinations. The raw log is sent to Filer directly. VID metrics, which are retrieved from the logs, are aggregated and sent to OLAP 2.0 via KAFKA. Heartbeat metrics are sent to Metric-Store Frontier.
Figure 1. Data flow
Requirements and challenges
As a centralized logging system, data traffic levels are huge. CAL Ingress processes about 2.5PB logs a day for eBay.
Low latency
A CAL client sends logs to CAL by NIO (Non-blocking I/O) long-lived TCP connections with a high QoS (Quality of Service) requirement. Once a bunch of data cannot be sent together, the connection between CAL and client is considered unhealthy. Cached data both in the client and server will be discarded. That is called "TCP Disconnect." Once TCP Disconnect occurs, there is data loss.
In order to alleviate TCP Disconnect and data loss, CAL Ingress should retrieve and acknowledge the data quickly. Low latency is required.
Currently the network buffer in most CAL client machines is 128k. The average data volume is 200k/s for one connection, so the latency of CAL Ingress is less than 0.64s. For some pools with large volumes, the latency should be even less.
CAL Ingress services millions of connections at the same time. Latency time is calculated via the following formula:
High throughput
Currently, CAL processes average 105TB/hour logs, so the average volume is 29.87GB/s. The peak volume can reach 58GB/s.
Architecture and performance tuning
CAL Ingress uses Netty to implement servers that accept client requests and uses a long-lived TCP connection to transfer data. To comply with the low latency and high throughput requirements, we separate reading and handling in different EventLoopGroups. See Figure 2.
Figure 2. CAL Ingress architecture
CAL Ingress services millions of connections (about 1 million in our production environment) at the same time. Ingress alternately reads data from one connection to another.
So Latency Time is calculated via the following formula:
Tperiod is the read interval. Nconnection is the number of connections, and Nthread is the number of total threads to handle these connections. So for each connection, the time interval to read data from it is Nconnection/Nthread * Tperiod. Tgc is the GC pause time.
To decrease the latency, the read interval and GC pause time should be decreased. Non-stop reading and GC optimization is the relative approach.
Non-stop reading
First, we enable TCP_NODELAY and TCP_QUICKACK to make sure the socket reading has not stopped at the network level.
Second, we break the normal data pipeline, read->handle->read; data-read and data-handle are separated.
As a result, reading does not stop.
As shown in Figure 2, data-read and data-handle belong to separate EventLoopGroups. There is one dedicated EventLoopGroup to read data from the socket. In this way, data-reading is not affected by the heavy data handling processing.
Several dedicated threads respond to retrieve/read data from network sockets all the time. After it reads the data into direct memory, it continuously reads the next bunch of data.
Now the reading speed can catch up to the sending speed of CAL clients. Less data is blocked in the client socket buffer, and TCP disconnect seldom occurs.
In previous generation of log-collecting component, the number of TCP disconnections is high; the min is 2500 and the max is 21000. But in CAL Ingress, the number has decreased quite a bit; the min is 4, and the max is just 65.
Figure 3. TCP disconnect for pool r1reco (before)
Figure 4. TCP disconnect for pool r1reco (after)
GC optimization
There is a term "stop-the-world” related to GC. Stop-the-world means that JVM will stop the application from running to execute a GC. Stop-the-world will occur no matter which GC algorithm you choose. When stop-the-world occur, CAL Ingress will stop receiving data, but the CAL client will continue sending data. If the network buffer is full with data, the CAL client cannot send the next bunch of data all at once, and TCP disconnection will occur.
We chose G1GC as our GC collector.
In order to let CAL Ingress survive stop-the-world, we must tune GC in two ways:
-
Reduce the GC frequency
-
Decrease the GC pause time
Off-heap
In a normal way, all the data is disposed of in memory. That means the data will be consumed from direct memory into heap memory at first.
CAL Ingress adopted an off-heap mechanism. Instead of reading data from a network buffer and creating the internal objects in heap memory, the received data is copied to pre-allocated off-heap memory directly and operates the memory without copying them to the heap as an object. Because all the data is stored off-heap, less heap memory is used, and the GC frequency is reduced.
Memory optimization
GC contains three major time-cost processes:
-
Object scan in GC young area
-
Object copy
- Copy the survival object from one survivor area, if it's full, to another
- An object is promoted from the young area to the old area
- Termination attempts
To reduce #1 and #2, one effectual way is to reduce the number of objects in heap memory and the number of survival objects.
We use Java MAT to dump the memory heap to investigate. (See Figure 5 for an example of one heap memory dump analyzation.)
According to the memory dump, we found that most of the objects are short-lived objects, so it is better to release them when they are still in the young generation area. We enlarged the young generation size by setting the JVM parameter to enlarge it: -XX:G1NewSizePercent=50.
Reduce the objects
According to the memory dump, most objects (62.82%) in a heap is SlicedByteBuf. And most of those objects are created in the VID handler isVIDRelated function. So we refine the function. Instead of creating lots of slicedByteBuf when parsing, we copy the content to the heap, and compare it in heap. In that way, no SlicedByteBuf is needed. The objects in the heap are reduced quite a bit.
Figure 5. Java MAT - Memory Optimize
For #3, our solution is to reduce the number of threads used.
Too many threads will cause too much contention in the CPU. If there are other processes or system activity that needs CPU resources, chances are that some of the GC workers are scheduled late because of that contention.
After investigating, we decided to use a thread-pool for a scheduled executor, instead of creating thread at each executor evoke.
Data processing optimization
All the data that is read from the socket is maintained in direct memory and is not released until processing is complete. If data processing speed does not catch up with the reading speed, the direct memory will be used up, and out of memory errors (OOM) will occur.
Traffic is the data traffic; Tprocess is the time to process data.
Figure 6. Data process pipeline
As shown in Figure 6, the data flows by several handlers: decompress, parser, VID handler, heartbeat handler and filer handler. In order to shorten the processing time, we try to optimize each handler. As a result, three areas are improved:
-
Refine time-costing function
-
Batch compress
-
Lazy-write and controllable memory usage
Refine the time-costing function
We calculated the time-cost for some frequently used functions and found out that some functions are time-cost. We refined those function to reduce the time. For example, the Search/indexof function of ByteBuf.
To search in a ByteBuf, it will read one byte from direct memory once and compare one byte by one. Reading from direct memory is time-consuming, so this function costs a lot.
We re-wrote the search function. Instead of reading one byte and comparing it one by one, we read a bunch of bytes once to compare. The number of read calls decreased, and execution time was reduced 30%. See Figure 7.
Figure 7. Function execution time
Batch compress
In the real production environment, the data size of one socket reading is variable. In most cases, the size of data is small. In the previous design, we parsed the data and compressed it. Compression is time expensive, and as the time to compress data increases, the time cost increases. So we combined those small data packets and compressed them until the size reached a predefined size.
Currently, we set the data size threshold to 160K, and the compression count decreases to 1/10. The total compression time decreases, too.
Lazy-write and controllable memory usage
Filer is a slow device, so it takes a long time to write data to filer. To alleviate the effect of filer's slowness, we decided to use "lazy-write." Instead of writing the data to slow filers immediately, the data is written to ByteBuf in direct memory as compressed data at first, and then flushed into filers in other threads.
To control the direct memory usage, we selected the Ring-Buffer approach. All the to-write-to-filer data is stored in a Ring Queue, which has the predefined memory size. When too much memory is used because of filer slowness, the Ring Queue is full, and any incoming data will be dropped.
System optimization
Besides the above optimization of CAL Ingress Java processes, we also do some JVM and system tuning and optimization. As a summary, 1. We use the Cloudflare zlib library to reduce compression time. 2. We enable RSS and CPU Numa pinning to improve the CPU efficiency. 3. We do an off-cpu check to reduce cpu context switching.
Conclusion
CAL Ingress now has a low latency and high throughput, and it can support huge amounts of data traffic in our production environment. When one CAL Ingress services 1000 connection, the overall throughput can reach 220MB/s without TCP disconnection.
Figure 8 gives the results of benchmark LnP (Load and Performance) testing. It shows the total throughput without TCP disconnection and the GC pause time when CAL Ingress serves multiple connections.
Figure 8. Throughput and average GC pause time
CAL Ingress has improved quite a bit in throughput, latency, rate of disconnect, and rate of data loss, as shown in Figure 9.
Figure 9. Improvements