Milvus is a highly popular open-source vector database, specifically engineered for efficient similarity search and analytics on massive-scale vector datasets. It provides a distributed, cloud-native architecture designed for performance and resilience, making it a strong contender for production semantic search systems. In this section, we'll explore how to interact with Milvus using its Python client library, pymilvus, covering the fundamental operations needed to build search applications.
Before diving into the code, let's familiarize ourselves with some Milvus terminology:
- DataType.INT64 or DataType.VARCHAR: Often used for the primary key.
- DataType.FLOAT_VECTOR: Stores the vector embeddings. You must specify the vector dim (dimensionality).
- Scalar types (DataType.VARCHAR, DataType.INT32, DataType.BOOL, DataType.FLOAT): Used for storing metadata associated with each vector.
- Index: Built on the vector field to speed up search. You set index-specific parameters (e.g., M and efConstruction for HNSW) during index creation.
- Metric type: The measure used to compare vectors (e.g., L2 for Euclidean distance, IP for Inner Product). This is typically specified when creating the index and performing searches.

First, ensure you have a Milvus instance running (e.g., using Docker for local development) and the pymilvus library installed (pip install pymilvus). Connecting is straightforward:
from pymilvus import connections, utility, FieldSchema, CollectionSchema, DataType, Collection
# Connect to Milvus
# Assumes Milvus is running on localhost:19530
connections.connect(alias="default", host="localhost", port="19530")
print("Connected to Milvus.")
# Check if the connection is successful
print(f"Available collections: {utility.list_collections()}")
The alias parameter lets you manage multiple connections if needed; "default" is commonly used.
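In practice the host and port are usually read from configuration rather than hard-coded. Here is a minimal sketch of that idea. The environment variable names MILVUS_HOST and MILVUS_PORT and the helper milvus_connection_kwargs are hypothetical choices for this example, not something Milvus or pymilvus defines.

```python
import os


def milvus_connection_kwargs(alias="default", env=None):
    """Build keyword arguments for connections.connect.

    MILVUS_HOST and MILVUS_PORT are hypothetical variable names
    chosen for this sketch; fall back to the local defaults used above.
    """
    env = os.environ if env is None else env
    return {
        "alias": alias,
        "host": env.get("MILVUS_HOST", "localhost"),
        "port": env.get("MILVUS_PORT", "19530"),
    }


# Pass an explicit mapping here so the example does not depend on the real environment
kwargs = milvus_connection_kwargs(env={"MILVUS_HOST": "milvus.internal"})
print(kwargs)

# Usage (requires a running Milvus instance):
# connections.connect(**kwargs)
```

Keeping the settings in one dictionary also makes it easy to register a second connection under a different alias.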
Let's define a schema for storing information about technical articles, including a title (metadata), publication year (metadata), and the content embedding (vector).
# Define schema fields
article_id = FieldSchema(
    name="article_id",
    dtype=DataType.INT64,
    is_primary=True,
    auto_id=True  # Let Milvus generate unique IDs
)
title = FieldSchema(
    name="title",
    dtype=DataType.VARCHAR,
    max_length=256  # Max length for VARCHAR fields
)
publish_year = FieldSchema(
    name="publish_year",
    dtype=DataType.INT32
)
embedding = FieldSchema(
    name="embedding",
    dtype=DataType.FLOAT_VECTOR,
    dim=768  # Example dimension, match your embedding model
)
# Define the collection schema
schema = CollectionSchema(
    fields=[article_id, title, publish_year, embedding],
    description="Technical article collection",
    enable_dynamic_field=False  # Set to True to allow adding fields not in schema
)
# Create the collection
collection_name = "tech_articles"
# Drop collection if it already exists (for demonstration)
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)
    print(f"Dropped existing collection: {collection_name}")
collection = Collection(name=collection_name, schema=schema)
print(f"Collection '{collection.name}' created successfully.")
Here, auto_id=True simplifies ID management by letting Milvus handle primary key generation. We've explicitly set the vector dimension (dim=768), which must match the output dimension of the embedding model you use.
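Since a dimension mismatch only surfaces when the data reaches Milvus, it can help to check rows on the client side first. Below is a small sketch of such a check. The helper name check_embedding_dims is hypothetical, and the rows use the same dictionary layout as the schema above.

```python
EMBEDDING_DIM = 768  # must equal the dim declared on the FLOAT_VECTOR field


def check_embedding_dims(rows, dim=EMBEDDING_DIM, field="embedding"):
    """Return the indices of rows whose vector length differs from dim."""
    return [i for i, row in enumerate(rows) if len(row[field]) != dim]


rows = [
    {"title": "Correct size", "publish_year": 2023, "embedding": [0.0] * 768},
    {"title": "Wrong model", "publish_year": 2023, "embedding": [0.0] * 384},
]

bad = check_embedding_dims(rows)
print(bad)  # [1]
```

Rejecting or logging the offending rows before calling insert keeps the error close to the code that produced the embeddings.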
For efficient searching, especially with large datasets, creating an index on the vector field is essential. This should typically be done after creating the collection but before inserting a large volume of data, although Milvus allows indexing after insertion too.
# Define index parameters for the 'embedding' field
# Using HNSW index type, L2 distance metric
index_params = {
    "metric_type": "L2",  # Common choice for many embedding models
    "index_type": "HNSW",  # Hierarchical Navigable Small Worlds
    "params": {
        "M": 16,  # Max number of connections per node in each graph layer
        "efConstruction": 200  # Size of dynamic candidate list during index building
    }
}
# Create the index
collection.create_index(
    field_name="embedding",
    index_params=index_params
)
print("Index created successfully on field 'embedding'.")
# Wait for index building to complete (important for consistency)
utility.wait_for_index_building_complete(collection_name)
print("Index building complete.")
Choosing the right index_type and tuning params like M and efConstruction involves trade-offs between indexing time, search speed, and recall, as discussed in Chapter 3. L2 (Euclidean distance) is a common metric, but IP (Inner Product) is often preferred for normalized embeddings where it's equivalent to Cosine Similarity.
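The equivalence between IP and cosine similarity for normalized embeddings can be checked numerically. The sketch below uses plain Python and two arbitrary example vectors. It also verifies the related identity for unit vectors, ||a - b||^2 = 2 - 2 (a . b), which is why L2 and IP produce the same ranking once embeddings are normalized.

```python
import math


def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def l2_squared(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def normalize(v):
    """Scale v to unit length."""
    norm = math.sqrt(inner_product(v, v))
    return [x / norm for x in v]


def cosine(a, b):
    return inner_product(a, b) / math.sqrt(inner_product(a, a) * inner_product(b, b))


# Arbitrary example vectors, normalized to unit length
a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 0.5, 1.0])

ip = inner_product(a, b)
print(math.isclose(ip, cosine(a, b)))              # IP equals cosine similarity
print(math.isclose(l2_squared(a, b), 2 - 2 * ip))  # squared L2 is a function of IP
```

If your embedding model does not return unit vectors, normalize them before insertion and before querying so the IP metric behaves like cosine similarity.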
Before you can search a collection in Milvus, you generally need to load it (or specific partitions) into memory. This pre-warms the data for faster query responses.
# Load the collection into memory for searching
collection.load()
print(f"Collection '{collection.name}' loaded into memory.")
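Because loading places the collection in memory, it is worth estimating how much space the raw vectors need before you call load(). The sketch below assumes each FLOAT_VECTOR component is stored as a 4-byte float. It gives a lower bound only, since it ignores index structures and scalar fields. The function name and the one-million-row figure are illustrative.

```python
BYTES_PER_FLOAT32 = 4  # assumption: FLOAT_VECTOR components are 32-bit floats


def raw_vector_bytes(num_vectors, dim):
    """Lower-bound memory for the raw vectors alone (no index, no metadata)."""
    return num_vectors * dim * BYTES_PER_FLOAT32


size = raw_vector_bytes(1_000_000, 768)
print(f"{size / 1024**3:.2f} GiB")  # 2.86 GiB
```

Treat the result as a starting point for capacity planning rather than an exact figure.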
Now, let's insert some sample data. The data can be structured column-wise as a list of lists (one inner list per field, in schema order) or row-wise as a list of dictionaries keyed by field name. In both cases the auto_id primary key is excluded.
# Sample data (ensure vector dimensions match schema, e.g., 768)
# Replace with actual embeddings from your model
data_to_insert = [
    {"title": "Intro to Vector Databases", "publish_year": 2023, "embedding": [0.1] * 768},
    {"title": "Understanding HNSW Indexes", "publish_year": 2022, "embedding": [0.2] * 768},
    {"title": "Semantic Search Pipelines", "publish_year": 2023, "embedding": [0.3] * 768},
    {"title": "Deep Learning Fundamentals", "publish_year": 2021, "embedding": [0.4] * 768}
]
# Insert data
insert_result = collection.insert(data_to_insert)
print(f"Inserted data. Primary keys: {insert_result.primary_keys}")
# Milvus operations are often asynchronous. Flush ensures data is persisted.
collection.flush()
print(f"Entities count after flush: {collection.num_entities}")
collection.flush() seals the pending data segments and persists them, which is also what makes collection.num_entities reflect the newly inserted rows. Milvus automatically handles generating the article_id because we set auto_id=True.
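For larger datasets you would typically insert in batches rather than in a single call. The sketch below shows one way to split rows into chunks. The helper name batched and the batch sizes are hypothetical, and the right size depends on your vector dimension and deployment.

```python
def batched(rows, batch_size):
    """Yield consecutive slices of rows with at most batch_size items each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Placeholder rows in the same layout as data_to_insert
rows = [
    {"title": f"Article {i}", "publish_year": 2023, "embedding": [0.0] * 768}
    for i in range(10)
]

batch_sizes = [len(batch) for batch in batched(rows, batch_size=4)]
print(batch_sizes)  # [4, 4, 2]

# Usage against a live collection (requires Milvus):
# for batch in batched(rows, batch_size=1000):
#     collection.insert(batch)
# collection.flush()  # flush once at the end rather than after every batch
```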
The core operation is searching for vectors similar to a given query vector. You provide the query vector(s), the number of results (limit), and search parameters.
# Generate a query vector (e.g., from embedding a search query)
# Replace with an actual query embedding
query_vector = [[0.15] * 768] # Example query vector
# Define search parameters
search_params = {
    "metric_type": "L2",
    "params": {
        "ef": 128  # Search scope, higher value means better recall but slower speed
    }
}
# Perform the search
results = collection.search(
    data=query_vector,  # List of query vectors
    anns_field="embedding",  # Field to search on
    param=search_params,  # Index and search parameters
    limit=3,  # Number of results to return
    output_fields=["title", "publish_year"]  # Specify metadata fields to retrieve
)
# Process results
print("\nSearch Results (Top 3):")
for hit in results[0]:  # results[0] corresponds to the first query vector
    print(f" ID: {hit.id}, Distance: {hit.distance:.4f}, Title: {hit.entity.get('title')}, Year: {hit.entity.get('publish_year')}")
The ef parameter in search_params for HNSW controls the size of the dynamic list searched at query time. Higher values generally increase accuracy (recall) at the cost of latency. output_fields allows you to retrieve specific metadata alongside the matching vector IDs and distances.
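To judge whether a given ef value is high enough, you can compare the approximate results against an exact, brute-force ranking on a small sample and compute recall. The sketch below does this in plain Python on the four sample vectors. The integer IDs and the "approximate" result list [1, 2, 4] are made up for illustration; real IDs come from hit.id.

```python
def l2_squared(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def exact_top_k(query, vectors, k):
    """Return the ids of the k nearest vectors by exhaustive comparison."""
    ranked = sorted(vectors, key=lambda item_id: l2_squared(query, vectors[item_id]))
    return ranked[:k]


def recall_at_k(approx_ids, exact_ids):
    """Fraction of the exact neighbors that the approximate search also returned."""
    exact = set(exact_ids)
    return len(exact.intersection(approx_ids)) / len(exact)


# Hypothetical ids mapped to the sample embeddings used earlier
vectors = {1: [0.1] * 768, 2: [0.2] * 768, 3: [0.3] * 768, 4: [0.4] * 768}
query = [0.15] * 768

exact = exact_top_k(query, vectors, k=3)
recall = recall_at_k([1, 2, 4], exact)  # pretend the index returned ids 1, 2, 4
print(sorted(exact), round(recall, 2))
```

If recall on a representative sample falls below your target, raising ef is usually the first thing to try, at the cost of higher latency.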
Milvus lets you combine vector similarity search with filtering on scalar metadata fields using boolean expressions.
# Perform a search with a filter expression
# Find articles similar to query_vector BUT published in 2023
filtered_results = collection.search(
    data=query_vector,
    anns_field="embedding",
    param=search_params,
    limit=3,
    expr="publish_year == 2023",  # Boolean expression for filtering
    output_fields=["title", "publish_year"]
)
print("\nFiltered Search Results (Published in 2023, Top 3):")
if not filtered_results[0]:
    print(" No matching results found for the filter.")
else:
    for hit in filtered_results[0]:
        print(f" ID: {hit.id}, Distance: {hit.distance:.4f}, Title: {hit.entity.get('title')}, Year: {hit.entity.get('publish_year')}")
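The expr parameter uses a SQL-like syntax for defining filter conditions on your metadata fields. Whether the filter is applied before or after the vector search (pre-filtering or post-filtering) depends on the strategy Milvus employs, but in both cases it runs inside the search call, so no separate filtering pass is needed in application code.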
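When filter values come from user input, it is convenient to assemble the expression string from Python values instead of formatting it by hand. The helpers below (eq, in_list, all_of) are hypothetical names for this sketch. Using json.dumps to quote strings assumes that double-quoted literals with backslash escapes match what the Milvus expression parser accepts, which is worth verifying for unusual characters.

```python
import json


def eq(field, value):
    """field == value, with strings rendered as double-quoted literals."""
    return f"{field} == {json.dumps(value)}"


def in_list(field, values):
    """field in [v1, v2, ...]."""
    return f"{field} in {json.dumps(list(values))}"


def all_of(*clauses):
    """Combine clauses with 'and', parenthesizing each one."""
    return " and ".join(f"({clause})" for clause in clauses)


expr = all_of(
    eq("publish_year", 2023),
    in_list("title", ["Intro to Vector Databases"]),
)
print(expr)
# (publish_year == 2023) and (title in ["Intro to Vector Databases"])
```

The resulting string can be passed as the expr argument of collection.search in the same way as the literal "publish_year == 2023" above.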
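Beyond search, a few other operations are worth knowing:
- Retrieve entities by primary key: collection.query(expr="article_id in [...]") (the newer MilvusClient interface also offers a get(ids=[...]) method).
- Query by scalar filter: collection.query(expr="publish_year > 2022", output_fields=["title"]).
- Delete entities: collection.delete(expr="article_id in [...]").
- Partitions: create partitions (collection.create_partition()) and specify a partition name during insertion and search (partition_name=...) to manage data subsets.

When a collection is no longer needed for active searching, release it from memory. To permanently delete it, drop it.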
# Release collection from memory
collection.release()
print(f"\nCollection '{collection.name}' released from memory.")
# Drop the collection permanently
# utility.drop_collection(collection_name)
# print(f"Collection '{collection.name}' dropped.")
# Disconnect from Milvus
connections.disconnect(alias="default")
print("Disconnected from Milvus.")
Working with the Milvus client involves understanding its specific concepts like collections, schemas, and the importance of indexing and loading. Its Python client provides a comprehensive interface for defining data structures, ingesting vectors and metadata, and performing powerful filtered similarity searches, making it a robust choice for building practical semantic search applications. Remember that optimal performance often requires careful tuning of index and search parameters based on your specific data and application requirements.
© 2025 ApX Machine Learning