Create a Custom Enhancement

A Custom Enhancement lets you run your own enhancement logic outside the platform and post the results to the API as enhancement batches. It is a generic enhancement connector, intended for specialized document enhancement workflows where you need full control over how enhancement data is produced.

This guide shows you how to create and configure a custom enhancement for your data ingestion workflows.

Prerequisites

Before you begin, ensure you have:

  1. Access to the Zeta Alpha Platform UI
  2. A tenant created
  3. An index created
  4. A content source with documents to enhance
  5. An API key with the ingestion-manager role

Step 1: Create the Custom Enhancement Configuration

To create a custom enhancement connector, define a configuration file with the following structure:

{
  "name": "My Custom Enhancement",
  "description": "Custom enhancement logic for documents",
  "is_indexable": true,
  "connector": "custom_enhancement",
  "connector_configuration": {
    "content_source_id": "target-content-source-id"
  }
}

Configuration Fields

  • content_source_id: (string) The ID of the content source that this enhancement connector enhances.
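
If you generate configurations from a script rather than writing them by hand, a small helper can keep the structure consistent. The following is a minimal sketch, not part of the platform: the function name `build_custom_enhancement_config` is hypothetical, and the field values simply mirror the example above.

```python
import json


def build_custom_enhancement_config(name, description, target_content_source_id):
    """Return a custom enhancement configuration as a plain dict.

    Hypothetical helper: it only reproduces the structure shown in this guide.
    """
    if not target_content_source_id:
        raise ValueError("target_content_source_id must be a non-empty string")
    return {
        "name": name,
        "description": description,
        "is_indexable": True,
        "connector": "custom_enhancement",
        "connector_configuration": {
            # ID of the content source that this enhancement connector enhances
            "content_source_id": target_content_source_id,
        },
    }


config = build_custom_enhancement_config(
    name="My Custom Enhancement",
    description="Custom enhancement logic for documents",
    target_content_source_id="target-content-source-id",
)
# Print JSON that can be pasted into the Create Content Source form
print(json.dumps(config, indent=2))
```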

Step 2: Create the Custom Enhancement Content Source

To create your custom enhancement connector in the Zeta Alpha Platform UI:

  1. Navigate to your tenant and click View next to your target index
  2. Click View under Content Sources for the index
  3. Click Create Content Source
  4. Paste your JSON configuration
  5. Click Submit

Important: Note the id field in the response payload. This is the ID of the custom enhancement content source, and you pass it as content_source_id when posting enhancement batches.

Step 3: Implement Your Enhancement Logic

Unlike other connectors that run automatically, custom enhancements require you to implement the enhancement logic and call the API to submit enhancements.

Enhancement Workflow

  1. Monitor or query documents from the target content source
  2. Apply your custom enhancement logic
  3. Post enhancement batches to the API
  4. The platform processes and applies the enhancements to documents

Example Enhancement Logic (Python)

import requests
from datetime import datetime

# Configuration (placeholder values; replace with your own)
TENANT = "my-tenant"
INDEX_ID = "my-index-id"
ENHANCEMENT_SOURCE_ID = "custom-enhancement-id"  # id returned in Step 2
TARGET_SOURCE_ID = "target-content-source-id"
API_KEY = "your-api-key"
BASE_URL = "https://api.zeta-alpha.com/v0/service/ingestion"


def enhance_document(document):
    """
    Apply custom enhancement logic to a document.
    Returns enhancement data to be added to the document.
    """
    # Example: Extract keywords from document content
    content = document.get("document_content", "")
    keywords = extract_keywords(content)

    # Example: Calculate a custom score
    custom_score = calculate_document_score(document)

    # Return enhancement data
    return {
        "extracted_keywords": keywords,
        "custom_score": custom_score,
        "enhancement_timestamp": datetime.now().isoformat(),
    }


def extract_keywords(content):
    """Your keyword extraction logic here"""
    # Implement your logic (e.g., using NLP libraries)
    return ["keyword1", "keyword2", "keyword3"]


def calculate_document_score(document):
    """Your scoring logic here"""
    # Implement your custom scoring algorithm
    return 0.85


def get_documents_to_enhance():
    """Placeholder: return the documents to enhance from the target content source"""
    return []


def post_enhancement_batch(enhancements):
    """
    Post a batch of enhancements to the API.
    """
    url = f"{BASE_URL}/indexes/{INDEX_ID}/enhancement-batches"
    params = {"tenant": TENANT}
    headers = {
        "Content-Type": "application/json",
        "X-Auth": API_KEY,
    }
    payload = {
        "enhancements": enhancements,
        "content_source_id": ENHANCEMENT_SOURCE_ID,
        "target_content_source_id": TARGET_SOURCE_ID,
    }

    response = requests.post(url, params=params, headers=headers, json=payload, timeout=30)
    response.raise_for_status()  # Surface HTTP errors instead of parsing an error body
    return response.json()


if __name__ == "__main__":
    # Example usage
    documents = get_documents_to_enhance()
    enhancements = []

    for document in documents:
        enhancement_data = enhance_document(document)
        enhancements.append({
            "document_id": document["document_id"],
            "custom_metadata": enhancement_data,
        })

    # Post enhancements in batches
    batch_size = 100
    for i in range(0, len(enhancements), batch_size):
        batch = enhancements[i:i + batch_size]
        result = post_enhancement_batch(batch)
        print(f"Posted batch {i // batch_size + 1}: {result}")

Step 4: Post Enhancement Batches via API

API Endpoint

POST /v0/service/ingestion/indexes/{index_id}/enhancement-batches?tenant={tenant}

Request Body

{
  "enhancements": [
    {
      "document_id": "doc-123",
      "custom_metadata": {
        "extracted_keywords": ["ai", "machine-learning", "nlp"],
        "custom_score": 0.92,
        "category": "technical"
      }
    },
    {
      "document_id": "doc-456",
      "custom_metadata": {
        "extracted_keywords": ["business", "strategy"],
        "custom_score": 0.78,
        "category": "business"
      }
    }
  ],
  "content_source_id": "custom-enhancement-id",
  "target_content_source_id": "target-content-source-id"
}
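
Before posting, it can help to check each entry against the shape of the request body shown above. The following is a minimal sketch under stated assumptions: the function names are hypothetical, and the checks (a non-empty string document_id and a JSON-serializable custom_metadata object) are inferred from the example payload, not from a published schema.

```python
import json


def validate_enhancement(enhancement):
    """Return a list of problems found in one enhancement entry (empty if none).

    The rules here are assumptions based on the example payload in this guide.
    """
    problems = []
    document_id = enhancement.get("document_id")
    if not isinstance(document_id, str) or not document_id:
        problems.append("document_id must be a non-empty string")

    metadata = enhancement.get("custom_metadata")
    if not isinstance(metadata, dict):
        problems.append("custom_metadata must be an object")
    else:
        try:
            json.dumps(metadata)  # fails for sets, datetimes, NumPy integers, ...
        except (TypeError, ValueError) as exc:
            problems.append(f"custom_metadata is not JSON-serializable: {exc}")
    return problems


def split_valid_invalid(enhancements):
    """Separate entries that pass validation from those that do not."""
    valid, invalid = [], []
    for enhancement in enhancements:
        problems = validate_enhancement(enhancement)
        if problems:
            invalid.append((enhancement, problems))
        else:
            valid.append(enhancement)
    return valid, invalid


sample = [
    {"document_id": "doc-123", "custom_metadata": {"custom_score": 0.92}},
    {"document_id": "", "custom_metadata": {"custom_score": 0.78}},
    {"document_id": "doc-789", "custom_metadata": {"tags": {"a", "b"}}},
]
valid, invalid = split_valid_invalid(sample)
print(f"{len(valid)} valid, {len(invalid)} rejected")
```

Only the entries in `valid` would then be passed on to the batch request; the rejected entries and their problem lists can be logged for later inspection.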

cURL Example

curl --request POST \
  --url "https://api.zeta-alpha.com/v0/service/ingestion/indexes/$INDEX_ID/enhancement-batches?tenant=$TENANT" \
  --header 'Content-Type: application/json' \
  --header "X-Auth: $ZA_API_KEY" \
  --data '{
    "enhancements": [
      {
        "document_id": "doc-123",
        "custom_metadata": {
          "extracted_keywords": ["ai", "machine-learning"],
          "custom_score": 0.92
        }
      }
    ],
    "content_source_id": "'"$ENHANCEMENT_SOURCE_ID"'",
    "target_content_source_id": "'"$TARGET_SOURCE_ID"'"
  }'

Common Use Cases

External API Enrichment

Enhance documents with data from external APIs:

import requests

def enhance_with_external_api(document):
    # Call the external API (example URL); fail fast on timeouts and HTTP errors
    response = requests.get(
        f"https://external-api.com/data/{document['external_id']}",
        timeout=10,
    )
    response.raise_for_status()
    external_data = response.json()

    return {
        "external_rating": external_data["rating"],
        "external_category": external_data["category"],
        "external_updated": external_data["last_updated"]
    }

Machine Learning Predictions

Add ML model predictions to documents:

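import joblib

# Load the trained classifier once, not once per document
model = joblib.load('document_classifier.pkl')

def enhance_with_ml_predictions(document):
    # extract_features is your own function that turns a document into a feature vector
    features = extract_features(document)
    prediction = model.predict([features])[0]
    confidence = model.predict_proba([features])[0].max()

    return {
        # Convert NumPy scalars to plain Python types so the payload is JSON-serializable
        "ml_category": prediction.item() if hasattr(prediction, "item") else prediction,
        "ml_confidence": float(confidence),
        "model_version": "1.2.3"
    }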

Custom Analytics

Add custom analytics and metrics:

def enhance_with_analytics(document):
    content = document['content']
    readability = calculate_readability_score(content)
    complexity = calculate_complexity_score(content)
    word_count = len(content.split())

    return {
        "readability_score": readability,
        "complexity_score": complexity,
        "word_count": word_count,
        "estimated_read_time": word_count / 200  # minutes, at 200 words per minute
    }

Data Validation

Add validation results and quality scores:

def enhance_with_validation(document):
    validation_results = validate_document(document)

    return {
        "is_valid": validation_results["is_valid"],
        "validation_errors": validation_results["errors"],
        "quality_score": validation_results["quality_score"],
        "completeness": validation_results["completeness"]
    }
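
The validate_document function above is left to you to implement. As a minimal sketch of one possible shape, assuming a hypothetical schema of required string fields (`REQUIRED_FIELDS` is an illustration, not a platform requirement), it could return the four keys that enhance_with_validation reads:

```python
# Hypothetical schema: the fields your own documents are expected to carry
REQUIRED_FIELDS = ("title", "content", "author")


def validate_document(document):
    """Check that required fields are present and non-empty.

    Returns a dict with the keys is_valid, errors, quality_score, completeness.
    """
    errors = []
    present = 0
    for field in REQUIRED_FIELDS:
        value = document.get(field)
        if isinstance(value, str) and value.strip():
            present += 1
        else:
            errors.append(f"missing or empty field: {field}")

    completeness = present / len(REQUIRED_FIELDS)
    return {
        "is_valid": not errors,
        "errors": errors,
        # Placeholder: the quality score is just the rounded completeness ratio
        "quality_score": round(completeness, 2),
        "completeness": completeness,
    }


result = validate_document(
    {"title": "Quarterly report", "content": "Revenue grew.", "author": ""}
)
print(result["is_valid"], result["errors"])
```

In a real workflow the quality score would likely combine several signals; here it is tied to completeness only to keep the sketch short.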

Scheduling Enhancement Jobs

You can schedule custom enhancement jobs to run periodically:

Using Cron Jobs

# crontab entry to run enhancement script daily at 2 AM
0 2 * * * /usr/bin/python3 /path/to/enhancement_script.py

Using Task Schedulers

Integrate with task schedulers like:

  • Airflow: Create DAGs for enhancement workflows
  • Celery: Schedule periodic enhancement tasks
  • Kubernetes CronJobs: Run enhancement jobs in containers

Example Airflow DAG

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'document_enhancement',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,  # Do not backfill a run for every day since start_date
)

def run_enhancement():
    # Your enhancement logic here
    pass

enhancement_task = PythonOperator(
    task_id='enhance_documents',
    python_callable=run_enhancement,
    dag=dag,
)

Best Practices

  1. Batch processing: Process documents in batches for better performance
  2. Error handling: Implement robust error handling and retry logic
  3. Idempotency: Ensure enhancements can be safely reapplied
  4. Logging: Log enhancement operations for debugging and auditing
  5. Monitoring: Track enhancement success rates and processing times
  6. Validation: Validate enhancement data before posting to the API
  7. Rate limiting: Respect API rate limits when posting batches
  8. Version control: Track enhancement logic versions
  9. Testing: Test enhancement logic on sample documents before production
  10. Documentation: Document your enhancement logic and data schema
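
To illustrate the error handling and retry practice, here is a minimal sketch of exponential backoff around a batch post. The names `post_with_retry`, `send`, and `flaky_send` are hypothetical; `send` stands in for whatever function posts a batch, and the `accepted` key in the demo result is invented for the demonstration and is not an API response field.

```python
import time


def post_with_retry(send, batch, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call send(batch), retrying with exponential backoff when it raises.

    Delays grow as base_delay, 2*base_delay, 4*base_delay, ...
    The last failure is re-raised once max_attempts is reached.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return send(batch)
        except Exception as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            sleep(delay)


# Demonstration with a sender that fails twice before succeeding
calls = {"count": 0}


def flaky_send(batch):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"accepted": len(batch)}


delays = []  # record the delays instead of actually sleeping
result = post_with_retry(flaky_send, [{"document_id": "doc-123"}], sleep=delays.append)
print(result, delays)
```

Retrying the same batch is only safe if enhancements are idempotent, which is why the two practices are usually applied together. In production you would likely retry only on transient errors such as timeouts or rate-limit responses, not on every exception.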

Viewing Enhanced Documents

To view documents with custom enhancements:

  1. Navigate to your tenant and click View next to your target index
  2. Click View under Content Sources for the index
  3. Click View under Ingested Documents for the target content source
  4. Select a document to see its details
  5. The enhancement section will show your custom enhancement data

Monitoring Enhancement Status

Monitor enhancement batch status through the API:

curl --request GET \
--url "https://api.zeta-alpha.com/v0/service/ingestion/indexes/$INDEX_ID/enhancement-batches?tenant=$TENANT" \
--header "X-Auth: $ZA_API_KEY"

This will return the status of recent enhancement batches, including:

  • Number of enhancements processed
  • Number of successful enhancements
  • Number of failed enhancements
  • Error details if any
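
The same status request can be issued from Python. The sketch below uses only the standard library and mirrors the cURL call above; the function names are hypothetical, and because the response schema is not described in this guide, the payload is returned as parsed JSON without assuming any field names.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://api.zeta-alpha.com/v0/service/ingestion"


def build_status_request(index_id, tenant, api_key):
    """Build the GET request for the enhancement-batches endpoint."""
    query = urllib.parse.urlencode({"tenant": tenant})
    index_part = urllib.parse.quote(index_id, safe="")
    url = f"{BASE_URL}/indexes/{index_part}/enhancement-batches?{query}"
    return urllib.request.Request(url, headers={"X-Auth": api_key}, method="GET")


def fetch_batch_status(index_id, tenant, api_key, timeout=30):
    """Send the request and return the parsed JSON response."""
    request = build_status_request(index_id, tenant, api_key)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Building the request does not touch the network, so it is safe to inspect
request = build_status_request("my-index-id", "my-tenant", "your-api-key")
print(request.get_method(), request.full_url)
```

Calling `fetch_batch_status` with real credentials performs the actual request; inspect the returned structure before relying on specific keys.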

The custom enhancement connector provides maximum flexibility for implementing specialized document enhancement workflows tailored to your specific needs.