Using Milvus for real-time queries

In this article, we will continue to explain how the various components of Milvus interact with each other to accomplish real-time data queries.

Before you get started, we recommend reading the earlier articles in this series to better understand the topic of this post.

load data into query node

Before a query can be executed, the data must first be loaded into the query nodes.

There are two types of data loaded into a query node: streaming data from the log broker, and historical data from object storage (referred to as persistent storage below).

Flowchart of loading data into query node

The data coord is in charge of handling the streaming data that is continuously inserted into Milvus. When a Milvus user calls collection.load() to load a collection, the query coord asks the data coord which segments have been persisted in storage and what their corresponding checkpoints are. A checkpoint is a mark signifying that the persisted segments formed before the checkpoint have been consumed, while those formed after it have not.
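The checkpoint rule can be illustrated with a small Python sketch. It assumes, purely for illustration, that each persisted segment records the last log-broker position it covers (`max_position`, a hypothetical field) and that a checkpoint is a single position; the real bookkeeping in the data coord is more involved.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    segment_id: str
    # Hypothetical field: last log-broker position covered by this segment.
    max_position: int


def split_by_checkpoint(segments, checkpoint):
    """Split segments into those formed before the checkpoint (consumed)
    and those formed after it (not yet consumed)."""
    consumed = [s.segment_id for s in segments if s.max_position <= checkpoint]
    pending = [s.segment_id for s in segments if s.max_position > checkpoint]
    return consumed, pending


segments = [Segment("S1", 10), Segment("S2", 20), Segment("S3", 30)]
consumed, pending = split_by_checkpoint(segments, checkpoint=20)
print(consumed, pending)  # ['S1', 'S2'] ['S3']
```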

Then, the query coord outputs an allocation strategy based on the information obtained from the data coord: either by segment or by channel. The segment allocator is responsible for allocating segments in persistent storage (batch data) to different query nodes. For example, in the image above, the segment allocator allocates segments 1 and 3 (S1, S3) to query node 1, and segments 2 and 4 (S2, S4) to query node 2. The channel allocator is responsible for assigning different query nodes to watch the multiple data manipulation channels (DMChannels) in the log broker. For example, in the image above, the channel allocator assigns query node 1 to watch channel 1 (Ch1) and query node 2 to watch channel 2 (Ch2).
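Both allocators in the example above produce a round-robin layout, so a minimal sketch can reuse one helper for segments and for channels. Round-robin is an assumption chosen because it reproduces the figure; the actual allocators may also weigh factors such as memory usage.

```python
from itertools import cycle


def round_robin(items, query_nodes):
    """Assign items (segments or DMChannels) to query nodes in turn."""
    plan = {node: [] for node in query_nodes}
    # zip stops when `items` runs out, so cycling the nodes is safe.
    for item, node in zip(items, cycle(query_nodes)):
        plan[node].append(item)
    return plan


nodes = ["QN1", "QN2"]
print(round_robin(["S1", "S2", "S3", "S4"], nodes))
# {'QN1': ['S1', 'S3'], 'QN2': ['S2', 'S4']}
print(round_robin(["Ch1", "Ch2"], nodes))
# {'QN1': ['Ch1'], 'QN2': ['Ch2']}
```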

With the allocation strategy in place, each query node loads segment data and watches channels accordingly. In query node 1 in the image, historical data (batch data) is loaded from persistent storage via the allocated S1 and S3. Meanwhile, query node 1 loads incremental data (streaming data) by subscribing to channel 1 in the log broker.

data management in query node

A query node needs to manage both historical and incremental data. Historical data is stored in sealed segments, whereas incremental data is stored in growing segments.

historical data management

There are mainly two considerations for historical data management: load balancing and query node failover.

load balancing


For example, as shown in the illustration, query node 4 has been allocated more sealed segments than the other query nodes. This will very likely make query node 4 a bottleneck that slows down the entire query process. To solve this problem, the system needs to reallocate several segments from query node 4 to other query nodes. This is called load balancing.
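A greedy rebalancing loop shows the idea. This sketch assumes load is measured by the number of sealed segments per query node and moves one segment at a time from the busiest node to the idlest one until the counts differ by at most one; a production balancer would more likely look at row counts or memory.

```python
def rebalance(plan):
    """Greedy sketch: move sealed segments from the most-loaded query node
    to the least-loaded one until segment counts differ by at most one."""
    plan = {node: list(segs) for node, segs in plan.items()}  # don't mutate input
    moves = []
    while plan:
        busiest = max(plan, key=lambda n: len(plan[n]))
        idlest = min(plan, key=lambda n: len(plan[n]))
        if len(plan[busiest]) - len(plan[idlest]) <= 1:
            break
        seg = plan[busiest].pop()
        plan[idlest].append(seg)
        moves.append((seg, busiest, idlest))
    return plan, moves


before = {"QN1": ["S1"], "QN2": ["S2"], "QN3": ["S3"],
          "QN4": ["S4", "S5", "S6", "S7"]}  # query node 4 is overloaded
balanced, moves = rebalance(before)
print(moves)  # [('S7', 'QN4', 'QN1'), ('S6', 'QN4', 'QN2')]
```

Moving one segment per step keeps each transfer small, which matters because every reallocated segment has to be loaded by its new query node.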

incremental data management

The query node watches DMChannels to receive incremental data, and a flowgraph is introduced in this process. The flowgraph first filters all insert messages to ensure that only data in the specified partition is loaded. Each collection in Milvus has a corresponding channel, which is shared by all partitions in that collection. Therefore, a flowgraph is needed to filter the inserted data if a Milvus user only needs to load data from a certain partition. Otherwise, data from all partitions in the collection will be loaded into the query node.

After filtering, the incremental data is inserted into growing segments and then passed on to the server time node.
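The filter and insert stages of the flowgraph can be sketched as two plain functions. The message fields, the partition names, and the `GrowingSegment` class are hypothetical simplifications, and the downstream server time node is left out.

```python
from dataclasses import dataclass, field


@dataclass
class InsertMsg:
    partition: str
    timestamp: int
    row: dict


@dataclass
class GrowingSegment:
    # (timestamp, row) pairs appended as insert messages arrive.
    rows: list = field(default_factory=list)


def filter_stage(msgs, loaded_partitions):
    """Drop insert messages for partitions the user did not load."""
    return [m for m in msgs if m.partition in loaded_partitions]


def insert_stage(msgs, segment):
    """Append the surviving messages to the growing segment."""
    for m in msgs:
        segment.rows.append((m.timestamp, m.row))
    return segment


channel = [
    InsertMsg("p1", 1, {"id": 1}),
    InsertMsg("p2", 2, {"id": 2}),  # shares the channel but is not loaded
    InsertMsg("p1", 3, {"id": 3}),
]
seg = insert_stage(filter_stage(channel, {"p1"}), GrowingSegment())
print(seg.rows)  # [(1, {'id': 1}), (3, {'id': 3})]
```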

During data insertion, each insert message is assigned a timestamp. The server time node provides an updated tsafe value every time it receives a timetick from the insert node. tsafe stands for safe time: all data inserted before this point in time can be queried. For example, if tsafe = 9, all inserted data with timestamps smaller than 9 can be queried.
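The tsafe rule fits in a few lines. In this sketch the server time node simply keeps the largest timetick it has seen (timeticks are assumed to be non-decreasing), and a row counts as queryable when its timestamp is smaller than tsafe, matching the example above.

```python
class ServerTimeNode:
    """Hypothetical sketch of the tsafe bookkeeping."""

    def __init__(self):
        self.tsafe = 0

    def on_timetick(self, tick):
        # Never move tsafe backwards, even if a stale tick arrives.
        self.tsafe = max(self.tsafe, tick)


def queryable(rows, tsafe):
    """Return the rows whose insert timestamp is smaller than tsafe."""
    return [row for ts, row in rows if ts < tsafe]


node = ServerTimeNode()
for tick in (3, 6, 9):
    node.on_timetick(tick)

rows = [(3, "a"), (8, "b"), (9, "c"), (12, "d")]
print(node.tsafe, queryable(rows, node.tsafe))  # 9 ['a', 'b']
```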

Real-time query in Milvus

Real-time querying in Milvus is enabled by query messages. Query messages are inserted into the log broker by the proxy. Then, the query nodes receive the query messages by watching the query channel in the log broker.
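The path from the proxy to the query nodes is a publish/subscribe pattern over the log broker. The toy in-memory broker below is an assumption for illustration, not the real log broker API: each channel is an append-only list and every query node keeps its own read offset, so all nodes watching the query channel see the same query messages.

```python
from collections import defaultdict


class ToyLogBroker:
    """In-memory stand-in for the log broker (hypothetical)."""

    def __init__(self):
        self.channels = defaultdict(list)

    def publish(self, channel, msg):
        self.channels[channel].append(msg)

    def read(self, channel, offset):
        """Return messages after `offset` and the next offset to read from."""
        msgs = self.channels[channel][offset:]
        return msgs, offset + len(msgs)


broker = ToyLogBroker()
broker.publish("query-channel", {"msgID": 1})  # the proxy inserts a query message

offsets = {"QN1": 0, "QN2": 0}  # each query node tracks its own position
for node in offsets:
    msgs, offsets[node] = broker.read("query-channel", offsets[node])
    print(node, msgs)
# QN1 [{'msgID': 1}]
# QN2 [{'msgID': 1}]
```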

query message

A query message includes the following important information about a query:

  • msgID: message ID, the ID of the query message assigned by the system.
  • collectionID: the ID of the collection to query (if specified by the user).
  • execPlan: the execution plan, mainly used for attribute filtering in a query.
  • service_ts: the service timestamp, which is updated together with the tsafe described above. It indicates the point in time the service has reached; all data inserted before service_ts is available for query.
  • travel_ts: the travel timestamp, which specifies a range of time in the past. The query is conducted on the data that exists in the time period specified by travel_ts.
  • guarantee_ts: the guarantee timestamp, which specifies the point in time after which the query needs to be conducted. The query is conducted only when service_ts > guarantee_ts.
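The fields above map naturally onto a small record type. This Python dataclass is a hypothetical sketch: the field names follow the list, while the types (and representing execPlan as a string) are assumptions. The `ready` method encodes the rule that a query runs only when service_ts > guarantee_ts.

```python
from dataclasses import dataclass


@dataclass
class QueryMsg:
    msgID: int          # assigned by the system
    collectionID: int   # collection to query
    execPlan: str       # attribute-filtering plan (string form is an assumption)
    service_ts: int     # updated together with tsafe
    travel_ts: int      # past time range whose data the query sees
    guarantee_ts: int   # the query may not run before service_ts passes this

    def ready(self) -> bool:
        return self.service_ts > self.guarantee_ts


msg = QueryMsg(msgID=1, collectionID=100, execPlan="age > 18",
               service_ts=10, travel_ts=10, guarantee_ts=9)
print(msg.ready())  # True
```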

real-time query

When a query message is received, Milvus first checks whether the current service time, service_ts, is greater than the guarantee timestamp, guarantee_ts, in the query message. If so, the query is executed. The query is conducted in parallel on both historical data and incremental data. Because streaming data and batch data may overlap, an action called “local reduce” is needed to filter out redundant query results.
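A local reduce can be sketched as merging the hits from sealed (historical) and growing (incremental) segments and keeping one entry per primary key. The sketch assumes each hit is a `(primary_key, distance)` pair and that a smaller distance is better, as with L2; both are illustrative assumptions.

```python
def local_reduce(historical, streaming, topk):
    """Merge hits from sealed and growing segments on one query node,
    dropping duplicate primary keys (keeping the smallest distance),
    then return the top-k hits ordered by distance."""
    best = {}
    for pk, dist in list(historical) + list(streaming):
        if pk not in best or dist < best[pk]:
            best[pk] = dist
    # Break distance ties by primary key so the order is deterministic.
    return sorted(best.items(), key=lambda item: (item[1], item[0]))[:topk]


print(local_reduce([(1, 0.5), (2, 0.9)], [(2, 0.9), (3, 0.1)], topk=3))
# [(3, 0.1), (1, 0.5), (2, 0.9)]
```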

However, if the current service time is not greater than the guarantee timestamp in the newly inserted query message, the query message becomes an unresolved message and waits to be processed until the service time becomes greater than the guarantee timestamp.
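The run-or-wait decision, including the buffer of unresolved messages, can be sketched as a small scheduler. Here, query messages are reduced to `(msgID, guarantee_ts)` pairs and "executing" a query just records its ID; both are simplifications for illustration.

```python
from collections import deque


class QueryScheduler:
    """Hypothetical sketch of how a query node defers queries."""

    def __init__(self):
        self.service_ts = 0
        self.unsolved = deque()  # (msgID, guarantee_ts) waiting for service time
        self.executed = []       # stand-in for actually running the query

    def on_query(self, msg_id, guarantee_ts):
        if self.service_ts > guarantee_ts:
            self.executed.append(msg_id)
        else:
            self.unsolved.append((msg_id, guarantee_ts))

    def on_service_ts(self, ts):
        self.service_ts = max(self.service_ts, ts)
        still_waiting = deque()
        for msg_id, guarantee_ts in self.unsolved:
            if self.service_ts > guarantee_ts:
                self.executed.append(msg_id)
            else:
                still_waiting.append((msg_id, guarantee_ts))
        self.unsolved = still_waiting


sched = QueryScheduler()
sched.on_service_ts(5)
sched.on_query(1, guarantee_ts=3)  # 5 > 3: runs immediately
sched.on_query(2, guarantee_ts=8)  # 5 <= 8: becomes unresolved
sched.on_service_ts(9)             # 9 > 8: the waiting query now runs
print(sched.executed, list(sched.unsolved))  # [1, 2] []
```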

The query results are ultimately pushed to the result channel, from which the proxy receives them. Similarly, the proxy performs a “global reduce”, because it receives results from multiple query nodes and those results may be duplicated.
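The global reduce has the same shape as the local reduce, but it merges an arbitrary number of per-node result lists. As before, hits are assumed to be `(primary_key, distance)` pairs with smaller distances ranking higher; this sketch uses the standard library's `heapq.nsmallest` to pick the top k.

```python
import heapq


def global_reduce(node_results, topk):
    """Merge result lists from several query nodes, keep one hit per
    primary key (the smallest distance), and return the top-k hits."""
    best = {}
    for hits in node_results:
        for pk, dist in hits:
            if pk not in best or dist < best[pk]:
                best[pk] = dist
    return heapq.nsmallest(topk, best.items(), key=lambda item: (item[1], item[0]))


print(global_reduce([[(1, 0.5), (3, 0.1)], [(3, 0.1), (4, 0.3)]], topk=2))
# [(3, 0.1), (4, 0.3)]
```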

To ensure that the proxy has received all query results before returning them to the SDK, each result message also records information including the searched sealed segments, the searched DMChannels, and the global sealed segments (all segments on all query nodes). The system can conclude that the proxy has received all query results only if both of the following conditions are met:

  • The union of all searched sealed segments recorded in all result messages contains every global sealed segment.
  • All DMChannels in the collection are queried.
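The two completion conditions translate directly into set operations. Result messages are modeled here as dictionaries with the hypothetical keys `searched_sealed_segments` and `searched_dmchannels`; the real message format will differ.

```python
def all_results_received(result_msgs, global_sealed_segments, collection_dmchannels):
    """Return True once the searched sealed segments cover the global sealed
    segments and every DMChannel of the collection has been searched."""
    searched_segments, searched_channels = set(), set()
    for msg in result_msgs:
        searched_segments |= set(msg["searched_sealed_segments"])
        searched_channels |= set(msg["searched_dmchannels"])
    return (searched_segments >= set(global_sealed_segments)
            and searched_channels >= set(collection_dmchannels))


from_qn1 = {"searched_sealed_segments": ["S1", "S3"], "searched_dmchannels": ["Ch1"]}
from_qn2 = {"searched_sealed_segments": ["S2", "S4"], "searched_dmchannels": ["Ch2"]}
global_segments = ["S1", "S2", "S3", "S4"]
channels = ["Ch1", "Ch2"]
print(all_results_received([from_qn1], global_segments, channels))            # False
print(all_results_received([from_qn1, from_qn2], global_segments, channels))  # True
```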

Finally, after the “global reduce”, the proxy returns the final results to the Milvus SDK.
