Podcasts are one of the richest knowledge sources on the internet — but their content is completely locked in audio. You can't Ctrl+F across 200 episodes. You can't ask "what was that episode about a fish tank and a casino hack?" and get a cited answer. This system fixes that: Gemini 2.0 Flash transcribes the audio, Google's text-embedding-004 maps the semantic meaning, and ChromaDB stores it all so you can query across every episode in seconds — with source citations.
What this system does: it ingests any podcast RSS feed, transcribes every episode, indexes the transcripts in a local vector database, and answers natural-language questions with episode-level citations.
The API Landscape: What Gemini Actually Offers
The draft tutorial for this article referenced "Gemini 3 File Search" with client.file_search_stores.create()—which doesn't exist. Here's what the real Gemini API provides as of 2026:
For a persistent, production-ready podcast database, the most reliable path is: Gemini for transcription + Gemini Embeddings + ChromaDB (local vector database). This is fully open, doesn't require beta API access, and owns your data.
Architecture
┌──────────────────────────────────────────────────────────────┐
│ INGESTION PIPELINE │
│ │
│ RSS Feed → Download MP3 → Gemini Files API → Transcription │
│ ↓ │
│ Transcript → Chunk (500 words) → text-embedding-004 │
│ ↓ │
│ ChromaDB (local) │
└──────────────────────────────────────────────────────────────┘
↕ persist
┌──────────────────────────────────────────────────────────────┐
│ QUERY PIPELINE │
│ │
│ User Question → text-embedding-004 → ChromaDB search │
│ ↓ │
│ Top-K chunks retrieved → Gemini 2.0 Flash → Answer │
│ ↓ │
│ Cited response (episode title, timestamp, context) │
└──────────────────────────────────────────────────────────────┘
Setup
Installation
pip install google-generativeai feedparser requests chromadb python-dotenv
Create .env (add to .gitignore):
GEMINI_API_KEY=AIza...
Get your key at aistudio.google.com → Get API Key (free tier available).
Step 1: Audio Transcription with Gemini 2.0 Flash
Gemini 2.0 Flash is multimodal—it can process audio files directly. We upload the file first using the Files API, then pass the file reference to the model.
# transcribe.py
import google.generativeai as genai
import os
import time
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()  # pull GEMINI_API_KEY from .env into the process environment
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))  # global SDK configuration
def transcribe_audio(audio_path: str, episode_title: str = "") -> str:
    """
    Transcribe a podcast audio file using Gemini 2.0 Flash.

    Steps:
    1. Upload file using Files API (returns a file URI)
    2. Wait for processing (state changes from PROCESSING to ACTIVE)
    3. Pass file reference to generate_content
    4. Return transcript text

    Args:
        audio_path: Local path to the audio file.
        episode_title: Optional display name for the uploaded file.

    Returns:
        The full transcript text produced by the model.

    Raises:
        RuntimeError: If the uploaded file never reaches the ACTIVE state
            within the polling window.
    """
    audio_file = None
    try:
        print(f" 📤 Uploading {Path(audio_path).name} to Gemini Files API...")
        audio_file = genai.upload_file(
            path=audio_path,
            display_name=episode_title or Path(audio_path).stem,
        )
        # Poll until the Files API finishes processing (usually <30s for audio).
        max_wait = 120
        waited = 0
        while audio_file.state.name == "PROCESSING" and waited < max_wait:
            time.sleep(5)
            waited += 5
            audio_file = genai.get_file(audio_file.name)
            print(f" ⏳ Processing... ({waited}s)")
        if audio_file.state.name != "ACTIVE":
            raise RuntimeError(f"File processing failed: state={audio_file.state.name}")
        print(" 🎙️ Transcribing with Gemini 2.0 Flash...")
        model = genai.GenerativeModel("gemini-2.0-flash")
        response = model.generate_content(
            [
                audio_file,  # The uploaded file reference
                (
                    "Generate a detailed transcript of this podcast audio. "
                    "Label speakers as 'HOST' and 'GUEST' when distinguishable. "
                    "Preserve the natural flow of conversation. "
                    "Do not summarize—transcribe everything verbatim."
                ),
            ],
            request_options={"timeout": 600},  # Long timeout for full episodes
        )
        transcript = response.text
        print(f" ✅ Transcribed: {len(transcript.split())} words")
        return transcript
    finally:
        # Delete from Files API to avoid storage accumulation (20GB/project
        # limit). FIX: the original only deleted files in the ACTIVE state,
        # leaking any upload stuck in PROCESSING or FAILED. Cleanup is now
        # attempted for every successful upload, best-effort.
        if audio_file is not None:
            try:
                genai.delete_file(audio_file.name)
            except Exception:
                pass  # deletion failure must not mask the real result/error
