Create a Custom Enhancement
A Custom Enhancement is a generic enhancement connector: instead of running built-in logic, it lets you implement your own enhancement logic and post enhancement batches directly to the API. Use it for specialized document enhancement workflows that the other connectors do not cover.
This guide shows you how to create and configure a custom enhancement for your data ingestion workflows.
Prerequisites
Before you begin, ensure you have:
- Access to the Zeta Alpha Platform UI
- A tenant created
- An index created
- A content source with documents to enhance
- An API key with the ingestion-manager role
Step 1: Create the Custom Enhancement Configuration
To create a custom enhancement connector, define a configuration file with the following structure:
{
  "name": "My Custom Enhancement",
  "description": "Custom enhancement logic for documents",
  "is_indexable": true,
  "connector": "custom_enhancement",
  "connector_configuration": {
    "content_source_id": "target-content-source-id"
  }
}
Configuration Fields
content_source_id: (string) The ID of the content source that this enhancement connector enhances.
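If you create several enhancement connectors, generating the configuration programmatically can help avoid copy-paste mistakes. The following is a minimal sketch, not an official client: the helper name `build_custom_enhancement_config` is hypothetical, and it assumes only the fields shown in the example configuration.

```python
import json


def build_custom_enhancement_config(name, description, content_source_id, is_indexable=True):
    """Build the JSON configuration for a custom enhancement connector.

    Hypothetical helper: it only mirrors the fields shown in the example
    configuration in this guide.
    """
    if not content_source_id:
        raise ValueError("content_source_id must be a non-empty string")
    return {
        "name": name,
        "description": description,
        "is_indexable": is_indexable,
        "connector": "custom_enhancement",
        "connector_configuration": {"content_source_id": content_source_id},
    }


config = build_custom_enhancement_config(
    name="My Custom Enhancement",
    description="Custom enhancement logic for documents",
    content_source_id="target-content-source-id",
)
# Print the configuration as JSON so it can be pasted into the UI form.
print(json.dumps(config, indent=2))
```

The early `ValueError` is a design choice for this sketch: it surfaces a missing target content source before the configuration reaches the UI.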
Step 2: Create the Custom Enhancement Content Source
To create your custom enhancement connector in the Zeta Alpha Platform UI:
- Navigate to your tenant and click View next to your target index
- Click View under Content Sources for the index
- Click Create Content Source
- Paste your JSON configuration
- Click Submit
Important: Take note of the id field in the response payload. You'll need this when posting enhancement batches.
Step 3: Implement Your Enhancement Logic
Unlike other connectors that run automatically, custom enhancements require you to implement the enhancement logic and call the API to submit enhancements.
Enhancement Workflow
- Monitor or query documents from the target content source
- Apply your custom enhancement logic
- Post enhancement batches to the API
- The platform processes and applies the enhancements to documents
Example Enhancement Logic (Python)
from datetime import datetime, timezone

import requests

# Configuration
TENANT = "my-tenant"
INDEX_ID = "my-index-id"
ENHANCEMENT_SOURCE_ID = "custom-enhancement-id"
TARGET_SOURCE_ID = "target-content-source-id"
API_KEY = "your-api-key"
BASE_URL = "https://api.zeta-alpha.com/v0/service/ingestion"


def enhance_document(document):
    """
    Apply custom enhancement logic to a document.
    Returns enhancement data to be added to the document.
    """
    # Example: Extract keywords from document content
    content = document.get("document_content", "")
    keywords = extract_keywords(content)
    # Example: Calculate a custom score
    custom_score = calculate_document_score(document)
    # Return enhancement data with a timezone-aware UTC timestamp
    return {
        "extracted_keywords": keywords,
        "custom_score": custom_score,
        "enhancement_timestamp": datetime.now(timezone.utc).isoformat()
    }


def extract_keywords(content):
    """Your keyword extraction logic here"""
    # Implement your logic (e.g., using NLP libraries)
    return ["keyword1", "keyword2", "keyword3"]


def calculate_document_score(document):
    """Your scoring logic here"""
    # Implement your custom scoring algorithm
    return 0.85


def get_documents_to_enhance():
    """Placeholder: replace with your own logic to retrieve documents"""
    return []


def post_enhancement_batch(enhancements):
    """
    Post a batch of enhancements to the API.
    """
    url = f"{BASE_URL}/indexes/{INDEX_ID}/enhancement-batches"
    params = {
        "tenant": TENANT
    }
    headers = {
        "Content-Type": "application/json",
        "X-Auth": API_KEY
    }
    payload = {
        "enhancements": enhancements,
        "content_source_id": ENHANCEMENT_SOURCE_ID,
        "target_content_source_id": TARGET_SOURCE_ID
    }
    response = requests.post(url, params=params, headers=headers, json=payload, timeout=30)
    # Raise an exception on 4xx/5xx responses instead of silently returning an error body
    response.raise_for_status()
    return response.json()


# Example usage
documents = get_documents_to_enhance()
enhancements = []
for document in documents:
    enhancement_data = enhance_document(document)
    enhancements.append({
        "document_id": document["document_id"],
        "custom_metadata": enhancement_data
    })

# Post enhancements in batches
batch_size = 100
for i in range(0, len(enhancements), batch_size):
    batch = enhancements[i:i + batch_size]
    result = post_enhancement_batch(batch)
    print(f"Posted batch {i // batch_size + 1}: {result}")
Step 4: Post Enhancement Batches via API
API Endpoint
POST /v1/indexes/{index_id}/enhancement-batches?tenant={tenant}
Request Body
{
  "enhancements": [
    {
      "document_id": "doc-123",
      "custom_metadata": {
        "extracted_keywords": ["ai", "machine-learning", "nlp"],
        "custom_score": 0.92,
        "category": "technical"
      }
    },
    {
      "document_id": "doc-456",
      "custom_metadata": {
        "extracted_keywords": ["business", "strategy"],
        "custom_score": 0.78,
        "category": "business"
      }
    }
  ],
  "content_source_id": "custom-enhancement-id",
  "target_content_source_id": "target-content-source-id"
}
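Before posting, it can help to check on the client side that a payload has the shape shown in the request body above. The sketch below assumes only the fields from that example; `validate_enhancement_payload` is a hypothetical helper, and the platform may enforce additional rules that are not covered here.

```python
def validate_enhancement_payload(payload):
    """Return a list of problems found in an enhancement batch payload.

    Hypothetical client-side check based only on the example request body
    in this guide. An empty list means no problems were found.
    """
    problems = []
    for key in ("content_source_id", "target_content_source_id"):
        value = payload.get(key)
        if not isinstance(value, str) or not value:
            problems.append(f"missing or empty '{key}'")

    enhancements = payload.get("enhancements")
    if not isinstance(enhancements, list) or not enhancements:
        problems.append("'enhancements' must be a non-empty list")
        return problems

    for position, item in enumerate(enhancements):
        if not isinstance(item, dict):
            problems.append(f"enhancements[{position}]: must be an object")
            continue
        if not item.get("document_id"):
            problems.append(f"enhancements[{position}]: missing 'document_id'")
        if not isinstance(item.get("custom_metadata"), dict):
            problems.append(f"enhancements[{position}]: 'custom_metadata' must be an object")
    return problems


payload = {
    "enhancements": [
        {"document_id": "doc-123", "custom_metadata": {"custom_score": 0.92}},
        {"document_id": "", "custom_metadata": None},  # deliberately invalid entry
    ],
    "content_source_id": "custom-enhancement-id",
    "target_content_source_id": "target-content-source-id",
}
for problem in validate_enhancement_payload(payload):
    print(problem)
```

Returning a list of problems rather than raising on the first one lets you log every issue in a batch at once.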
cURL Example
curl --request POST \
  --url "https://api.zeta-alpha.com/v0/service/ingestion/indexes/$INDEX_ID/enhancement-batches?tenant=$TENANT" \
  --header 'Content-Type: application/json' \
  --header "X-Auth: $ZA_API_KEY" \
  --data '{
    "enhancements": [
      {
        "document_id": "doc-123",
        "custom_metadata": {
          "extracted_keywords": ["ai", "machine-learning"],
          "custom_score": 0.92
        }
      }
    ],
    "content_source_id": "'"$ENHANCEMENT_SOURCE_ID"'",
    "target_content_source_id": "'"$TARGET_SOURCE_ID"'"
  }'
Common Use Cases
External API Enrichment
Enhance documents with data from external APIs:
import requests


def enhance_with_external_api(document):
    # Call external API (placeholder URL); fail fast on timeouts and HTTP errors
    response = requests.get(
        f"https://external-api.com/data/{document['external_id']}",
        timeout=10,
    )
    response.raise_for_status()
    external_data = response.json()
    return {
        "external_rating": external_data["rating"],
        "external_category": external_data["category"],
        "external_updated": external_data["last_updated"]
    }
Machine Learning Predictions
Add ML model predictions to documents:
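import joblib

# Load the trained classifier once, not once per document
model = joblib.load('document_classifier.pkl')


def enhance_with_ml_predictions(document):
    # extract_features is your own function that turns a document into a feature vector
    features = extract_features(document)
    prediction = model.predict([features])[0]
    confidence = model.predict_proba([features])[0].max()
    return {
        # Convert NumPy scalars to plain Python values so the result is JSON-serializable
        "ml_category": prediction.item() if hasattr(prediction, "item") else prediction,
        "ml_confidence": float(confidence),
        "model_version": "1.2.3"
    }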
Custom Analytics
Add custom analytics and metrics:
def enhance_with_analytics(document):
    content = document['content']
    # calculate_readability_score and calculate_complexity_score are your own functions
    readability = calculate_readability_score(content)
    complexity = calculate_complexity_score(content)
    word_count = len(content.split())
    return {
        "readability_score": readability,
        "complexity_score": complexity,
        "word_count": word_count,
        "estimated_read_time": word_count / 200  # minutes, at 200 words per minute
    }
Data Validation
Add validation results and quality scores:
def enhance_with_validation(document):
    # validate_document is your own function returning the keys used below
    validation_results = validate_document(document)
    return {
        "is_valid": validation_results["is_valid"],
        "validation_errors": validation_results["errors"],
        "quality_score": validation_results["quality_score"],
        "completeness": validation_results["completeness"]
    }
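The `validate_document` function used above is left to you. As one possible starting point, the sketch below checks for required fields and derives the four keys that `enhance_with_validation` reads. The required field names (`document_id`, `title`, `content`) and the scoring rule are assumptions made for illustration, not part of the platform.

```python
# Assumed field names for illustration; adjust to your own document schema.
REQUIRED_FIELDS = ("document_id", "title", "content")


def validate_document(document, required_fields=REQUIRED_FIELDS):
    """Check that required fields are present and non-empty.

    Hypothetical implementation: completeness is the fraction of required
    fields that are filled in, and quality_score simply reuses that value.
    """
    missing = [field for field in required_fields if not document.get(field)]
    present = len(required_fields) - len(missing)
    completeness = round(present / len(required_fields), 2)
    return {
        "is_valid": not missing,
        "errors": [f"missing field: {field}" for field in missing],
        "quality_score": completeness,
        "completeness": completeness,
    }


complete = validate_document(
    {"document_id": "doc-123", "title": "Intro to NLP", "content": "Some text."}
)
incomplete = validate_document({"document_id": "doc-456", "content": "Some text."})
print(complete)
print(incomplete)
```

In practice you would likely replace the single completeness figure with separate checks (for example, minimum content length or allowed categories) and a quality score that weighs them.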
Scheduling Enhancement Jobs
You can schedule custom enhancement jobs to run periodically:
Using Cron Jobs
# crontab entry to run enhancement script daily at 2 AM
0 2 * * * /usr/bin/python3 /path/to/enhancement_script.py
Using Task Schedulers
Integrate with task schedulers like:
- Airflow: Create DAGs for enhancement workflows
- Celery: Schedule periodic enhancement tasks
- Kubernetes CronJobs: Run enhancement jobs in containers
Example Airflow DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'document_enhancement',
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # named `schedule` in Airflow 2.4 and later
    catchup=False,  # do not backfill a run for every day since start_date
)


def run_enhancement():
    # Your enhancement logic here
    pass


enhancement_task = PythonOperator(
    task_id='enhance_documents',
    python_callable=run_enhancement,
    dag=dag,
)
Best Practices
- Batch processing: Process documents in batches for better performance
- Error handling: Implement robust error handling and retry logic
- Idempotency: Ensure enhancements can be safely reapplied
- Logging: Log enhancement operations for debugging and auditing
- Monitoring: Track enhancement success rates and processing times
- Validation: Validate enhancement data before posting to the API
- Rate limiting: Respect API rate limits when posting batches
- Version control: Track enhancement logic versions
- Testing: Test enhancement logic on sample documents before production
- Documentation: Document your enhancement logic and data schema
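Several of these practices (batch processing, error handling with retries, and pacing requests) can be combined in a small wrapper around whatever function posts a batch. The sketch below is one way to do that, under stated assumptions: `chunked` and `post_with_retry` are hypothetical helpers, `send` stands for your own posting function, and the `flaky_send` stand-in with its `accepted` field exists only to demonstrate the retry behavior without calling the API.

```python
import time


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def post_with_retry(send, batch, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call `send(batch)` and retry with exponential backoff on failure.

    `send` is any callable that posts one batch and raises on error.
    The delay doubles after each failed attempt: base_delay, 2x, 4x, ...
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return send(batch)
        except Exception as error:  # narrow this to network/HTTP errors in real code
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({error}); retrying in {delay}s")
            sleep(delay)


# Demonstration with a stand-in sender that fails twice, then succeeds.
calls = {"count": 0}


def flaky_send(batch):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"accepted": len(batch)}  # made-up response shape for the demo


# Pass a no-op sleep so the demonstration does not actually wait.
result = post_with_retry(flaky_send, ["a", "b"], sleep=lambda seconds: None)
print(result)
```

Retries are only safe when reposting a batch has the same effect as posting it once, which is why idempotent enhancements matter. Injecting `send` and `sleep` as parameters keeps the helper easy to test.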
Viewing Enhanced Documents
To view documents with custom enhancements:
- Navigate to your tenant and click View next to your target index
- Click View under Content Sources for the index
- Click View under Ingested Documents for the target content source
- Select a document to see its details
- The enhancement section will show your custom enhancement data
Monitoring Enhancement Status
Monitor enhancement batch status through the API:
curl --request GET \
--url "https://api.zeta-alpha.com/v0/service/ingestion/indexes/$INDEX_ID/enhancement-batches?tenant=$TENANT" \
--header "X-Auth: $ZA_API_KEY"
This will return the status of recent enhancement batches, including:
- Number of enhancements processed
- Number of successful enhancements
- Number of failed enhancements
- Error details if any
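The same status request can be issued from Python, for example inside a scheduled job. The sketch below uses only the standard library and mirrors the cURL call above. The helper names are hypothetical, and because the exact response fields are not specified in this guide, `fetch_batch_status` returns the decoded JSON as-is rather than assuming field names.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://api.zeta-alpha.com/v0/service/ingestion"


def build_status_request(index_id, tenant, api_key):
    """Build the GET request for listing enhancement batches (without sending it)."""
    query = urllib.parse.urlencode({"tenant": tenant})
    index_path = urllib.parse.quote(index_id, safe="")
    url = f"{BASE_URL}/indexes/{index_path}/enhancement-batches?{query}"
    return urllib.request.Request(url, headers={"X-Auth": api_key}, method="GET")


def fetch_batch_status(index_id, tenant, api_key, timeout=30):
    """Send the request and return the decoded JSON response unchanged."""
    request = build_status_request(index_id, tenant, api_key)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Building the request does not touch the network, so it is safe to inspect.
request = build_status_request("my-index-id", "my-tenant", "your-api-key")
print(request.get_method(), request.full_url)
```

Call `fetch_batch_status(...)` with real values to perform the request; `urlopen` raises `urllib.error.HTTPError` on 4xx and 5xx responses, which you can catch and log.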
The custom enhancement connector provides maximum flexibility for implementing specialized document enhancement workflows tailored to your specific needs.