Using Milvus for Real-Time Query

xi ge
Published: November 6, 2022

In this article, we will continue to explain how different components in Milvus interact with each other to complete real-time data queries.

Some useful resources before getting started are listed below. We recommend reading them first to better understand the topic in this post.

Load Data to Query Node

Before a query is executed, the data has to be loaded to the query nodes first.

There are two types of data that are loaded to the query node: streaming data from the log broker, and historical data from object storage (also called persistent storage below).

A flowchart of loading data to query node

The data coord is in charge of handling streaming data that are continuously inserted into Milvus. When a Milvus user calls collection.load() to load a collection, the query coord asks the data coord which segments have been persisted in storage and what their corresponding checkpoints are. A checkpoint is a marker signifying that the data before it have been consumed and persisted in segments, while the data after it have not.

Then, the query coord outputs an allocation strategy based on the information from the data coord: either by segment or by channel. The segment allocator is responsible for allocating segments in persistent storage (batch data) to different query nodes. For instance, in the image above, the segment allocator allocates segments 1 and 3 (S1, S3) to query node 1, and segments 2 and 4 (S2, S4) to query node 2. The channel allocator assigns different query nodes to watch multiple data manipulation channels (DMChannels) in the log broker. For instance, in the image above, the channel allocator assigns query node 1 to watch channel 1 (Ch1), and query node 2 to watch channel 2 (Ch2).
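The two allocators can be pictured with a minimal sketch. It assumes a simple round-robin policy, which happens to reproduce the assignment in the image; the article does not say which policy the query coord actually uses, and the function name and node names are hypothetical.

```python
from collections import defaultdict


def round_robin(items, query_nodes):
    """Assign items (segments or channels) to query nodes in turn.

    Round-robin is an assumption made for illustration only.
    """
    assignment = defaultdict(list)
    for i, item in enumerate(items):
        node = query_nodes[i % len(query_nodes)]
        assignment[node].append(item)
    return dict(assignment)


nodes = ["query_node_1", "query_node_2"]

# Segment allocator: sealed segments in persistent storage (batch data).
segment_plan = round_robin(["S1", "S2", "S3", "S4"], nodes)

# Channel allocator: DMChannels in the log broker (streaming data).
channel_plan = round_robin(["Ch1", "Ch2"], nodes)

print(segment_plan)
print(channel_plan)
```

With these inputs, query node 1 receives S1, S3 and Ch1, and query node 2 receives S2, S4 and Ch2, matching the flowchart.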

With the allocation strategy, each query node loads segment data and watches the channels accordingly. In query node 1 in the image, historical data (batch data) are loaded from persistent storage via the allocated segments S1 and S3. Meanwhile, query node 1 loads incremental data (streaming data) by subscribing to channel 1 in the log broker.

Data Management in Query Node

A query node needs to manage both historical and incremental data. Historical data are stored in sealed segments while incremental data are stored in growing segments.

Historical Data Management

There are mainly two considerations for historical data management: load balance and query node failover.

Load balance.

For instance, as shown in the illustration, query node 4 has been allocated more sealed segments than the rest of the query nodes. Very likely, this will make query node 4 the bottleneck that slows down the whole query process. To solve this issue, the system needs to move several of the segments on query node 4 to other query nodes. This is called load balance.
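As a rough illustration of load balance, the sketch below repeatedly moves one sealed segment from the most loaded query node to the least loaded one until the segment counts differ by at most one. The greedy policy, the function name, and the use of segment count as the only load metric are assumptions; the article does not describe the actual balancing algorithm.

```python
def rebalance(segments_by_node):
    """Greedy rebalancing by segment count (hypothetical policy)."""
    # Work on a copy so the caller's mapping is left untouched.
    plan = {node: list(segs) for node, segs in segments_by_node.items()}
    moves = []
    while True:
        busiest = max(plan, key=lambda n: len(plan[n]))
        idlest = min(plan, key=lambda n: len(plan[n]))
        if len(plan[busiest]) - len(plan[idlest]) <= 1:
            break
        segment = plan[busiest].pop()
        plan[idlest].append(segment)
        moves.append((segment, busiest, idlest))
    return plan, moves


before = {
    "qn1": ["S1"],
    "qn2": ["S2"],
    "qn3": ["S3"],
    "qn4": ["S4", "S5", "S6", "S7", "S8"],  # overloaded node
}
after, moves = rebalance(before)

for segment, src, dst in moves:
    print(f"move {segment}: {src} -> {dst}")
```

A real system would likely weigh segment size and memory usage as well, not just the number of segments.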

Incremental Data Management

The query node watches DMChannels to receive incremental data. A flowgraph is used in this process. It first filters all the data insertion messages to ensure that only data in a specified partition are loaded. Each collection in Milvus has a corresponding channel, which is shared by all partitions in that collection. Therefore, a flowgraph is needed to filter inserted data if a Milvus user only needs to load data in a certain partition. Otherwise, data in all partitions in the collection will be loaded to the query node.
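The filtering step of the flowgraph can be sketched as follows. The `InsertMsg` fields and the `filter_node` function are hypothetical names chosen for illustration; they are not the actual Milvus message types.

```python
from dataclasses import dataclass


@dataclass
class InsertMsg:
    # Hypothetical, simplified shape of an insert message.
    partition_id: int
    primary_key: int
    timestamp: int


def filter_node(messages, loaded_partitions=None):
    """Keep only insert messages that belong to loaded partitions.

    loaded_partitions=None means the whole collection was loaded,
    so every message on the shared channel passes through.
    """
    if loaded_partitions is None:
        return list(messages)
    return [m for m in messages if m.partition_id in loaded_partitions]


# One channel shared by all partitions of the collection.
channel = [
    InsertMsg(partition_id=1, primary_key=100, timestamp=5),
    InsertMsg(partition_id=2, primary_key=101, timestamp=6),
    InsertMsg(partition_id=1, primary_key=102, timestamp=7),
]

only_p1 = filter_node(channel, loaded_partitions={1})
everything = filter_node(channel)
```

Here `only_p1` keeps the two messages from partition 1, while `everything` keeps all three.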

After being filtered, the incremental data are inserted into growing segments and further passed on to server time nodes.

During data insertion, each insertion message is assigned a timestamp. The server time node provides an updated tsafe value every time it receives a timetick from the insert node. tsafe means safety time, and all data inserted before this point in time can be queried. For example, if tsafe = 9, inserted data with timestamps smaller than 9 can all be queried.
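The tsafe rule can be expressed in a few lines. In this sketch the class and function names are hypothetical, and the choice to never let tsafe move backwards is an assumption rather than something stated in the article.

```python
class ServerTimeNode:
    """Tracks tsafe, updated on every timetick (illustrative only)."""

    def __init__(self):
        self.tsafe = 0

    def on_timetick(self, timestamp):
        # Assumption: tsafe only ever moves forward.
        self.tsafe = max(self.tsafe, timestamp)


def queryable(rows, tsafe):
    """Return the keys of rows inserted strictly before tsafe."""
    return [key for key, ts in rows if ts < tsafe]


# (primary key, insertion timestamp) pairs in a growing segment.
rows = [("a", 3), ("b", 8), ("c", 9), ("d", 12)]

node = ServerTimeNode()
node.on_timetick(9)
visible = queryable(rows, node.tsafe)
print(visible)  # ['a', 'b']
```

With tsafe = 9, only the rows with timestamps 3 and 8 are visible; the row stamped exactly 9 is not, because the comparison is strict.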

Real-Time Query in Milvus

Real-time querying in Milvus is enabled by query messages. Query messages are inserted into the log broker by the proxy. Query nodes then obtain the query messages by watching the query channel in the log broker.

Query Message

A query message includes the following crucial information about a query:

  • msgID: Message ID, the ID of the query message assigned by the system.
  • collectionID: The ID of the collection to query (if specified by the user).
  • execPlan: The execution plan is mainly used for attribute filtering in a query.
  • service_ts: Service timestamp is updated together with the tsafe value mentioned above. It signifies the point in time that the service has reached. All data inserted before service_ts are available for query.
  • travel_ts: Travel timestamp specifies a range of time in the past. The query will be conducted on data that existed in the time period specified by travel_ts.
  • guarantee_ts: Guarantee timestamp specifies a point in time after which the query needs to be conducted. The query will only be conducted when service_ts > guarantee_ts.
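To make the fields above concrete, here is a minimal sketch of a query message as a Python dataclass. The field names follow the list; the types, the string form of the execution plan, and the `can_execute` helper are assumptions for illustration and do not reflect the actual Milvus message definition.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueryMessage:
    msg_id: int                    # assigned by the system
    collection_id: Optional[int]   # None if not specified by the user
    exec_plan: str                 # attribute filter (simplified to a string)
    service_ts: int                # data inserted before this can be queried
    travel_ts: int                 # query data as it existed in the past
    guarantee_ts: int              # query must see data up to this time

    def can_execute(self) -> bool:
        # The rule stated above: run only when service_ts > guarantee_ts.
        return self.service_ts > self.guarantee_ts


msg = QueryMessage(
    msg_id=1,
    collection_id=42,
    exec_plan="age > 30",
    service_ts=10,
    travel_ts=8,
    guarantee_ts=9,
)
print(msg.can_execute())  # True
```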

Real-Time Query

When a query message is received, Milvus first checks whether the current service time, service_ts, is larger than the guarantee timestamp, guarantee_ts, in the query message. If so, the query is executed. The query is conducted in parallel on both historical data and incremental data. Since streaming data and batch data can overlap, an action called “local reduce” is needed to filter out the redundant query results.

However, if the current service time is smaller than the guarantee timestamp in a newly inserted query message, the query message becomes an unsolved message and waits to be processed until the service time becomes larger than the guarantee timestamp.
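The waiting behavior of unsolved messages can be sketched with a priority queue keyed on the guarantee timestamp. The `UnsolvedQueue` class and its methods are hypothetical; this is one plausible way to model the rule, not the query node's actual implementation.

```python
import heapq


class UnsolvedQueue:
    """Parks query messages until service time passes guarantee_ts."""

    def __init__(self):
        self._heap = []       # (guarantee_ts, msg_id), smallest first
        self.service_ts = 0

    def submit(self, msg_id, guarantee_ts):
        """Return True if the query can run now, else park it."""
        if self.service_ts > guarantee_ts:
            return True
        heapq.heappush(self._heap, (guarantee_ts, msg_id))
        return False

    def advance(self, new_service_ts):
        """Move service time forward and return the released msg IDs."""
        self.service_ts = max(self.service_ts, new_service_ts)
        ready = []
        while self._heap and self._heap[0][0] < self.service_ts:
            ready.append(heapq.heappop(self._heap)[1])
        return ready


q = UnsolvedQueue()
q.advance(5)
ran_now = q.submit("m1", guarantee_ts=3)   # 5 > 3, runs immediately
parked = q.submit("m2", guarantee_ts=8)    # 5 > 8 is false, parked
still_waiting = q.advance(8)               # 8 > 8 is false, nothing released
released = q.advance(9)                    # 9 > 8, m2 is released
```

Note that a message is released only once the service time is strictly larger than its guarantee timestamp, mirroring the condition service_ts > guarantee_ts.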

Query results are ultimately pushed to the result channel, from which the proxy obtains them. Because the proxy receives results from multiple query nodes and those results may contain duplicates, it also conducts a “global reduce”.
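A reduce step of this kind can be sketched as a merge that drops duplicate hits and keeps the best top-k. The sketch assumes each partial result is a list of (primary key, distance) pairs and that a smaller distance is a better match; both the result shape and the function name are assumptions, not the actual Milvus data structures.

```python
import heapq


def reduce_results(partial_results, top_k):
    """Merge partial hit lists, deduplicate by primary key, keep top_k."""
    best = {}
    for hits in partial_results:
        for primary_key, distance in hits:
            # Keep the smallest distance seen for each primary key.
            if primary_key not in best or distance < best[primary_key]:
                best[primary_key] = distance
    return heapq.nsmallest(top_k, best.items(), key=lambda kv: kv[1])


# Results from two query nodes; primary key 2 appears in both.
node1 = [(1, 0.10), (2, 0.30), (3, 0.50)]
node2 = [(2, 0.30), (4, 0.20), (5, 0.90)]

merged = reduce_results([node1, node2], top_k=3)
print(merged)  # [(1, 0.1), (4, 0.2), (2, 0.3)]
```

The same function could model a “local reduce” inside a query node by passing the hits from sealed and growing segments as the partial results.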

To ensure that the proxy has received all query results before returning them to the SDK, a result message will also keep a record of information including searched sealed segments, searched DMChannels, and global sealed segments (all segments on all query nodes). The system can conclude that the proxy has received all query results only if both of the following conditions are met:

  • The union of all searched sealed segments recorded in all result messages covers (is a superset of) the global sealed segments,
  • All DMChannels in the collection are queried.
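The two conditions can be checked with plain set operations. This sketch reads the first condition as "the searched sealed segments are a superset of the global sealed segments", which is an interpretation of the wording above; the `ResultMsg` type and the function name are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResultMsg:
    # Hypothetical, simplified bookkeeping carried by a result message.
    searched_sealed_segments: frozenset
    searched_dm_channels: frozenset
    global_sealed_segments: frozenset


def all_results_received(results, collection_dm_channels):
    """Return True once both completeness conditions hold."""
    if not results:
        return False
    searched_segments = set().union(*(r.searched_sealed_segments for r in results))
    searched_channels = set().union(*(r.searched_dm_channels for r in results))
    global_segments = set().union(*(r.global_sealed_segments for r in results))
    return (
        searched_segments >= global_segments
        and searched_channels >= set(collection_dm_channels)
    )


all_segments = frozenset({"S1", "S2", "S3", "S4"})
from_node1 = ResultMsg(frozenset({"S1", "S3"}), frozenset({"Ch1"}), all_segments)
from_node2 = ResultMsg(frozenset({"S2", "S4"}), frozenset({"Ch2"}), all_segments)

partial = all_results_received([from_node1], ["Ch1", "Ch2"])
complete = all_results_received([from_node1, from_node2], ["Ch1", "Ch2"])
```

With only the first result message, segments S2 and S4 and channel Ch2 are still missing, so `partial` is False; once both messages have arrived, `complete` is True.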

Ultimately, the proxy returns the final results after “global reduce” to the Milvus SDK.