def save_transcript(transcript: str, episode: dict, output_dir: str = "transcripts") -> str:
    """Write a transcript, preceded by a metadata header, to disk.

    Returns the path of the file that was written.
    """
    os.makedirs(output_dir, exist_ok=True)
    # Drop any characters that aren't safe in filenames, then cap the length.
    safe_title = "".join(c for c in episode["title"] if c.isalnum() or c in " -_").strip()
    filepath = os.path.join(output_dir, f"{episode['num']:03d}_{safe_title[:60]}.txt")
    header = (
        f"TITLE: {episode['title']}\n"
        f"PODCAST: {episode.get('podcast', 'Unknown')}\n"
        f"DATE: {episode.get('published', 'Unknown')}\n"
        f"EPISODE_NUM: {episode['num']}\n"
        + "=" * 60 + "\n\n"
    )
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(header)
        f.write(transcript)
    return filepath
Cost and Speed
- Cost: ~$0.08–0.15 per hour of audio (Gemini 2.0 Flash token pricing)
- Speed: A 45-minute episode transcribes in ~60–90 seconds
- Quality: Comparable to Whisper Large-v3 on English podcasts
Step 2: RSS Feed Ingestion
# ingest.py
import feedparser
import requests
import os
from pathlib import Path
def get_podcast_episodes(rss_url: str, max_episodes: int = 10) -> list[dict]:
    """
    Parse an RSS feed and return metadata for up to ``max_episodes`` episodes.

    Works with any standard podcast RSS feed; entries without an audio
    enclosure are skipped with a warning.
    """
    print(f"📡 Fetching RSS feed: {rss_url}")
    feed = feedparser.parse(rss_url)
    if feed.bozo:
        raise ValueError(f"RSS parse error: {feed.bozo_exception}")
    podcast_name = feed.feed.get("title", "Unknown Podcast")
    print(f" Podcast: {podcast_name} ({len(feed.entries)} total episodes)")

    def _find_audio_url(entry):
        # An enclosure is the media attachment; pick the first audio-looking one.
        for enc in entry.get("enclosures", []):
            typed_audio = "audio" in enc.get("type", "")
            named_audio = enc.get("href", "").endswith((".mp3", ".m4a", ".aac"))
            if typed_audio or named_audio:
                return enc["href"]
        return None

    episodes = []
    for idx, entry in enumerate(feed.entries[:max_episodes]):
        audio_url = _find_audio_url(entry)
        if not audio_url:
            print(f" ⚠️ No audio found for: {entry.title}")
            continue
        episodes.append({
            "num": idx + 1,
            "title": entry.get("title", f"Episode {idx+1}"),
            "summary": entry.get("summary", "")[:500],
            "audio_url": audio_url,
            "published": entry.get("published", ""),
            "podcast": podcast_name,
            "link": entry.get("link", ""),
        })
    print(f" Found {len(episodes)} episodes with audio")
    return episodes
