AI Powered Search with Python and Milvus Vector Database

Updated on November 11, 2023

Introduction

Vector databases are commonly used to store vector embeddings for tasks such as similarity search, which underpins recommendation and question-answering systems. Milvus is a popular open-source vector database that stores embeddings as vector data. It is well suited to these workloads because its Approximate Nearest Neighbours (ANN) indexes return close matches quickly, even in large collections.

This article explains how to implement AI-powered search with Python and a Milvus database. You will load a HuggingFace dataset, split it into training and test sets, create embeddings from the questions in the test set, and store the embeddings in a Milvus collection. Then, you will search the collection with a question prompt to retrieve the most similar answers.

Prerequisites

Before you begin:

Set Up the Server

To develop and deploy your application, install the necessary dependencies and declare the required parameters on the server. Then, connect to your Milvus cluster to set up database operations as described in the steps below.

  1. Using pip, install the necessary dependencies

     $ pip install transformers datasets pymilvus torch

    Below is what each package does:

    • transformers: A HuggingFace library used to access and work with pre-trained models for tasks such as text classification and generation
    • datasets: A HuggingFace library that allows you to access and work with ready-to-use datasets for Natural Language Processing (NLP) tasks
    • pymilvus: The Milvus Python client that allows you to perform vector similarity search, storage, and management of large collections
    • torch: A machine learning library used to train and build deep learning models
  2. Open the Python console

     $ python3
  3. Import the required modules

     >>> from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
         from datasets import load_dataset_builder, load_dataset, Dataset
         from transformers import AutoTokenizer, AutoModel
         from torch import clamp, sum

    Below is what each of the imported module classes does:

    • pymilvus:

      • connections: Provides functions to manage connections to the Milvus database
      • FieldSchema: Defines the schema of a field in a Milvus collection
      • CollectionSchema: Defines the schema of a collection
      • DataType: Enumerates data types used in a Milvus collection
      • Collection: Provides the functionality to interact with Milvus collections to create, insert, and search vectors
      • utility: Provides helper functions for managing Milvus, such as checking whether a collection exists and dropping a collection
    • datasets:

      • load_dataset_builder: Returns a dataset builder that lets you inspect a dataset's information and metadata without downloading the data
      • load_dataset: Downloads and loads a dataset, then returns the dataset object for data access
      • Dataset: Represents a dataset that provides access to data-related operations
    • transformers:

      • AutoTokenizer: Loads the pre-trained tokenization models for NLP tasks
      • AutoModel: A model loading class that automatically loads pre-trained models for NLP tasks
    • torch:

      • clamp: Limits each element of a tensor to a specified minimum and maximum value
      • sum: Computes the sum of tensor elements along specified dimensions
  4. Declare the necessary parameters

     >>> DATASET = 'squad'
         MODEL = 'bert-base-uncased' 
         TOKENIZATION_BATCH_SIZE = 1000  
         INFERENCE_BATCH_SIZE = 64  
         INSERT_RATIO = .001 
         COLLECTION_NAME = 'huggingface_db'  
         DIMENSION = 768  
         LIMIT = 10 
         MILVUS_HOST = "MILVUS_CLUSTER_IP_ADDRESS"
         MILVUS_PORT = "19530"

    Below is what each declared parameter does:

    • DATASET: Defines the HuggingFace dataset to use when searching for answers
    • MODEL: Defines the transformer to use for creating embeddings
    • TOKENIZATION_BATCH_SIZE: Determines how many text elements are processed at once during tokenization. This helps to speed up tokenization by using parallelism
    • INFERENCE_BATCH_SIZE: Sets the batch size for model inference, which affects how efficiently embeddings are generated. You can reduce the batch size to 32 or 16 when using a GPU with less memory
    • INSERT_RATIO: Controls the fraction of the text data to convert into embeddings, which limits the volume of data to index for vector search
    • COLLECTION_NAME: Sets the collection name you intend to create
    • DIMENSION: Sets the size of an individual embedding to store in the collection
    • LIMIT: Sets the number of results to search and display in the output
    • MILVUS_HOST: Sets the VKE cluster external IP address to access the Milvus database
    • MILVUS_PORT: Defines the Milvus Database port accessible using the cluster host IP address
  5. Connect to the Milvus database. Replace 192.0.2.100, 19530, root, and Milvus with your actual Milvus cluster values

     >>> connections.connect(host="192.0.2.100", port="19530", user="root", password="Milvus")

    The above command creates a connection to the Milvus database using your VKE cluster deployment details.
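
As an alternative to typing the connection values directly into the console, the following minimal sketch reads them from environment variables and builds the keyword arguments for connections.connect(). The helper milvus_connection_args and the variable names MILVUS_USER and MILVUS_PASSWORD are assumptions made for illustration and are not part of pymilvus. The defaults are the placeholder values used in this article.

```python
import os


def milvus_connection_args(env=os.environ):
    """Build keyword arguments for connections.connect().

    The environment variable names are hypothetical; the defaults are the
    placeholder values from this article, not real credentials.
    """
    return {
        "host": env.get("MILVUS_HOST", "192.0.2.100"),
        "port": env.get("MILVUS_PORT", "19530"),
        "user": env.get("MILVUS_USER", "root"),
        "password": env.get("MILVUS_PASSWORD", "Milvus"),
    }


# A plain dictionary stands in for the real environment in this example.
args = milvus_connection_args({"MILVUS_HOST": "203.0.113.5"})
print(args["host"], args["port"])

# With pymilvus installed, the values can be passed straight through:
# connections.connect(**args)
```

Keeping the credentials outside the console session means they do not end up in your shell history or in a script you later share.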

Build the Question Answering System

To build the question-answering system, create a collection. Then, tokenize the data, create the embeddings, and insert them into the collection. Finally, perform a search operation to get the relevant answers for a specific question and test the system functionality as described in the following sections.

Create a Collection

In this section, check for the existence of the collection, create the collection, and set up the index for the collection. To perform text-based operations, load the collection as described in the steps below.

  1. Verify if a collection exists. Replace COLLECTION_NAME with your target collection name

     >>> if utility.has_collection(COLLECTION_NAME):
             utility.drop_collection(COLLECTION_NAME)

    The above command checks whether a collection with the same name already exists. If it does, the collection is dropped to avoid any conflicts.

  2. Create a new collection. Replace COLLECTION_NAME with your desired name

     >>> fields = [
             FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True),
             FieldSchema(name='original_question', dtype=DataType.VARCHAR, max_length=1000),
             FieldSchema(name='answer', dtype=DataType.VARCHAR, max_length=1000),
             FieldSchema(name='original_question_embedding', dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)
         ]
         schema = CollectionSchema(fields=fields)
         collection = Collection(name=COLLECTION_NAME, schema=schema)

    The above code defines a new collection schema with the following fields:

    • id: Sets the primary field that uniquely identifies each database entry
    • original_question: Stores the original question to match against any question you ask
    • answer: Holds the answer to each original_question
    • original_question_embedding: Contains embeddings for each entry in the original_question to perform a similarity search with your input question
  3. Create the collection index

     >>> index_params = {
             'metric_type':'L2',
             'index_type':"IVF_FLAT",
             'params':{"nlist":1536}
         }
    
     >>> collection.create_index(field_name="original_question_embedding", index_params=index_params)

    The above code creates a new index for the original_question_embedding field to perform a similarity search. When successful, your output should look like the one below:

     Status(code=0, message=)
  4. Load the collection

     >>> collection.load()

    The above code loads the collection into memory. Milvus requires a collection to be loaded before it can serve search operations.
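
To give an intuition for the IVF_FLAT index type and the nlist parameter used above, the following pure-Python sketch groups vectors into buckets around centroids and scans only the closest buckets at search time. It is a simplified illustration and not how Milvus implements the index. The centroids are hand-picked here, whereas a real index learns them by clustering the data, and the function names are hypothetical. The number of centroids plays the role of nlist, and nprobe is the number of buckets scanned per query.

```python
def squared_l2(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def build_ivf(vectors, centroids):
    """Assign each vector index to the bucket of its nearest centroid."""
    buckets = {c: [] for c in range(len(centroids))}
    for idx, vec in enumerate(vectors):
        nearest = min(range(len(centroids)),
                      key=lambda c: squared_l2(vec, centroids[c]))
        buckets[nearest].append(idx)
    return buckets


def ivf_search(query, vectors, centroids, buckets, nprobe, limit):
    """Scan only the nprobe buckets whose centroids are closest to the query."""
    probe = sorted(range(len(centroids)),
                   key=lambda c: squared_l2(query, centroids[c]))[:nprobe]
    candidates = [idx for c in probe for idx in buckets[c]]
    ranked = sorted(candidates, key=lambda idx: squared_l2(query, vectors[idx]))
    return ranked[:limit]


# Toy 2-dimensional data; the article's embeddings have 768 dimensions.
vectors = [(0.0, 0.1), (0.2, 0.0), (5.0, 5.1), (5.2, 4.9), (9.9, 0.1)]
centroids = [(0.0, 0.0), (5.0, 5.0), (10.0, 0.0)]  # 3 buckets, like nlist=3
buckets = build_ivf(vectors, centroids)

query = (0.1, 0.1)
fast = ivf_search(query, vectors, centroids, buckets, nprobe=1, limit=2)
full = ivf_search(query, vectors, centroids, buckets, nprobe=3, limit=2)
print(fast, full)
```

Scanning fewer buckets is faster but can miss a neighbour that sits in a bucket that was not probed, which is the trade-off an approximate index makes.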

Insert Data to the Collection

  1. Load the dataset

     >>> data_dataset = load_dataset(DATASET, split='all')
    
         data_dataset = data_dataset.train_test_split(test_size=INSERT_RATIO, seed=42)['test']
    
         data_dataset = data_dataset.map(lambda val: {'answer': val['answers']['text'][0]}, remove_columns=['answers'])

    The above code loads the dataset, splits it into training and test sets, and keeps the test set, whose size is controlled by INSERT_RATIO. It then stores the first answer text of each entry in a new answer column and removes the original answers column.

  2. Initialize the tokenizer

     >>> tokenizer = AutoTokenizer.from_pretrained(MODEL)
  3. Tokenize the question

     >>> def tokenize_question(batch):
             results = tokenizer(batch['question'], add_special_tokens = True, truncation = True, padding = "max_length", return_attention_mask = True, return_tensors = "pt")
             batch['input_ids'] = results['input_ids']
             batch['token_type_ids'] = results['token_type_ids']
             batch['attention_mask'] = results['attention_mask']
             return batch

    The above code defines a function tokenize_question that takes a batch of data as input and tokenizes the question field into a format the BERT model accepts. It applies truncation and padding, then returns the batch with the input_ids, token_type_ids, and attention_mask fields added. This is a common pre-processing step in NLP tasks before you send data to the model.

  4. Tokenize each entry

     >>> data_dataset = data_dataset.map(tokenize_question, batch_size=TOKENIZATION_BATCH_SIZE, batched=True)
    
         data_dataset.set_format('torch', columns=['input_ids', 'token_type_ids', 'attention_mask'], output_all_columns=True)

    The above code uses the map function on the data_dataset and applies the tokenize_question function on every question in the dataset. When successful, the output format is set to a torch compatible format for PyTorch based machine learning models.

  5. Create the embeddings

     >>> model = AutoModel.from_pretrained(MODEL)
    
     >>> def embed(batch):
             sentence_embs = model(
                         input_ids=batch['input_ids'],
                         token_type_ids=batch['token_type_ids'],
                         attention_mask=batch['attention_mask']
                         )[0]
             input_mask_expanded = batch['attention_mask'].unsqueeze(-1).expand(sentence_embs.size()).float()
             batch['question_embedding'] = sum(sentence_embs * input_mask_expanded, 1) / clamp(input_mask_expanded.sum(1), min=1e-9)
             return batch
    
     >>> data_dataset = data_dataset.map(embed, remove_columns=['input_ids', 'token_type_ids', 'attention_mask'], batched = True, batch_size=INFERENCE_BATCH_SIZE)

    The above code loads the pre-trained model and passes the tokenized questions through the model to get the token embeddings. It then averages the token embeddings of each question, ignoring padded positions, and adds the result to the dataset as question_embedding.

  6. Insert questions into a collection

     >>> def insert_function(batch):
             insertable = [
                 batch['question'],
                 [x[:995] + '...' if len(x) > 999 else x for x in batch['answer']],
                 batch['question_embedding'].tolist()
             ]    
             collection.insert(insertable)
    
     >>> data_dataset.map(insert_function, batched=True, batch_size=64)
         collection.flush()

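    The above code inserts the questions, answers, and question embeddings from the dataset into the collection in batches. Answers longer than 999 characters are truncated to fit within the VARCHAR limit of the answer field. When successful, your output should look like the one below: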

     Dataset({
         features: ['id', 'title', 'context', 'question', 'answer', 'input_ids', 'token_type_ids', 'attention_mask', 'question_embedding'],
         num_rows: 99
     })
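
The embed function in step 5 averages the token embeddings of each question while ignoring padded positions, a technique commonly called mean pooling. The following pure-Python sketch shows the same calculation on a single toy sequence. The function name mean_pool and the sample values are made up for illustration, and the torch version in the article works on whole batches of tensors instead of lists.

```python
def mean_pool(token_embeddings, attention_mask):
    """Average token vectors, ignoring positions where the mask is 0."""
    dim = len(token_embeddings[0])
    totals = [0.0] * dim
    for vec, keep in zip(token_embeddings, attention_mask):
        for i in range(dim):
            totals[i] += vec[i] * keep  # padded tokens contribute nothing
    # Same guard as clamp(..., min=1e-9): avoids dividing by zero
    count = max(sum(attention_mask), 1e-9)
    return [t / count for t in totals]


# Two real tokens followed by one padding token
tokens = [[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]
mask = [1, 1, 0]
pooled = mean_pool(tokens, mask)
print(pooled)  # [2.0, 3.0]
```

Without the mask, the padding vector would pull the average away from the real tokens. This matters here because the tokenizer pads every question to the maximum length.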

Generate Responses

In this section, create a custom question dataset, tokenize, and embed the dataset. Then, perform a search operation in the Milvus collection to find the top relevant answers for your question.

  1. Create a new question dataset. Replace When was maths invented with your desired question

     >>> questions = {'question':['When was maths invented?']}
         question_dataset = Dataset.from_dict(questions)

    The above code creates a new question_dataset dataset. To generate answers for more questions, add them to the list in the questions variable.

  2. Tokenize and embed the question

     >>> question_dataset = question_dataset.map(tokenize_question, batched = True, batch_size=TOKENIZATION_BATCH_SIZE)
    
     >>> question_dataset.set_format('torch', columns=['input_ids', 'token_type_ids', 'attention_mask'], output_all_columns=True)
    
     >>> question_dataset = question_dataset.map(embed, remove_columns=['input_ids', 'token_type_ids', 'attention_mask'], batched = True, batch_size=INFERENCE_BATCH_SIZE)

    The above code tokenizes the question_dataset using the tokenize_question function. It then sets the output format to torch and applies the embed function to the question_dataset to generate the embeddings.

  3. Define the search function

     >>> def search(batch):
             res = collection.search(batch['question_embedding'].tolist(), anns_field='original_question_embedding', param = {}, output_fields=['answer', 'original_question'], limit = LIMIT)
             overall_id = []
             overall_distance = []
             overall_answer = []
             overall_original_question = []
             for hits in res:
                 ids = []
                 distance = []
                 answer = []
                 original_question = []
                 for hit in hits:
                     ids.append(hit.id)
                     distance.append(hit.distance)
                     answer.append(hit.entity.get('answer'))
                     original_question.append(hit.entity.get('original_question'))
                 overall_id.append(ids)
                 overall_distance.append(distance)
                 overall_answer.append(answer)
                 overall_original_question.append(original_question)
             return {
                 'id': overall_id,
                 'distance': overall_distance,
                 'answer': overall_answer,
                 'original_question': overall_original_question
             }

    The above search function performs a search operation using the question embeddings. It searches the collection for the stored question embeddings closest to each input embedding and retrieves the id, distance, answer, and original_question of each hit. Retrieved information is organized into lists and returned as a dictionary.

  4. Perform a search operation

     >>> question_dataset = question_dataset.map(search, batched=True, batch_size = 1)
    
     >>> for x in question_dataset:
             print()
             print('Question:')
             print(x['question'])
             print('Answer, Distance, Original Question')
             for row in zip(x['answer'], x['distance'], x['original_question']):
                 print(row)

    The above code applies the search function you defined earlier to the question_dataset. When successful, it prints the information for each question as displayed in the output below:

     Question:
     When was maths invented?
     Answer, Distance, Original Question
     ('until 1870', tensor(33.3018), 'When did the Papal States exist?')
     ('October 1992', tensor(34.8276), 'When were free elections held?')
     ('1787', tensor(36.0596), 'When was the Tower constructed?')
     ('Poland, Bulgaria, the Czech Republic, Slovakia, Hungary, Albania, former East Germany and Cuba', tensor(38.3254), 'Where was Russian schooling mandatory in the 20th century?')
     ('6,000 years', tensor(41.9444), 'How old did biblical scholars think the Earth was?')
     ('1992', tensor(42.2079), 'In what year was the Premier League created?')
     ('1981', tensor(44.7781), "When was ZE's Mutant Disco released?")
     ('Medieval Latin', tensor(46.9699), "What was the Latin of Charlemagne's era later known as?")
     ('taxation', tensor(49.2372), 'How did Hobson argue to rid the world of imperialism?')
     ('light weight, relative unbreakability and low surface noise', tensor(49.5037), "What were advantages of vinyl in the 1930's?")

    As displayed in the above output, the search returns the 10 closest matches for your question, ordered from the smallest distance to the largest, along with the original question each answer belongs to. Each distance is displayed as a tensor value. Because the index uses the L2 metric, a smaller distance means that the stored question is closer to your question.
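
To illustrate how results are ranked by distance, the following minimal sketch selects the closest hits from a small list. The dictionaries are hypothetical stand-ins for the hit objects that Milvus returns, the helper top_hits is not part of pymilvus, and the sample values are copied from the output above. Milvus performs this ranking for you, so the sketch only shows the ordering rule.

```python
import heapq


def top_hits(hits, limit):
    """Return the `limit` hits with the smallest distance, closest first."""
    return heapq.nsmallest(limit, hits, key=lambda hit: hit["distance"])


# Plain dictionaries standing in for Milvus hits, deliberately unsorted
hits = [
    {"answer": "1787", "distance": 36.0596},
    {"answer": "until 1870", "distance": 33.3018},
    {"answer": "October 1992", "distance": 34.8276},
]

closest = top_hits(hits, limit=2)
for hit in closest:
    print(hit["answer"], hit["distance"])
```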

Conclusion

You have built a question-answering system using a HuggingFace dataset and Milvus. You created embeddings from the dataset, stored them in a Milvus collection, and performed a similarity search to find the most suitable answers for the provided prompt. You can rephrase your questions and compare the distance values associated with each answer to find closer matches.

Next Steps

To implement more solutions on your Vultr Cloud GPU server, visit the following resources: