How to Chat with Your BigQuery? Introducing semantic search to databases.

blog preview

Large Language Models have shown to be very powerful in knowing and creating creating SQL queries against a variety of databases - including Google BigQuery.

This advancement in AI technology makes it possible to interact with your BigQuery database using natural language.

Data in BigQuery datasets tend to be quite complex and may contain one of the worlds most relevant business-related information.

Imagine a world, where you ask "How many users signed up last month?" and you get an answer in seconds, without needing to first search the database for the right data model and then writing an SQL query. Just using everyday, human language.

In this post we will show you how to build a conversational interface for Google BigQuery, using the powerful GPT-4o model. We'll outline the steps needed to set up your development environment, how to connect to BigQuery, how to use the OpenAI API to generate generate the SQL queries and how to execute them against your BigQuery database.

As a bonus, we are going to show how to use RAG - Retrieval Augmented Generation to improve the quality of the generated SQL queries.

Concept for a vector-search based chat interface for BigQuery?

First, let's have a look at how such a "chat with your BigQuery" solution might look like.

In general, we need the following elements:

  1. A way for the user to enter his query in natural language.
  2. A layer that translates the natural language query into SQL.
  3. A runtime to execute the SQL query against the BigQuery database.

While 1 and 3 are quite straightforward, the main logic and complexity lies in task number 2 - translating natural language into SQL.

To allow high quality SQL generation, we need the following sub-steps:

  1. Get the structure of the database - the database schema
  2. Get a description of the individual columns, so that the AI can understand the context of the data.
  3. Use retrieval augmented generation, to find the most relevant BigQuery tables and columns for the user query.
  4. Use a large language model to generate the SQL query.
  5. Automatically correct errors in the generated SQL query - as LLMs art not yet perfect and tend to produce erroneous queries from time to time.

Meaning, to have a high quality and robust conversational chat interface for BigQuery, we need the following high-level architecture:

High-level architecture of a conversational chat interface for BigQueryHigh-level architecture of a conversational chat interface for BigQuery

If we look at the architecture in more detail, we can see that we not only need to "connect" our BigQuery database to an AI model, but we need an AI Agent

In short, AI agents are AI models that take a user input and than independently use their knowledge, external tools (like database access) and memory to solve the user query.

In our case, we need to build a small agent, specified as follows:

Task: Take the user query and translate it to a working SQL query, answering the user query. Tools: BigQuery database access, Python runtime Knowledge: SQL generation (as part of the AI model)

AI Agent OverviewAI Agent Overview

Note: As the set of tasks we need the AI to independently solve is limited, our AI agent will be quite small. The concept of AI agents is phenomenally powerful for a variety of tasks, but in our case we only need a very limited implementation. Basically, all our agent needs to do is to generate SQL queries, execute them, decide whether to execute them again on error and return the result to the user.

Step by step: How to build a Text2SQL agent for BigQuery

Now that we established the basic architecture, lets go through the steps required to implement it. We are implementing the whole AI agnet loop from scratch, without using any of the existing LLM libraries.

Why not using LangChain or LlamaIndex?

Both LangChain and LlamaIndex provide SQL AI Agents as part of their feature set.

While they provide both a great way to get started with text2sql agents, their abstractions make it difficult to understand the underlying processes. Furthermore, and more important, their implementation is rather generic for many SQL databases. We found that after many hours and days of creating SQL AI agents, that each and any use case needs tweaking in how the SQL is generated and how errors are handled. This measure of flexibility is just not possible with the high-level interfaces provided by LangChain and LlamaIndex.

Preparation

First, make sure, that you have an active Google Cloud project as well as activated billing for said project.

Next, enable the following Google Cloud APIs:

Next, set up the Google Cloud BigQuery authentication as follows:

  1. Create a service account for your project. Add the following BigQuery roles:

    • BigQuery User
    • BigQuery Data Viewer
    • BigQuery Job User
  2. Download the service account key of the newly created service account.