def download_audio(audio_url: str, dest_path: str, timeout: int = 120) -> bool:
    """
    Download an audio file to ``dest_path`` in streaming chunks.

    Args:
        audio_url: Direct URL of the audio enclosure.
        dest_path: Local filesystem path to write to.
        timeout: Connect/read timeout in seconds.

    Returns:
        True on success, False on timeout or any other failure.
    """
    try:
        # FIX: use the response as a context manager. With stream=True the
        # connection stays open until consumed or closed; the original never
        # closed it, leaking connections on every download.
        with requests.get(audio_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            downloaded = 0
            with open(dest_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=65536):
                    f.write(chunk)
                    downloaded += len(chunk)
        size_mb = downloaded / (1024 * 1024)
        print(f" ⬇️ Downloaded: {Path(dest_path).name} ({size_mb:.1f} MB)")
        return True
    except requests.exceptions.Timeout:
        print(f" ❌ Download timed out: {audio_url}")
        return False
    except Exception as e:
        print(f" ❌ Download failed: {e}")
        return False
Step 3: Chunking and Embeddings
A full podcast transcript can be 8,000–15,000 words. We split it into overlapping chunks, embed each chunk, and store them in ChromaDB.
# embed.py
import google.generativeai as genai
import chromadb
from chromadb.utils import embedding_functions
import os
import re
from dotenv import load_dotenv
load_dotenv()  # pull GEMINI_API_KEY from .env into the process environment
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))  # global SDK configuration
def chunk_transcript(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]:
    """
    Split a transcript into overlapping word chunks.

    overlap=100 means consecutive chunks share 100 words, ensuring context
    isn't lost at chunk boundaries.

    Args:
        text: Transcript text to split.
        chunk_size: Maximum words per chunk; must be positive.
        overlap: Words shared between consecutive chunks; must be smaller
            than chunk_size.

    Returns:
        List of chunk strings (empty list for empty/whitespace-only text).

    Raises:
        ValueError: If chunk_size <= 0 or overlap >= chunk_size. FIX: the
            original looped forever on such inputs because the window
            advance (chunk_size - overlap) was zero or negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunks.append(" ".join(words[start:end]))
        if end == len(words):
            break
        start += chunk_size - overlap
    return chunks
class GeminiEmbeddingFunction(chromadb.EmbeddingFunction):
    """Embed documents for ChromaDB using Google's text-embedding-004."""

    # embed_content accepts batched input; 20 keeps each request small
    # while staying well under the API's per-request limit.
    _BATCH_SIZE = 20

    def __call__(self, input: chromadb.Documents) -> chromadb.Embeddings:
        vectors = []
        for start in range(0, len(input), self._BATCH_SIZE):
            response = genai.embed_content(
                model="models/text-embedding-004",
                content=input[start : start + self._BATCH_SIZE],
                task_type="retrieval_document",
            )
            vectors.extend(response["embedding"])
        return vectors
def get_or_create_collection(db_path: str = "./podcast_db") -> chromadb.Collection:
    """Open (or create) the persistent ChromaDB collection for transcripts."""
    client = chromadb.PersistentClient(path=db_path)
    # Cosine space matches the similarity conversion (1 - distance) used
    # when presenting search results.
    return client.get_or_create_collection(
        name="podcast_transcripts",
        embedding_function=GeminiEmbeddingFunction(),
        metadata={"hnsw:space": "cosine"},
    )
def index_transcript(
    transcript: str,
    episode: dict,
    collection: chromadb.Collection,
) -> int:
    """
    Chunk, embed, and store a transcript in ChromaDB.

    Accepts either a raw transcript string or the contents of a file
    written by save_transcript() (whose metadata header ends with a
    "=====" separator line).

    Returns the number of chunks indexed.
    """
    # Strip the metadata header written by save_transcript(), if present.
    # FIX: the original used next(..., 0) + 2, so a transcript WITHOUT a
    # header silently lost its first two lines. Only skip lines when a
    # separator line is actually found.
    lines = transcript.split("\n")
    header_idx = next(
        (i for i, l in enumerate(lines) if l.startswith("=" * 10)), None
    )
    if header_idx is not None:
        clean_transcript = "\n".join(lines[header_idx + 2:]).strip()
    else:
        clean_transcript = transcript.strip()
    chunks = chunk_transcript(clean_transcript, chunk_size=400, overlap=80)
    ids = []
    documents = []
    metadatas = []
    for chunk_idx, chunk in enumerate(chunks):
        # Skip very short chunks (likely headers/footers)
        if len(chunk.split()) < 20:
            continue
        ids.append(f"ep{episode['num']:03d}_chunk{chunk_idx:04d}")
        documents.append(chunk)
        metadatas.append(
            {
                "episode_num": episode["num"],
                "episode_title": episode["title"],
                "podcast": episode.get("podcast", ""),
                "published": episode.get("published", ""),
                "chunk_index": chunk_idx,
                "total_chunks": len(chunks),
                "link": episode.get("link", ""),
            }
        )
    # Upsert (insert or update) — safe to re-run. Skip the call when every
    # chunk was filtered out, avoiding an empty-batch upsert.
    if ids:
        collection.upsert(ids=ids, documents=documents, metadatas=metadatas)
    print(f" 📝 Indexed {len(ids)} chunks from '{episode['title']}'")
    return len(ids)
