
Build a Searchable Podcast Database with Gemini: Audio Transcription, Embeddings, and Semantic RAG

Use Gemini 2.0 Flash to transcribe podcast audio, generate embeddings with Google's text-embedding-004 model, and build a semantic search system that answers questions with citations across hundreds of episodes.

March 14, 2026
18 min read
#Gemini #GoogleAI #RAG #Podcast #Transcription #Embeddings #ChromaDB #Python

Podcasts are one of the richest knowledge sources on the internet — but their content is completely locked in audio. You can't Ctrl+F across 200 episodes. You can't ask "what was that episode about a fish tank and a casino hack?" and get a cited answer. This system fixes that: Gemini 2.0 Flash transcribes the audio, Google's text-embedding-004 maps the semantic meaning, and ChromaDB stores it all so you can query across every episode in seconds — with source citations.

What this system does:

✅ Parses RSS feeds to get episode list
✅ Transcribes audio using Gemini 2.0 Flash
✅ Embeds transcripts with Google's text-embedding-004
✅ Stores vectors in ChromaDB (persistent, local)
✅ Answers questions with cited sources
✅ Scales to hundreds of episodes

The API Landscape: What Gemini Actually Offers

The draft tutorial for this article referenced "Gemini 3 File Search" with client.file_search_stores.create()—which doesn't exist. Here's what the real Gemini API provides as of 2026:

| Feature | API | Use Case | Persistence |
|---|---|---|---|
| Files API | genai.upload_file() | Upload files for multimodal prompts | 48-hour TTL |
| Embeddings | models/text-embedding-004 | Generate vectors for semantic search | Your choice |
| Audio transcription | Gemini 2.0 Flash multimodal | Convert speech to text | N/A |
| Semantic Retrieval | Corpus API (beta) | Managed RAG within Google's infrastructure | Google-hosted |
| Grounding | Google Search tool | Ground answers with live web search | N/A |

For a persistent, production-ready podcast database, the most reliable path is Gemini for transcription + Gemini embeddings + ChromaDB (a local vector database). This stack is fully open, requires no beta API access, and keeps your data in your own hands.

Architecture

┌──────────────────────────────────────────────────────────────┐
│                    INGESTION PIPELINE                        │
│                                                              │
│  RSS Feed → Download MP3 → Gemini Files API → Transcription │
│                                   ↓                         │
│  Transcript → Chunk (500 words) → text-embedding-004        │
│                                   ↓                         │
│                         ChromaDB (local)                    │
└──────────────────────────────────────────────────────────────┘
                           ↕ persist
┌──────────────────────────────────────────────────────────────┐
│                     QUERY PIPELINE                           │
│                                                              │
│  User Question → text-embedding-004 → ChromaDB search       │
│       ↓                                                      │
│  Top-K chunks retrieved → Gemini 2.0 Flash → Answer         │
│       ↓                                                      │
│  Cited response (episode title, date, context)               │
└──────────────────────────────────────────────────────────────┘

Setup

Installation

pip install google-generativeai feedparser requests chromadb python-dotenv

Create .env (add to .gitignore):

GEMINI_API_KEY=AIza...

Get your key at aistudio.google.com → Get API Key (free tier available).


Step 1: Audio Transcription with Gemini 2.0 Flash

Gemini 2.0 Flash is multimodal—it can process audio files directly. We upload the file first using the Files API, then pass the file reference to the model.

# transcribe.py
import google.generativeai as genai
import os
import time
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

def transcribe_audio(audio_path: str, episode_title: str = "") -> str:
    """
    Transcribe a podcast audio file using Gemini 2.0 Flash.

    Steps:
    1. Upload file using Files API (returns a file URI)
    2. Wait for processing (state changes from PROCESSING to ACTIVE)
    3. Pass file reference to generate_content
    4. Return transcript text
    """
    audio_file = None
    try:
        print(f"  📤 Uploading {Path(audio_path).name} to Gemini Files API...")
        audio_file = genai.upload_file(
            path=audio_path,
            display_name=episode_title or Path(audio_path).stem,
        )

        # Wait for file to finish processing (usually <30 seconds for audio)
        max_wait = 120
        waited = 0
        while audio_file.state.name == "PROCESSING" and waited < max_wait:
            time.sleep(5)
            waited += 5
            audio_file = genai.get_file(audio_file.name)
            print(f"  ⏳ Processing... ({waited}s)")

        if audio_file.state.name != "ACTIVE":
            raise RuntimeError(f"File processing failed: state={audio_file.state.name}")

        print(f"  🎙️ Transcribing with Gemini 2.0 Flash...")
        model = genai.GenerativeModel("gemini-2.0-flash")
        response = model.generate_content(
            [
                audio_file,  # The uploaded file reference
                (
                    "Generate a detailed transcript of this podcast audio. "
                    "Label speakers as 'HOST' and 'GUEST' when distinguishable. "
                    "Preserve the natural flow of conversation. "
                    "Do not summarize—transcribe everything verbatim."
                ),
            ],
            request_options={"timeout": 600},  # Long timeout for full episodes
        )

        transcript = response.text
        print(f"  ✅ Transcribed: {len(transcript.split())} words")
        return transcript

    finally:
        # Delete from the Files API to avoid storage accumulation
        # (the Files API has a 20 GB limit per project); delete even on
        # failure so broken uploads don't pile up
        if audio_file:
            try:
                genai.delete_file(audio_file.name)
            except Exception:
                pass


def save_transcript(transcript: str, episode: dict, output_dir: str = "transcripts") -> str:
    """Save transcript to disk and return the file path."""
    os.makedirs(output_dir, exist_ok=True)
    safe_title = "".join(c for c in episode["title"] if c.isalnum() or c in " -_").strip()
    filename = f"{episode['num']:03d}_{safe_title[:60]}.txt"
    filepath = os.path.join(output_dir, filename)

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"TITLE: {episode['title']}\n")
        f.write(f"PODCAST: {episode.get('podcast', 'Unknown')}\n")
        f.write(f"DATE: {episode.get('published', 'Unknown')}\n")
        f.write(f"EPISODE_NUM: {episode['num']}\n")
        f.write("=" * 60 + "\n\n")
        f.write(transcript)

    return filepath
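The sanitization rule used for transcript filenames can be checked on its own. Here `safe_filename` is a standalone restatement of the logic inside `save_transcript`, not a function from the pipeline files:

```python
def safe_filename(title: str, max_len: int = 60) -> str:
    # keep only alphanumerics, spaces, hyphens, underscores (same rule as save_transcript)
    safe = "".join(c for c in title if c.isalnum() or c in " -_").strip()
    return safe[:max_len]

print(safe_filename('Ep. 12: "Fish Tank" Hack!'))  # Ep 12 Fish Tank Hack
```

Punctuation that would break filesystems is dropped, and long titles are truncated to keep paths portable.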

Cost and Speed

  • Cost: ~$0.08–0.15 per hour of audio (Gemini 2.0 Flash token pricing)
  • Speed: A 45-minute episode transcribes in ~60–90 seconds
  • Quality: Comparable to Whisper Large-v3 on English podcasts

Step 2: RSS Feed Ingestion

# ingest.py
import feedparser
import requests
import os
from pathlib import Path

def get_podcast_episodes(rss_url: str, max_episodes: int = 10) -> list[dict]:
    """
    Parse an RSS feed and return episode metadata.

    Works with any standard podcast RSS feed.
    """
    print(f"📡 Fetching RSS feed: {rss_url}")
    feed = feedparser.parse(rss_url)

    if feed.bozo:
        # bozo is set for any parse irregularity, including recoverable ones
        # (e.g., charset quirks), so warn instead of failing hard
        print(f"  ⚠️ RSS parse warning: {feed.bozo_exception}")

    podcast_name = feed.feed.get("title", "Unknown Podcast")
    print(f"  Podcast: {podcast_name} ({len(feed.entries)} total episodes)")

    episodes = []
    for i, entry in enumerate(feed.entries[:max_episodes]):
        # Find audio enclosure (the actual MP3/AAC file)
        audio_url = None
        for enclosure in entry.get("enclosures", []):
            if "audio" in enclosure.get("type", "") or enclosure.get("href", "").endswith((".mp3", ".m4a", ".aac")):
                audio_url = enclosure["href"]
                break

        if not audio_url:
            print(f"  ⚠️ No audio found for: {entry.title}")
            continue

        episodes.append({
            "num": i + 1,
            "title": entry.get("title", f"Episode {i+1}"),
            "summary": entry.get("summary", "")[:500],
            "audio_url": audio_url,
            "published": entry.get("published", ""),
            "podcast": podcast_name,
            "link": entry.get("link", ""),
        })

    print(f"  Found {len(episodes)} episodes with audio")
    return episodes


def download_audio(audio_url: str, dest_path: str, timeout: int = 120) -> bool:
    """Download audio file with progress indication."""
    try:
        response = requests.get(audio_url, stream=True, timeout=timeout)
        response.raise_for_status()

        total_size = int(response.headers.get("content-length", 0))
        downloaded = 0

        with open(dest_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=65536):
                f.write(chunk)
                downloaded += len(chunk)

        size_mb = downloaded / (1024 * 1024)
        print(f"  ⬇️ Downloaded: {Path(dest_path).name} ({size_mb:.1f} MB)")
        return True

    except requests.exceptions.Timeout:
        print(f"  ❌ Download timed out: {audio_url}")
        return False
    except Exception as e:
        print(f"  ❌ Download failed: {e}")
        return False

Step 3: Chunking and Embeddings

A full podcast transcript can be 8,000–15,000 words. We split it into overlapping chunks, embed each chunk, and store them in ChromaDB.

# embed.py
import google.generativeai as genai
import chromadb
import os
from dotenv import load_dotenv

load_dotenv()
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))


def chunk_transcript(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]:
    """
    Split transcript into overlapping word chunks.

    overlap=100 means consecutive chunks share 100 words,
    ensuring context isn't lost at chunk boundaries.
    """
    words = text.split()
    chunks = []
    start = 0

    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunk = " ".join(words[start:end])
        chunks.append(chunk)
        if end == len(words):
            break
        start += chunk_size - overlap

    return chunks


class GeminiEmbeddingFunction(chromadb.EmbeddingFunction):
    """Custom embedding function using Google's text-embedding-004."""

    def __call__(self, input: chromadb.Documents) -> chromadb.Embeddings:
        embeddings = []
        # Batch embeddings (API supports up to 100 texts per request)
        batch_size = 20
        for i in range(0, len(input), batch_size):
            batch = input[i : i + batch_size]
            result = genai.embed_content(
                model="models/text-embedding-004",
                content=batch,
                task_type="retrieval_document",
            )
            embeddings.extend(result["embedding"])
        return embeddings


def get_or_create_collection(db_path: str = "./podcast_db") -> chromadb.Collection:
    """Initialize ChromaDB with Gemini embeddings."""
    client = chromadb.PersistentClient(path=db_path)
    embedding_fn = GeminiEmbeddingFunction()

    collection = client.get_or_create_collection(
        name="podcast_transcripts",
        embedding_function=embedding_fn,
        metadata={"hnsw:space": "cosine"},
    )
    return collection


def index_transcript(
    transcript: str,
    episode: dict,
    collection: chromadb.Collection,
) -> int:
    """
    Chunk, embed, and store a transcript in ChromaDB.
    Returns the number of chunks indexed.
    """
    # Parse header from transcript file (written in save_transcript)
    lines = transcript.split("\n")
    content_start = next(
        (i for i, l in enumerate(lines) if l.startswith("=" * 10)), 0
    ) + 2
    clean_transcript = "\n".join(lines[content_start:]).strip()

    chunks = chunk_transcript(clean_transcript, chunk_size=400, overlap=80)

    ids = []
    documents = []
    metadatas = []

    for chunk_idx, chunk in enumerate(chunks):
        # Skip very short chunks (likely headers/footers)
        if len(chunk.split()) < 20:
            continue

        chunk_id = f"ep{episode['num']:03d}_chunk{chunk_idx:04d}"
        ids.append(chunk_id)
        documents.append(chunk)
        metadatas.append(
            {
                "episode_num": episode["num"],
                "episode_title": episode["title"],
                "podcast": episode.get("podcast", ""),
                "published": episode.get("published", ""),
                "chunk_index": chunk_idx,
                "total_chunks": len(chunks),
                "link": episode.get("link", ""),
            }
        )

    if ids:
        # Upsert (insert or update) so re-runs are safe; skip the call
        # entirely if every chunk was filtered out
        collection.upsert(ids=ids, documents=documents, metadatas=metadatas)

    print(f"  📝 Indexed {len(ids)} chunks from '{episode['title']}'")
    return len(ids)
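A quick standalone check of the chunker's overlap behavior, restating the sliding-window loop from `chunk_transcript` so it runs on its own:

```python
def chunk_words(words: list[str], chunk_size: int = 400, overlap: int = 80) -> list[list[str]]:
    # same sliding-window logic as chunk_transcript, kept as word lists
    chunks, start = [], 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunks.append(words[start:end])
        if end == len(words):
            break
        start += chunk_size - overlap
    return chunks

words = [f"w{i}" for i in range(1000)]
chunks = chunk_words(words)
print(len(chunks))                        # 3 chunks for a 1,000-word transcript
print(chunks[0][-80:] == chunks[1][:80])  # True: consecutive chunks share 80 words
```

The shared 80-word window is what prevents an answer that straddles a chunk boundary from being split across two half-useless embeddings.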

Step 4: The Query Interface

# query.py
import google.generativeai as genai
import chromadb
from embed import get_or_create_collection
import os
from dotenv import load_dotenv

load_dotenv()
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))


def search_podcasts(
    question: str,
    collection: chromadb.Collection,
    n_results: int = 5,
) -> list[dict]:
    """
    Embed the question and retrieve the most relevant transcript chunks.
    """
    # Embed the query with task_type="retrieval_query" (different from indexing)
    query_embedding = genai.embed_content(
        model="models/text-embedding-004",
        content=question,
        task_type="retrieval_query",
    )["embedding"]

    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results,
        include=["documents", "metadatas", "distances"],
    )

    chunks = []
    for doc, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    ):
        chunks.append(
            {
                "text": doc,
                "episode_title": meta["episode_title"],
                "podcast": meta["podcast"],
                "published": meta["published"],
                "episode_num": meta["episode_num"],
                "link": meta.get("link", ""),
                "similarity": round(1 - dist, 3),  # cosine distance → similarity
            }
        )

    return chunks


ANSWER_SYSTEM = """You are an AI assistant that answers questions about podcast episodes.

You will be given:
1. A user question
2. Relevant transcript excerpts with source metadata

Your job:
- Answer the question based ONLY on the provided transcript excerpts
- Always cite the specific episode(s) you're drawing from
- If the answer spans multiple episodes, mention each one
- If the excerpts don't contain a clear answer, say so honestly

Format citations as: [Episode Title, published DATE]"""


def answer_question(
    question: str,
    collection: chromadb.Collection,
    n_results: int = 5,
) -> dict:
    """
    Retrieve relevant chunks and generate a cited answer using Gemini.
    """
    chunks = search_podcasts(question, collection, n_results)

    if not chunks:
        return {"answer": "No relevant episodes found.", "sources": []}

    # Build context string
    context_parts = []
    for i, chunk in enumerate(chunks, 1):
        context_parts.append(
            f"[Source {i}: '{chunk['episode_title']}' — {chunk['podcast']}, {chunk['published']}]\n"
            f"{chunk['text']}\n"
        )
    context = "\n---\n".join(context_parts)

    # Generate answer
    model = genai.GenerativeModel(
        "gemini-2.0-flash",
        system_instruction=ANSWER_SYSTEM,
    )
    response = model.generate_content(
        f"QUESTION: {question}\n\nTRANSCRIPT EXCERPTS:\n{context}\n\nANSWER:"
    )

    # Collect unique sources
    seen = set()
    sources = []
    for chunk in chunks:
        key = chunk["episode_title"]
        if key not in seen:
            seen.add(key)
            sources.append(
                {
                    "title": chunk["episode_title"],
                    "podcast": chunk["podcast"],
                    "published": chunk["published"],
                    "similarity": chunk["similarity"],
                    "link": chunk["link"],
                }
            )

    return {
        "answer": response.text,
        "sources": sources,
        "chunks_searched": len(chunks),
    }
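For reference, the context string assembled above is plain labeled text. Here is one source entry rendered with invented sample data:

```python
chunks = [
    {"episode_title": "Ep 7: The Aquarium Hack", "podcast": "Security Stories",
     "published": "2025-04-02",
     "text": "the attackers got in through a smart fish-tank thermometer"},
]
context = "\n---\n".join(
    f"[Source {i}: '{c['episode_title']}' — {c['podcast']}, {c['published']}]\n{c['text']}\n"
    for i, c in enumerate(chunks, 1)
)
print(context)
```

The numbered `[Source N: ...]` headers give the model stable handles to cite, which is what makes the "cite the specific episode" instruction in the system prompt enforceable.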

Step 5: Full Pipeline — Bulk Processing

Tie everything together with error handling and resumable progress tracking:

# pipeline.py
import os
import json
import time
from pathlib import Path

from ingest import get_podcast_episodes, download_audio
from transcribe import transcribe_audio, save_transcript
from embed import get_or_create_collection, index_transcript


AUDIO_DIR = "audio"
TRANSCRIPT_DIR = "transcripts"
DB_PATH = "./podcast_db"
PROGRESS_FILE = "progress.json"  # Track which episodes are done


def load_progress() -> dict:
    if Path(PROGRESS_FILE).exists():
        return json.loads(Path(PROGRESS_FILE).read_text())
    return {}


def save_progress(progress: dict):
    Path(PROGRESS_FILE).write_text(json.dumps(progress, indent=2))


def process_episode(episode: dict, collection, progress: dict) -> str:
    """Process one episode: download → transcribe → index."""
    ep_key = f"ep_{episode['num']}"
    status = progress.get(ep_key, {})

    try:
        # Step 1: Download audio
        audio_path = os.path.join(AUDIO_DIR, f"ep{episode['num']:03d}.mp3")
        if not status.get("downloaded"):
            os.makedirs(AUDIO_DIR, exist_ok=True)
            success = download_audio(episode["audio_url"], audio_path)
            if not success:
                return f"❌ Download failed: {episode['title']}"
            progress[ep_key] = {"downloaded": True}
            save_progress(progress)

        # Step 2: Transcribe
        # Build the path the same way save_transcript does, so resume
        # checks find the file that was actually written
        safe_title = "".join(
            c for c in episode["title"] if c.isalnum() or c in " -_"
        ).strip()
        transcript_path = os.path.join(
            TRANSCRIPT_DIR,
            f"{episode['num']:03d}_{safe_title[:60]}.txt"
        )
        if not status.get("transcribed") and not Path(transcript_path).exists():
            transcript = transcribe_audio(audio_path, episode["title"])
            transcript_path = save_transcript(transcript, episode, TRANSCRIPT_DIR)
            progress.setdefault(ep_key, {})["transcribed"] = True
            save_progress(progress)

            # Clean up audio file to save disk space
            if Path(audio_path).exists():
                Path(audio_path).unlink()
                print(f"  🗑️ Deleted audio file (transcript saved)")

        # Step 3: Index in ChromaDB
        if not status.get("indexed"):
            if Path(transcript_path).exists():
                with open(transcript_path, "r", encoding="utf-8") as f:
                    transcript_text = f.read()
                num_chunks = index_transcript(transcript_text, episode, collection)
                progress.setdefault(ep_key, {})["indexed"] = True
                progress[ep_key]["chunks"] = num_chunks
                save_progress(progress)

        return f"✅ {episode['title']}"

    except Exception as e:
        return f"❌ {episode['title']}: {e}"


def build_podcast_database(rss_url: str, max_episodes: int = 20):
    """
    Full pipeline: fetch RSS → download audio → transcribe → index.
    Resumes from previous progress automatically.
    """
    collection = get_or_create_collection(DB_PATH)
    progress = load_progress()

    episodes = get_podcast_episodes(rss_url, max_episodes)

    print(f"\n{'='*60}")
    print(f"Processing {len(episodes)} episodes")
    print(f"{'='*60}\n")

    # Process episodes (sequential to avoid rate limits on transcription)
    for i, episode in enumerate(episodes, 1):
        print(f"\n[{i}/{len(episodes)}] {episode['title']}")
        result = process_episode(episode, collection, progress)
        print(f"  → {result}")
        time.sleep(2)  # Be polite to the audio server

    total_chunks = collection.count()
    print(f"\n{'='*60}")
    print(f"✅ Database ready: {total_chunks} chunks from {len(episodes)} episodes")
    print(f"DB path: {DB_PATH}")
    print(f"{'='*60}\n")

    return collection


if __name__ == "__main__":
    # Replace with a real podcast RSS feed URL
    RSS_URL = "https://feeds.simplecast.com/your-podcast-feed"

    collection = build_podcast_database(RSS_URL, max_episodes=10)

    # Test query
    from query import answer_question

    result = answer_question(
        "What was that story about a fish tank and cybersecurity?",
        collection
    )
    print(f"\n💬 Answer:\n{result['answer']}")
    print(f"\n📚 Sources:")
    for src in result["sources"]:
        print(f"  - {src['title']} (similarity: {src['similarity']})")
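The resume behavior comes down to step flags persisted as JSON after every stage. A minimal standalone sketch (`mark_done` is illustrative, not a function in pipeline.py):

```python
import json
from pathlib import Path

def mark_done(progress: dict, ep_key: str, step: str) -> None:
    # flag a finished step so a re-run can skip it
    progress.setdefault(ep_key, {})[step] = True

progress = {}
mark_done(progress, "ep_001", "downloaded")
mark_done(progress, "ep_001", "transcribed")

# round-trip through JSON, as save_progress/load_progress do
path = Path("progress_demo.json")
path.write_text(json.dumps(progress, indent=2))
restored = json.loads(path.read_text())
path.unlink()

# only the un-flagged step remains to be done on resume
print(restored["ep_001"].get("indexed", False))  # False
```

Because the file is rewritten after each step rather than at the end, a crash mid-pipeline loses at most one step of work.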

Interactive Query Interface (Streamlit)

# app.py
import streamlit as st
from embed import get_or_create_collection
from query import answer_question

st.set_page_config(page_title="Podcast Search", page_icon="🎙️", layout="wide")
st.title("🎙️ Podcast Search")
st.caption("Ask questions across your entire podcast library")

@st.cache_resource
def load_collection():
    return get_or_create_collection("./podcast_db")

collection = load_collection()
total_chunks = collection.count()
st.caption(f"📊 {total_chunks} transcript chunks indexed")

if total_chunks == 0:
    st.warning("No episodes indexed yet. Run `pipeline.py` to build the database.")
    st.stop()

# Search interface
question = st.text_input(
    "Ask a question about any episode:",
    placeholder="What was that story about a fish tank and cybersecurity?"
)

n_results = st.slider("Number of chunks to retrieve", 3, 10, 5)

if question:
    with st.spinner("Searching..."):
        result = answer_question(question, collection, n_results)

    st.subheader("Answer")
    st.markdown(result["answer"])

    st.subheader("Sources")
    for src in result["sources"]:
        with st.expander(f"📻 {src['title']} — similarity: {src['similarity']}"):
            st.write(f"**Podcast:** {src['podcast']}")
            st.write(f"**Published:** {src['published']}")
            if src.get("link"):
                st.markdown(f"[🔗 Listen to episode]({src['link']})")

Run with:

streamlit run app.py

Example Queries and What You Get

Specific Story Recall
"What was that story about a fish tank and cybersecurity?"
→ Finds the exact episode and quotes the relevant discussion, with an episode citation

Cross-Episode Synthesis
"Summarize everything discussed about protein folding across all episodes"
→ Pulls relevant chunks from multiple episodes, cites each one

Speaker Quote Retrieval
"What did the guest say about the limitations of transformers?"
→ Finds the relevant exchange, using the speaker labels from the transcript

Performance Benchmarks

| Task | Time | Notes |
|---|---|---|
| Transcription (45-min episode) | 60–90 sec | Gemini 2.0 Flash |
| Chunking + embedding (1 episode, ~30 chunks of 400 words) | 15–30 sec | text-embedding-004 |
| Full pipeline per episode | ~2–3 min | Including download |
| Query + answer generation | 3–7 sec | ChromaDB retrieval + Gemini |
| 10 episodes (full pipeline) | ~25 min | Sequential processing |
| 100 episodes | ~4 hours | With rate-limiting delays |

Cost estimates (Google AI pricing):

  • Transcription: ~$0.10 per hour of audio (Gemini 2.0 Flash input pricing)
  • Embeddings: ~$0.001 per 1K tokens (text-embedding-004)
  • Query answering: ~$0.01–0.05 per query
  • Total for 50 episodes (~40 hrs audio): ~$5–10 one-time ingestion cost
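The estimates above reduce to simple arithmetic. A sketch, where the rates are this article's rough figures (not official pricing) and 1.3 tokens per word is an assumed ratio for English text:

```python
TRANSCRIPTION_PER_AUDIO_HOUR = 0.10  # USD, rough Gemini 2.0 Flash estimate
EMBED_PER_1K_TOKENS = 0.001          # USD, rough text-embedding-004 estimate
TOKENS_PER_WORD = 1.3                # assumed tokens-per-word ratio

def ingestion_cost(episodes: int, avg_minutes: int = 48, avg_words: int = 10_000) -> float:
    # transcription scales with audio hours; embedding with transcript tokens
    transcription = (episodes * avg_minutes / 60) * TRANSCRIPTION_PER_AUDIO_HOUR
    embedding = (episodes * avg_words * TOKENS_PER_WORD / 1000) * EMBED_PER_1K_TOKENS
    return round(transcription + embedding, 2)

print(ingestion_cost(50))  # 4.65: transcription dominates, embedding is pennies
```

The result lands at the low end of the $5–10 range quoted above; longer episodes or retries push it toward the high end.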

Common Mistakes

| Mistake | Consequence | Fix |
|---|---|---|
| Using "gemini-3-pro-preview" as the model name | 404 Not Found: the model doesn't exist | Use "gemini-2.0-flash" or "gemini-2.5-pro" |
| Not deleting Files API uploads | Hit the 20 GB project storage limit | Delete uploaded files after transcription |
| Using task_type="retrieval_document" for queries | Lower-quality retrieval | Use "retrieval_query" for queries, "retrieval_document" for indexing |
| Chunk size too large (>1000 words) | Poor retrieval: chunks span multiple topics | Use 300–500 word chunks with 50–100 word overlap |
| No overlap between chunks | Context lost at boundaries | Add 15–20% overlap (chunk_size=400, overlap=80) |
| Not cleaning up audio files | Disk fills quickly (each MP3 is ~50–150 MB) | Delete audio after transcription; keep text only |
| Indexing the same episode twice | Duplicate results in queries | Use collection.upsert(), not add() |

Key Takeaways

What to remember from this article:
1. There is no "Gemini 3 File Search" API. As of 2026, use the Files API (genai.upload_file()) for temporary multimodal inputs, and ChromaDB + text-embedding-004 for persistent, searchable RAG databases.

2. Gemini 2.0 Flash is a powerful transcription engine. It's multimodal, accepts audio files directly, and produces high-quality transcripts with speaker labels. Upload the file, wait for the ACTIVE state, then call generate_content().

3. Use different task_type values for indexing and querying. Embed documents for storage with task_type="retrieval_document" and user queries with task_type="retrieval_query". This small difference measurably improves retrieval quality.

4. Progress tracking prevents re-running expensive operations. Save a JSON file after each step (download, transcribe, index). If the pipeline fails midway, it resumes from where it left off, not from scratch.

5. Delete Files API uploads after transcription. The Files API counts uploads against your project's 20 GB quota. After extracting the transcript text, delete the uploaded file with genai.delete_file().

Mohamed Hamed

20 years building production systems — the last several deep in AI integration, LLMs, and full-stack architecture. I write what I've actually built and broken. If this was useful, the next one goes to LinkedIn first.