Finally, let's import some demo data into BigQuery. Alternatively, you can use your own data.

  1. Download the demo dataset. It contains information from a well-known Kaggle challenge about churn of a Telecom company.

  2. Import the file into BigQuery in a new dataset aiagent_test and table churn_table.

Implementation of the AI Agent

  1. Create a new venv or conda environment and activate it.

  2. Install the required Python packages:

    1pip pandas install openai sqlalchemy sqlalchemy-bigquery google-cloud-bigquery
  3. Import the modules and define the BigQuery table and dataset and set up the openai api key.

    1from openai.types.chat import ChatCompletionToolParam
    2from sqlalchemy.sql import text
    3from sqlalchemy import create_engine, MetaData
    4import openai
    5from openai import AsyncOpenAI
    6import json
    7import pandas as pd
    8
    9project = "pondhouse-data"
    10dataset = "aiagent_test"
    11table = "churn_table"
    12openai.api_key = "your-openai-api"
  4. Create an sqlalchemy engine for the BigQuery database.

    1engine = create_engine(f'bigquery://{project}', credentials_path='/path/to/keyfile.json')
  5. Let's test our database connection:

    1statement = text(f"""SELECT count(*) from {dataset}.{table};""")

    The results should be:

    1(7043,)
  6. As we are able to connect to our database, let's retrieve the database schema.

    1metadata = MetaData()
    2metadata.reflect(bind=engine)
    3
    4# Reformat the retrieved schema
    5schema_info = []
    6
    7for table in metadata.tables.values():
    8 table_info = {
    9 "table_name": table.name,
    10 "columns": [{column.name: column.type} for column in table.columns],
    11 }
    12 schema_info.append(table_info)

    This provides a list of all the tables and columns in the provided BigQuery dataset.

    1 [{'table_name': 'aiagent_test.churn_table',
    2 'columns': [{'customerID': String()},
    3 {'gender': String()},
    4 {'SeniorCitizen': Integer()},
    5 {'Partner': Boolean()},
    6 {'Dependents': Boolean()},
    7 ...], ...]

Preparation done, we can now start with the actual implementation of the AI agent. In short, we will use the OpenAI API to generate the SQL queries and execute them against the BigQuery database.

Implementing the AI Agent loop

At it's core, AI agents are nothing more than a while loop, that takes a task/query and invokes an AI model to solve it. The while loop (so, the agent) runs for as long as no stop condition is given.

In our case, we only have a simple agent. In pseudo-code it looks:

while:

  • Use AI to generate SQL query based on user input and using the database schema
  • Execute the generated SQL query on the BigQuery database
  • Validate, if there were any errors, if yes, use AI to correct the query
  • Optional: If no errors, use AI to validate, if the results answer the user query. If not, repeat
  • Stop if the user query is answered

Let's implement this in python. First, we create a function to generate our query. We'll use the remarkable OpenAI function calling feature for that, as function calling provides robust JSON output which is easy to parse.

NOTE: Many experiments show, that AI models tend to be less intelligent when using function calling. This was not a problem for our SQL generation attempts so far - but keep that in mind. If you find your AI model is constantly producing bad queries, try to use the "normal" chat endpoint and parse the output yourself.

1query_database_function: list[ChatCompletionToolParam] = [
2 {
3 "type": "function",
4 "function": {
5 "name": "query_database",
6 "description": "Create a BigQuery SQL query which answers the question as accurately as possible. Make sure to return a complete query, without the user needing to add additional information. \n Also return the type of the query, like 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'ALTER', 'DROP', etc.",
7 "parameters": {
8 "type": "object",
9 "properties": {
10 "query": {
11 "type": "string",
12 "description": "BigQuery dialect SQL query",
13 },
14 "type": {
15 "type": "string",
16 "description": "Type of query, like 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'ALTER', 'DROP', etc.",
17 },
18 },
19 "required": ["query", "type"],
20 },
21 },
22 }
23]
24
25async def create_query(query: str, schema):
26 system_prompt = """Given the following database schema, create a BigQuery SQL query which answers the users question as accurately as possible. Make sure to return a complete query, without the user needing to add additional information.
27Also return the type of the query, like 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'ALTER', 'DROP', etc."""
28
29 client = AsyncOpenAI(api_key=openai.api_key)
30 response = await client.chat.completions.create(
31 model="gpt-4o",
32 temperature=0.5,
33 messages=[
34 {
35 "role": "system",
36 "content": system_prompt + "\n" + "Schema: " + str(schema),
37 },
38 {"role": "user", "content": query},
39 ],
40 tools=query_database_function,
41 )
42
43 query = json.loads(response.choices[0].message.tool_calls[0].function.arguments)[
44 "query"
45 ]
46 query_type = json.loads(
47 response.choices[0].message.tool_calls[0].function.arguments
48 )["type"]
49
50 if query_type != "SELECT":
51 raise Exception("Only read statements are allowed!")
52
53 return query

This function will be used to run the very first AI query generation attempt. We'll execute the resulting query against the BigQuery database and check for errors. If there are any, we'll use the AI to correct the query.

Let's create this error handling function first. It's basically just another LLM query generation, but we'll also send the error message to the model, so that it can correct the query.

1async def error_handling(question: str, query: str, schema, error):
2 system_prompt = "Given the following database schema, you suggested a BigQuery SQL query to answer the following question which resulted in the following error. Please correct the error."
3 client = AsyncOpenAI(api_key=openai.api_key)
4 response = await client.chat.completions.create(
5 model="gpt-4o",
6 temperature=0.5,
7 messages=[
8 {
9 "role": "system",
10 "content": system_prompt
11 + "\n"
12 + "Schema: "
13 + str(schema)
14 + "\n Original question which failed: "
15 + question
16 + "\n Your answer, which failed: "
17 + query
18 + "\n Error: "
19 + str(error),
20 },
21 {
22 "role": "user",
23 "content": "Correct the error and answer the question.",
24 },
25 ],
26 tools=query_database_function,
27 )
28 query = json.loads(response.choices[0].message.tool_calls[0].function.arguments)[
29 "query"
30 ]
31
32 return query

As you can see, we are just updating the system prompt to include the error and ask the model to correct this error. Otherwise, it's the same request and function as before.

And finally, bring it all together, by creating the agent-loop:

1async def execute_agent(question: str):
2 query = await query_database(question, schema_info)
3 print(query)
4
5 with engine.connect() as connection:
6 runs = 0
7 while runs < 10:
8 try:
9 # Execute the query
10 result = connection.execute(text(query))
11 break
12 except Exception as e:
13 print("Oh no, an error. Never mind... We'll figure this out.", e)
14 query = asyncio.run(error_handling(question, query, schema_info, e))
15 print("New query: ", query)
16 if runs >= 10:
17 print("Stopping agent after 10 failed attempts.")
18 columns = [col for col in result.keys()]
19 data = result.fetchall()
20 return pd.DataFrame(data, columns=columns)

All we have to do now is to run our agent and provide a question:

1await execute_agent("How many churns are in the churn_table?")
2# Output: 1869

And - amazingly - the agent is able to Automatically generate the SQL query, execute the query, self-heal in case of errors and provide the correct result!

Let's try something more complicated: We want to know the percentage of males and females in the dataset.

1await execute_agent(
2 "What's the percentage of male and female customers of users who churned?"
3)

Output:

1SELECT gender, COUNT(*) * 100.0 / (SELECT COUNT(*) FROM aiagent_test.churn_table WHERE Churn = TRUE) AS percentage FROM aiagent_test.churn_table WHERE Churn = TRUE GROUP BY gender
2
3 gender percentage
40 Male 49.75923
51 Female 50.24077

Quite impressive, isn't it?

Improving our AI SQL Agent for BigQuery data analysis

While our little agent is already quite nice to play with, it creates quite a good amount of garbage queries. While it's surprisingly good in resolving these errors on it's own, there are some methods to reduce these erroneous attempts.

  1. Adding a description to the database columns: In this example, we only provide the table name as well as the column names as context to the AI model. The model is than asked to try to "guess" which columns might be relevant for answering the users question. This is quite error-prone and can be improved by adding a description to the columns.

    There are 2 ways to do this: a. Manually add a description to the columns. While this is in fact a little tedious, it is the most accurate and reliable way to provide context to the model. b. Send a head(10) of each column to GPT-4o and ask for a description. The model will most probably be able to provide a reasonable enough description.

    As both methods are quite simple, we'll leave the implementation to the reader. For the sake of this post I used GPT-4o to refactor our schema as follows:

    1table_with_descriptions = [
    2 {'table_name': 'aiagent_test.churn_table', 'column_name': 'customerID', 'column_description': 'A unique identifier assigned to each customer.', 'column_type': 'String'},
    3 {'table_name': 'aiagent_test.churn_table', 'column_name': 'gender', 'column_description': 'The gender of the customer (Male/Female).', 'column_type': 'String'},
    4 {'table_name': 'aiagent_test.churn_table', 'column_name': 'SeniorCitizen', 'column_description': 'Indicates if the customer is a senior citizen (1 for Yes, 0 for No).', 'column_type': 'Integer'},
    5 {'table_name': 'aiagent_test.churn_table', 'column_name': 'Partner', 'column_description': 'Indicates if the customer has a partner (True for Yes, False for No).', 'column_type': 'Boolean'},
    6 {'table_name': 'aiagent_test.churn_table', 'column_name': 'Dependents', 'column_description': 'Indicates if the customer has dependents (True for Yes, False for No).', 'column_type': 'Boolean'},
    7 {'table_name': 'aiagent_test.churn_table', 'column_name': 'tenure', 'column_description': 'Number of months the customer has stayed with the company.', 'column_type': 'Integer'},
    8 {'table_name': 'aiagent_test.churn_table', 'column_name': 'PhoneService', 'column_description': 'Indicates if the customer has phone service (True for Yes, False for No).', 'column_type': 'Boolean'},
    9 {'table_name': 'aiagent_test.churn_table', 'column_name': 'MultipleLines', 'column_description': 'Indicates if the customer has multiple lines (Yes, No, No phone service).', 'column_type': 'String'},
    10 {'table_name': 'aiagent_test.churn_table', 'column_name': 'InternetService', 'column_description': 'Type of internet service provided to the customer (DSL, Fiber optic, No).', 'column_type': 'String'},
    11 {'table_name': 'aiagent_test.churn_table', 'column_name': 'OnlineSecurity', 'column_description': 'Indicates if the customer has online security service (Yes, No, No internet service).', 'column_type': 'String'},
    12 {'table_name': 'aiagent_test.churn_table', 'column_name': 'OnlineBackup', 'column_description': 'Indicates if the customer has online backup service (Yes, No, No internet service).', 'column_type': 'String'},
    13 {'table_name': 'aiagent_test.churn_table', 'column_name': 'DeviceProtection', 'column_description': 'Indicates if the customer has device protection plan (Yes, No, No internet service).', 'column_type': 'String'},
    14 {'table_name': 'aiagent_test.churn_table', 'column_name': 'TechSupport', 'column_description': 'Indicates if the customer has tech support service (Yes, No, No internet service).', 'column_type': 'String'},
    15 {'table_name': 'aiagent_test.churn_table', 'column_name': 'StreamingTV', 'column_description': 'Indicates if the customer has streaming TV service (Yes, No, No internet service).', 'column_type': 'String'},
    16 {'table_name': 'aiagent_test.churn_table', 'column_name': 'StreamingMovies', 'column_description': 'Indicates if the customer has streaming movies service (Yes, No, No internet service).', 'column_type': 'String'},
    17 {'table_name': 'aiagent_test.churn_table', 'column_name': 'Contract', 'column_description': 'The type of contract the customer has (Month-to-month, One year, Two year).', 'column_type': 'String'},
    18 {'table_name': 'aiagent_test.churn_table', 'column_name': 'PaperlessBilling', 'column_description': 'Indicates if the customer uses paperless billing (True for Yes, False for No).', 'column_type': 'Boolean'},
    19 {'table_name': 'aiagent_test.churn_table', 'column_name': 'PaymentMethod', 'column_description': 'The payment method used by the customer (Electronic check, Mailed check, Bank transfer, Credit card).', 'column_type': 'String'},
    20 {'table_name': 'aiagent_test.churn_table', 'column_name': 'MonthlyCharges', 'column_description': 'The amount charged to the customer on a monthly basis.', 'column_type': 'Float'},
    21 {'table_name': 'aiagent_test.churn_table', 'column_name': 'TotalCharges', 'column_description': 'The total amount charged to the customer.', 'column_type': 'String'},
    22 {'table_name': 'aiagent_test.churn_table', 'column_name': 'Churn', 'column_description': 'Indicates if the customer has churned (True for Yes, False for No).', 'column_type': 'Boolean'}
    23]
  2. Using RAG to first find potentially relevant tables and columns to reduce the search space for the AI model.

    This method is quite powerful and makes use of the fact, that most user queries are somehow related to the available columns. By using a RAG system to search for relevant columns and tables before asking the AI to create a query, we can potentially drastically improve the quality of the generated results.

Using RAG for text2sql BigQuery SQL agents

To demonstrate how such a RAG-assisted text2sql system might look like, let's extend the agent we just created.

The concept for RAG is always the same: we use a set of texts (in our case column and table names), create vector indexes for them and then use these indexes to find semantically similar texts (columns and tables) to the user query.

More specifically, we are creating an index on our column descriptions and use the other keys (table_name, column_name and column_type) as metadata.

We use ChromaDB as our vector store for this example, but any other vector store will work as well.

1
2import chromadb
3from chromadb.config import Settings
4
5async def get_embedding(description: str):
6 client = AsyncOpenAI(api_key=openai.api_key)
7 response = await client.embeddings.create(
8 input=[description], model="text-embedding-ada-002"
9 )
10
11 return response.data[0].embedding
12
13
14# Generate vector representations for descriptions
15for item in table_with_descriptions:
16 description = item["column_description"]
17 embedding = await get_embedding(description)
18 item["vector"] = embedding
19
20# Initialize ChromaDB
21client = chromadb.Client(Settings())
22collection_name = "telecom_churn"
23if collection_name in client.list_collections():
24 collection = client.get_collection(collection_name)
25else:
26 collection = client.create_collection(collection_name)
27
28# Add data to ChromaDB
29documents = [item["column_description"] for item in table_with_descriptions]
30embeddings = [item["vector"] for item in table_with_descriptions]
31metadatas = [
32 {k: v for k, v in item.items() if k not in ["vector", "column_description"]}
33 for item in table_with_descriptions
34]
35ids = [f"doc{idx+1}" for idx in range(len(table_with_descriptions))]
36
37collection.add(documents=documents, embeddings=embeddings, metadatas=metadatas, ids=ids)

Note: ChromaDB provides a really handy feature where you just add the "document" (the text you want to index) to the collection.add() method - without providing any embeddings. ChromaDB will handle the embedding creation for you. We chose to manually create embeddings in this example so that it's easier to port to other vector stores. If you use ChromaDB anyhow, feel free to make use of this feature though.

So, embeddings created, lets create a function to retrieve the most relevant columns.

1async def query_chromadb(question: str, threshold=0.5):
2 # Get the embedding for the user question
3 query_vector = await get_embedding(question)
4
5 # Query ChromaDB for the most relevant documents
6 results = collection.query(
7 query_embeddings=[query_vector],
8 n_results=10, # Adjust the number of results as needed
9 )
10
11 # Extract relevant metadata from the results
12 relevant_results = []
13
14 # Filter results based on similarity threshold
15 for i, score in enumerate(results["distances"][0]):
16 if score < threshold:
17 document = results["documents"][0][i]
18 metadata = results["metadatas"][0][i]
19 relevant_results.append(
20 {
21 "table_name": metadata["table_name"],
22 "column_name": metadata["column_name"],
23 "column_type": metadata["column_type"],
24 "column_description": document,
25 "score": score,
26 }
27 )
28
29 relevant_results = sorted(relevant_results, key=lambda x: x["score"], reverse=False)
30 relevant_results
31
32 return relevant_results if relevant_results else []

This function finds the 10 most relevant columns with regards to the user query. Adjust this number as needed.

As ChromaDB does not provide a "threshold" parameter, we than manually filter the results to only include columns which are somewhat relevant for our purposes.

NOTE: If your vector store provides a threshold feature, you can skip the manual filtering step.

Now let's use this function in our agent:

1async def execute_agent_with_rag(question: str):
2 rag_schema = await query_chromadb(question) # <-- That's the only change needed
3 query = await query_database(question, rag_schema)
4 print(query)
5
6 with engine.connect() as connection:
7 runs = 0
8 while runs < 10:
9 try:
10 # Execute the query
11 result = connection.execute(text(query))
12 break
13 except Exception as e:
14 print("Oh no, an error. Never mind... We'll figure this out.", e)
15 query = asyncio.run(error_handling(question, query, schema_info, e))
16 print("New query: ", query)
17 if runs >= 10:
18 print("Stopping agent after 10 failed attempts.")
19 columns = [col for col in result.keys()]
20 data = result.fetchall()
21 return pd.DataFrame(data, columns=columns)

The only change needed is to call the query_chromadb function before calling the query_database function. Instead of sending over the whole table schema, we only send the subset of columns which might be relevant for the job.

Let's try it out once again:

1await execute_agent_with_rag(
2 "What's the percentage of male and female customers of users who churned?"
3)

And - thank god - our agent still works like a charm!

Conclusion

To conclude this blog post on creating a conversational interface for Google BigQuery using GPT-4, let's summarize the key points and discuss the implications of this technology:

We've demonstrated how to build a useful, simple AI agent that can translate natural language queries into SQL, execute them against a BigQuery database, and handle errors automatically.

The implementation combines several cutting-edge technologies:

  • OpenAI's GPT-4 for natural language understanding and SQL generation

  • Google BigQuery for data storage and

  • querying Retrieval Augmented Generation (RAG) for improving query relevance

This approach allows non-technical users to interact with complex databases using everyday language, significantly lowering the barrier to data analysis.

We've shown how to improve the accuracy of SQL generation by:

  • Adding descriptions to database columns

  • Using RAG to find relevant tables and columns before query generation

The flexibility of this custom implementation allows for fine-tuning and optimization specific to BigQuery and particular use cases.

We find, that these natural language interfaces for databases have the potential to change the way how organizations interact with their data. It democratizes access to complex databases, allowing more people within an organization to derive insights without needing to learn SQL or rely heavily on heavily on data teams. Or - even better - allow data professionals to focus focus more on the actual analysis and less on the data retrieval.

However, it's important to note that while this system is powerful, it's not infallible. Users should always verify critical results, especially for complex queries or when making important business decisions based on the output.

NOTE: If you need help with implementing such a system for your organization, feel free to reach out to us - we are happy to help!

Further reading

Interested in how to train your very own Large Language Model?

We prepared a well-researched guide for how to use the latest advancements in Open Source technology to fine-tune your own LLM. This has many advantages like:

  • Cost control
  • Data privacy
  • Excellent performance - adjusted specifically for your intended use
More information on our managed RAG solution?
To Pondhouse AI
More tips and tricks on how to work with AI?
To our Blog