Step 4: The Query Interface
# query.py
import google.generativeai as genai
import chromadb
from embed import get_or_create_collection, GeminiEmbeddingFunction
import os
from dotenv import load_dotenv
load_dotenv()  # pull GEMINI_API_KEY from .env into the process environment
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))  # global SDK configuration
def search_podcasts(
    question: str,
    collection: chromadb.Collection,
    n_results: int = 5,
) -> list[dict]:
    """
    Embed the question and retrieve the most relevant transcript chunks.

    Note the task_type: queries use "retrieval_query", while documents
    were indexed with "retrieval_document".
    """
    embedded = genai.embed_content(
        model="models/text-embedding-004",
        content=question,
        task_type="retrieval_query",
    )
    results = collection.query(
        query_embeddings=[embedded["embedding"]],
        n_results=n_results,
        include=["documents", "metadatas", "distances"],
    )
    docs = results["documents"][0]
    metas = results["metadatas"][0]
    dists = results["distances"][0]
    return [
        {
            "text": doc,
            "episode_title": meta["episode_title"],
            "podcast": meta["podcast"],
            "published": meta["published"],
            "episode_num": meta["episode_num"],
            "link": meta.get("link", ""),
            "similarity": round(1 - dist, 3),  # cosine distance → similarity
        }
        for doc, meta, dist in zip(docs, metas, dists)
    ]
# System instruction for the answer model: restricts answers to the retrieved
# excerpts and enforces per-episode citations.
ANSWER_SYSTEM = """You are an AI assistant that answers questions about podcast episodes.
You will be given:
1. A user question
2. Relevant transcript excerpts with source metadata
Your job:
- Answer the question based ONLY on the provided transcript excerpts
- Always cite the specific episode(s) you're drawing from
- If the answer spans multiple episodes, mention each one
- If the excerpts don't contain a clear answer, say so honestly
Format citations as: [Episode Title, published DATE]"""
def answer_question(
    question: str,
    collection: chromadb.Collection,
    n_results: int = 5,
) -> dict:
    """
    Retrieve relevant chunks and generate a cited answer using Gemini.

    Returns a dict with keys "answer", "sources" (deduplicated by episode
    title, in retrieval order), and "chunks_searched".
    """
    chunks = search_podcasts(question, collection, n_results)
    if not chunks:
        return {"answer": "No relevant episodes found.", "sources": []}

    # Assemble the retrieved excerpts into one labeled context block.
    context = "\n---\n".join(
        f"[Source {i}: '{chunk['episode_title']}' — {chunk['podcast']}, {chunk['published']}]\n"
        f"{chunk['text']}\n"
        for i, chunk in enumerate(chunks, 1)
    )

    model = genai.GenerativeModel(
        "gemini-2.0-flash",
        system_instruction=ANSWER_SYSTEM,
    )
    response = model.generate_content(
        f"QUESTION: {question}\n\nTRANSCRIPT EXCERPTS:\n{context}\n\nANSWER:"
    )

    # Deduplicate sources by episode title, preserving retrieval order.
    sources = []
    seen_titles = set()
    for chunk in chunks:
        title = chunk["episode_title"]
        if title in seen_titles:
            continue
        seen_titles.add(title)
        sources.append(
            {
                "title": title,
                "podcast": chunk["podcast"],
                "published": chunk["published"],
                "similarity": chunk["similarity"],
                "link": chunk["link"],
            }
        )
    return {
        "answer": response.text,
        "sources": sources,
        "chunks_searched": len(chunks),
    }
