Write Your First Adapter
This guide walks through building a simple adapter that syncs text files from a directory into an OmniData container. By the end, you will understand the adapter contract and how to register your adapter for discovery.
Prerequisites
- Python 3.11+
- The `omnidata` package installed
- A bootstrapped `.omnidata` container to sync into
Step 1: Subclass AdapterBase
Every adapter extends AdapterBase and implements two methods:
from omnidata.adapters.base import AdapterBase, SyncResult
import hashlib
from pathlib import Path
class NotesAdapter(AdapterBase):
    """Syncs .txt files from a local directory into an OmniData container."""

    name = "notes"
    uri_scheme = "notes"

    def sync(self, index_db, config, state):
        """Discover new or changed text files.

        Scans ``watch_dir`` (from *config*, default ``~/Notes``) for ``*.txt``
        files, compares each file's SHA-256 hash against the indexed row, and
        creates or updates resources as needed.

        Returns a SyncResult with created/updated/skipped counts and a list of
        per-file error messages.
        """
        watch_dir = Path(config.get("watch_dir", "~/Notes")).expanduser()
        created, updated, skipped = 0, 0, 0
        errors = []
        for filepath in watch_dir.glob("*.txt"):
            uri = f"notes://{filepath}"
            try:
                content = filepath.read_bytes()
            except OSError as exc:
                # A file can disappear or become unreadable between glob() and
                # read; record the failure instead of aborting the whole sync.
                # (Previously `errors` was returned but never populated.)
                errors.append(f"{uri}: {exc}")
                continue
            content_hash = hashlib.sha256(content).hexdigest()
            existing = index_db.execute(
                "SELECT content_hash FROM omnidata_resources WHERE uri = ? AND deleted_at IS NULL",
                (uri,),
            ).fetchone()
            if existing is None:
                self._create_resource(index_db, uri, filepath, content, content_hash)
                created += 1
            elif existing[0] != content_hash:
                # Hash differs: the file changed since the last sync.
                self._update_resource(index_db, uri, filepath, content, content_hash)
                updated += 1
            else:
                skipped += 1
        return SyncResult(
            created=created,
            updated=updated,
            skipped=skipped,
            errors=errors,
        )

    def read_content(self, index_db, resource):
        """Extract text content for chunking."""
        # removeprefix() only strips the scheme at the start of the URI;
        # replace() would also corrupt any literal "notes://" occurring
        # later in the path.
        filepath = resource["uri"].removeprefix("notes://")
        return Path(filepath).read_text(encoding="utf-8")
Step 2: Implement resource creation
The helper methods handle the actual database writes to index.db and blob storage to the blobs/ directory:
import uuid
from datetime import datetime, timezone
def _create_resource(self, index_db, uri, filepath, content, content_hash):
    """Insert a brand-new resource row and persist its raw bytes.

    The row starts in the 'bronze' pipeline state; the raw content goes to
    the content-addressed blobs/ directory, not into SQLite.
    """
    # resource_at is the source file's mtime, normalised to UTC ISO-8601.
    resource_at = datetime.fromtimestamp(
        filepath.stat().st_mtime, tz=timezone.utc
    ).strftime("%Y-%m-%dT%H:%M:%SZ")
    row = (
        str(uuid.uuid4()),  # id
        uri,
        self.name,          # source
        "document",         # resource_type
        filepath.name,      # title
        content_hash,
        len(content),       # byte_size
        "text/plain",       # mime_type
        resource_at,
    )
    index_db.execute(
        """INSERT INTO omnidata_resources
           (id, uri, source, resource_type, title, content_hash,
            byte_size, mime_type, resource_at, pipeline_state)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'bronze')""",
        row,
    )
    self._store_blob(content_hash, content)  # writes to blobs/ directory
def _update_resource(self, index_db, uri, filepath, content, content_hash):
    """Refresh an existing resource row after its source file changed.

    Updates the stored hash, size, and timestamps, then re-stores the raw
    bytes under the new content hash.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Also refresh resource_at from the file's new mtime so it stays
    # consistent with how the creation path derives it; previously the
    # update left resource_at stale after the file was modified.
    resource_at = datetime.fromtimestamp(
        filepath.stat().st_mtime, tz=timezone.utc
    ).strftime("%Y-%m-%dT%H:%M:%SZ")
    index_db.execute(
        """UPDATE omnidata_resources
           SET content_hash = ?, byte_size = ?, resource_at = ?, updated_at = ?
           WHERE uri = ? AND deleted_at IS NULL""",
        (content_hash, len(content), resource_at, now, uri),
    )
    self._store_blob(content_hash, content)  # writes to blobs/ directory
Step 3: Register via entry point
In your pyproject.toml, register the adapter so the runtime discovers it automatically:
[project.entry-points."omnidata.adapters"]
notes = "my_package.adapters.notes:NotesAdapter"
After installing your package, the OmniData runtime will discover the notes adapter on startup.
Step 4: Add to adapters.json
Register the adapter in the container’s adapters.json so it runs on schedule:
{
"adapters": [
{
"id": "uuid-here",
"adapter_name": "notes",
"uri_scheme": "notes",
"enabled": true,
"sync_interval": 1800,
"configuration": {"watch_dir": "~/Notes"},
"state": {}
}
]
}
This file lives at the root of the .omnidata bundle, alongside manifest.json, index.db, memory.db, and the blobs/ directory.
Key patterns
- Idempotency: The adapter uses content-hash comparison to skip unchanged files
- Pipeline state: New resources start as `bronze` — the runtime handles promotion to silver and gold
- Separation of concerns: `sync()` handles discovery, `read_content()` handles extraction
- Soft deletes: Never DELETE rows — set `deleted_at` if a source file disappears
- Blob storage: Raw content is stored as content-addressed files in `blobs/`, not in SQLite