Step 5: Full Pipeline — Bulk Processing
Tie everything together with error handling and parallel processing:
# pipeline.py
import os
import json
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from ingest import get_podcast_episodes, download_audio
from transcribe import transcribe_audio, save_transcript
from embed import get_or_create_collection, index_transcript
AUDIO_DIR = "audio"  # temporary MP3 downloads (deleted after transcription)
TRANSCRIPT_DIR = "transcripts"  # saved transcript .txt files
DB_PATH = "./podcast_db"  # ChromaDB persistent storage directory
PROGRESS_FILE = "progress.json"  # Track which episodes are done
def load_progress() -> dict:
    """
    Load per-episode progress from PROGRESS_FILE.

    Returns an empty dict when the file is missing or unreadable. FIX: a
    corrupt/truncated progress file (e.g. from an interrupted write)
    previously raised and permanently blocked the resumable pipeline;
    now it simply restarts from scratch.
    """
    path = Path(PROGRESS_FILE)
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text())
    except (json.JSONDecodeError, OSError):
        return {}
def save_progress(progress: dict):
    """Persist the progress dict to PROGRESS_FILE as pretty-printed JSON."""
    serialized = json.dumps(progress, indent=2)
    Path(PROGRESS_FILE).write_text(serialized)
def process_episode(episode: dict, collection, progress: dict) -> str:
    """
    Process one episode: download → transcribe → index.

    Each stage is checkpointed in ``progress`` (persisted after every
    step) so re-runs resume where they left off. Returns a one-line
    status string for the caller to log.
    """
    ep_key = f"ep_{episode['num']}"
    status = progress.get(ep_key, {})
    try:
        # Step 1: Download audio
        audio_path = os.path.join(AUDIO_DIR, f"ep{episode['num']:03d}.mp3")
        if not status.get("downloaded"):
            os.makedirs(AUDIO_DIR, exist_ok=True)
            success = download_audio(episode["audio_url"], audio_path)
            if not success:
                return f"❌ Download failed: {episode['title']}"
            progress[ep_key] = {"downloaded": True}
            save_progress(progress)
        # Step 2: Transcribe
        # FIX: compute the transcript path EXACTLY the way save_transcript()
        # builds it (sanitized title, truncated to 60 chars). The original
        # used the raw title truncated to 50, so the exists() resume check
        # and the later indexing read both pointed at a file that was
        # never written.
        safe_title = "".join(
            c for c in episode["title"] if c.isalnum() or c in " -_"
        ).strip()
        transcript_path = os.path.join(
            TRANSCRIPT_DIR,
            f"{episode['num']:03d}_{safe_title[:60]}.txt",
        )
        if not status.get("transcribed") and not Path(transcript_path).exists():
            transcript = transcribe_audio(audio_path, episode["title"])
            transcript_path = save_transcript(transcript, episode, TRANSCRIPT_DIR)
            progress.setdefault(ep_key, {})["transcribed"] = True
            save_progress(progress)
            # Clean up audio file to save disk space
            if Path(audio_path).exists():
                Path(audio_path).unlink()
                print(" 🗑️ Deleted audio file (transcript saved)")
        # Step 3: Index in ChromaDB
        if not status.get("indexed"):
            if Path(transcript_path).exists():
                with open(transcript_path, "r", encoding="utf-8") as f:
                    transcript_text = f.read()
                num_chunks = index_transcript(transcript_text, episode, collection)
                entry = progress.setdefault(ep_key, {})
                entry["indexed"] = True
                entry["chunks"] = num_chunks
                save_progress(progress)
        return f"✅ {episode['title']}"
    except Exception as e:
        # Keep the pipeline running across per-episode failures;
        # the caller logs the returned status line.
        return f"❌ {episode['title']}: {e}"
def build_podcast_database(rss_url: str, max_episodes: int = 20):
    """
    Full pipeline: fetch RSS → download audio → transcribe → index.
    Resumes from previous progress automatically.
    """
    collection = get_or_create_collection(DB_PATH)
    progress = load_progress()
    episodes = get_podcast_episodes(rss_url, max_episodes)
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Processing {len(episodes)} episodes")
    print(f"{banner}\n")
    # Sequential on purpose: parallel transcription would hit rate limits.
    for position, episode in enumerate(episodes, 1):
        print(f"\n[{position}/{len(episodes)}] {episode['title']}")
        outcome = process_episode(episode, collection, progress)
        print(f" → {outcome}")
        time.sleep(2)  # Be polite to the audio server
    total_chunks = collection.count()
    print(f"\n{banner}")
    print(f"✅ Database ready: {total_chunks} chunks from {len(episodes)} episodes")
    print(f"DB path: {DB_PATH}")
    print(f"{banner}\n")
    return collection
if __name__ == "__main__":
    # Example: Google DeepMind Podcast
    # NOTE: replace with a real podcast RSS feed URL before running.
    RSS_URL = "https://feeds.simplecast.com/your-podcast-feed"
    collection = build_podcast_database(RSS_URL, max_episodes=10)
    # Test query
    from query import answer_question
    result = answer_question(
        "What was that story about a fish tank and cybersecurity?",
        collection
    )
    print(f"\n💬 Answer:\n{result['answer']}")
    print(f"\n📚 Sources:")
    for src in result["sources"]:
        print(f" - {src['title']} (similarity: {src['similarity']})")
Interactive Query Interface (Streamlit)
# app.py
# Streamlit front-end: a question box over the locally indexed transcripts.
import streamlit as st
from embed import get_or_create_collection
from query import answer_question
st.set_page_config(page_title="Podcast Search", page_icon="🎙️", layout="wide")
st.title("🎙️ Podcast Search")
st.caption("Ask questions across your entire podcast library")
@st.cache_resource
def load_collection():
    """Open the ChromaDB collection once per process (cached by Streamlit)."""
    return get_or_create_collection("./podcast_db")
collection = load_collection()
total_chunks = collection.count()
st.caption(f"📊 {total_chunks} transcript chunks indexed")
if total_chunks == 0:
    # Nothing indexed yet — point the user at the ingestion pipeline and bail.
    st.warning("No episodes indexed yet. Run `pipeline.py` to build the database.")
    st.stop()
# Search interface
question = st.text_input(
    "Ask a question about any episode:",
    placeholder="What was that story about a fish tank and cybersecurity?"
)
n_results = st.slider("Number of chunks to retrieve", 3, 10, 5)
if question:
    with st.spinner("Searching..."):
        result = answer_question(question, collection, n_results)
    st.subheader("Answer")
    st.markdown(result["answer"])
    st.subheader("Sources")
    # One expander per unique source episode, with a link when available.
    for src in result["sources"]:
        with st.expander(f"📻 {src['title']} — similarity: {src['similarity']}"):
            st.write(f"**Podcast:** {src['podcast']}")
            st.write(f"**Published:** {src['published']}")
            if src.get("link"):
                st.markdown(f"[🔗 Listen to episode]({src['link']})")
Run with:
streamlit run app.py
Example Queries and What You Get
Performance Benchmarks
Cost estimates (Google AI pricing):
- Transcription: ~$0.10 per hour of audio (Gemini 2.0 Flash input pricing)
- Embeddings: ~$0.001 per 1K tokens (text-embedding-004)
- Query answering: ~$0.01–0.05 per query
- Total for 50 episodes (~40 hrs audio): ~$5–10 one-time ingestion cost
Common Mistakes
Key Takeaways
- Use the Files API (`genai.upload_file()`) for temporary multimodal inputs, and ChromaDB + text-embedding-004 for persistent, searchable RAG databases.
- Uploaded files are passed by reference to `generate_content()` — wait for the file to reach the ACTIVE state before using it.
- Use the right `task_type` for indexing vs. querying: embed documents for storage with `task_type="retrieval_document"` and embed user queries with `task_type="retrieval_query"`. This small difference measurably improves retrieval quality.
- Clean up uploaded audio with `genai.delete_file()` to avoid accumulating files against the Files API storage limit.