from elasticsearch import Elasticsearch, helpers
import requests
import configparser
import warnings
import time
import random
import concurrent.futures
import logging
# Process-wide warning suppression.
# NOTE(review): this silences *every* warning category, presumably to hide the
# TLS noise caused by verify_certs=False in init_es_client(). It also hides
# deprecation and other useful warnings; consider a narrower filter.
warnings.filterwarnings("ignore")

# Root logging at INFO plus a module-level logger for this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def init_es_client(config_path='./conf/config.ini'):
    """Create an Elasticsearch client from the ``[elasticsearch]`` INI section.

    Required keys: ``ES_HOST``, ``ES_USER``, ``ES_PASSWORD``.
    Optional keys, whose defaults reproduce the previous hard-coded values:
    ``ES_VERIFY_CERTS`` (boolean, default ``False``) and ``ES_CA_CERTS``
    (default ``conf/http_ca.crt``).

    Args:
        config_path: Path of the INI file to read.

    Returns:
        An ``Elasticsearch`` client authenticated with HTTP basic auth.

    Raises:
        FileNotFoundError: If ``config_path`` could not be read.
        configparser.NoSectionError: If the ``[elasticsearch]`` section is missing.
        configparser.NoOptionError: If a required key is missing.
    """
    config = configparser.ConfigParser()
    # read() silently skips files it cannot open and returns the list of files
    # it actually parsed. Fail here with a clear message instead of a
    # misleading NoSectionError from the first get() below.
    if not config.read(config_path):
        raise FileNotFoundError(
            f"Cannot read Elasticsearch config file: {config_path}"
        )

    section = 'elasticsearch'
    es_host = config.get(section, 'ES_HOST')
    es_user = config.get(section, 'ES_USER')
    es_password = config.get(section, 'ES_PASSWORD')

    # NOTE(review): with verify_certs=False the server certificate is
    # presumably not checked at all, so ca_certs gives no protection.
    # Set ES_VERIFY_CERTS = true in the config to verify against ES_CA_CERTS.
    verify_certs = config.getboolean(section, 'ES_VERIFY_CERTS', fallback=False)
    ca_certs = config.get(section, 'ES_CA_CERTS', fallback='conf/http_ca.crt')

    return Elasticsearch(
        hosts=[es_host],
        basic_auth=(es_user, es_password),
        verify_certs=verify_certs,
        ca_certs=ca_certs,
    )
# Endpoint that fetch_embeddings() POSTs {"model": ..., "prompt": ...} to.
# Hard-coded to a local service; presumably an Ollama server (its default port
# is 11434) — confirm. Unlike the Elasticsearch settings, it is not read from
# conf/config.ini.
EMBEDDING_SERVICE_URL = "http://localhost:11434/api/embeddings"
def fetch_documents_from_elasticsearch(es_client, index="logs", query=None,
                                       batch_size=25, scroll="1m"):
    """Yield documents from Elasticsearch that have no ``embedding`` field yet.

    Results are paged with the scroll API. The scroll context is released
    (``clear_scroll``) when iteration is exhausted, when it fails, or when the
    caller abandons the generator (``close()`` / garbage collection). Without
    this, each run would leave an open search context on the cluster until
    its keep-alive expired.

    Args:
        es_client: Elasticsearch client providing ``search``, ``scroll`` and
            ``clear_scroll``.
        index: Index to read from.
        query: Complete search body. When given, it replaces the default
            body entirely, so ``batch_size`` only applies to the default query.
        batch_size: Page size of the default query.
        scroll: Scroll keep-alive between pages. It must cover the time the
            caller spends processing one page (for example, one embedding
            request per document).

    Yields:
        Raw search hits (dicts with ``_id``, ``_source``, ...), oldest
        ``@timestamp`` first for the default query.
    """
    query = query or {
        "query": {
            "bool": {
                "must_not": {"exists": {"field": "embedding"}}
            }
        },
        "size": batch_size,
        "sort": [{"@timestamp": "asc"}],
    }
    response = es_client.search(index=index, body=query, scroll=scroll)
    scroll_id = response["_scroll_id"]
    try:
        hits = response["hits"]["hits"]
        while hits:
            yield from hits
            response = es_client.scroll(scroll_id=scroll_id, scroll=scroll)
            # The scroll id may change between pages; always keep the latest.
            scroll_id = response["_scroll_id"]
            hits = response["hits"]["hits"]
    finally:
        if scroll_id:
            try:
                es_client.clear_scroll(scroll_id=scroll_id)
            except Exception:
                # Best effort: the context expires on its own after `scroll`.
                # Do not mask the original exception or a normal exit.
                logger.warning("Failed to clear scroll context", exc_info=True)
def fetch_embeddings(text, model="all-minilm", timeout=10):
    """Request an embedding vector for ``text`` from ``EMBEDDING_SERVICE_URL``.

    Args:
        text: Text sent as the ``prompt`` field.
        model: Model name sent as the ``model`` field.
        timeout: Request timeout in seconds.

    Returns:
        The ``embedding`` value from the JSON response, or ``None`` when the
        request fails, the response is not valid JSON, or it has no
        ``embedding`` field. Errors are logged, not raised.
    """
    try:
        response = requests.post(
            EMBEDDING_SERVICE_URL,
            json={"model": model, "prompt": text},
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()
    except requests.exceptions.RequestException as e:
        logger.error("Error fetching embedding: %s", str(e))
        return None
    except ValueError as e:
        # Older `requests` versions raise a plain ValueError for invalid JSON.
        logger.error("Invalid JSON from embedding service: %s", e)
        return None

    # Use .get(): indexing here would raise KeyError and abort the whole run
    # on an unexpected response body.
    embedding = result.get("embedding") if isinstance(result, dict) else None
    if embedding is None:
        logger.error("Embedding service response has no 'embedding' field")
        return None
    # Log only the dimension; the full vector at INFO floods the log per doc.
    logger.debug("Received embedding with %d dimensions", len(embedding))
    return embedding
def update_document_in_elasticsearch(es_client, doc_id, index="logs", embedding=None):
    """Write an embedding, or a failure marker, onto an existing document.

    The document is updated with a Painless script that sets ``embedding``,
    ``embedding_processed_at`` (UTC, ISO-8601 with ``Z``) and
    ``processing_status`` (``"success"`` / ``"failed"``). On failure it also
    sets ``error_message``; on success any earlier ``error_message`` is
    removed. Documents that already hold a non-null ``embedding`` are left
    untouched (``noop``). Documents marked failed on a previous run can
    therefore be retried. The old guard on ``embedding_processed_at`` froze
    them as failed, even though the fetch query kept selecting them.

    Args:
        es_client: Elasticsearch client.
        doc_id: ``_id`` of the document to update.
        index: Index containing the document.
        embedding: Embedding vector. ``None`` or an empty sequence is
            recorded as a failure.
    """
    # One flag drives all three params, so status, embedding and
    # error_message always agree (previously [] gave status "success"
    # together with embedding None and an error message).
    succeeded = bool(embedding)
    body = {
        "script": {
            "source": '''
                if (ctx._source.containsKey("embedding") && ctx._source.embedding != null) {
                    ctx.op = "noop";
                } else {
                    ctx._source.embedding = params.embedding;
                    ctx._source.embedding_processed_at = params.timestamp;
                    ctx._source.processing_status = params.status;
                    if (params.error_message != null) {
                        ctx._source.error_message = params.error_message;
                    } else {
                        ctx._source.remove("error_message");
                    }
                }
            ''',
            "params": {
                "embedding": embedding if succeeded else None,
                # The trailing "Z" claims UTC, so format gmtime(), not local time.
                "timestamp": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
                "status": "success" if succeeded else "failed",
                "error_message": None if succeeded else "嵌入生成失败",
            },
        }
    }
    es_client.update(index=index, id=doc_id, body=body)
def process_documents(es_client, batch_size=25, index="logs"):
    """Embed every document that lacks an embedding and store the result.

    Each document from ``fetch_documents_from_elasticsearch`` has its
    ``content`` sent to ``fetch_embeddings``. The outcome (vector, or ``None``
    on failure) is written back with ``update_document_in_elasticsearch``.

    Args:
        es_client: Elasticsearch client used for reading and updating.
        batch_size: Scroll page size used when fetching documents.
        index: Index to read from and update. The same value is passed to
            both helpers so the two cannot drift apart. Defaults to ``"logs"``.
    """
    processed = 0
    without_embedding = 0
    documents = fetch_documents_from_elasticsearch(
        es_client, index=index, batch_size=batch_size
    )
    for doc in documents:
        doc_id = doc["_id"]
        # `or` also normalises an explicit null `content` or a missing
        # `_source`, so a None prompt is never sent to the embedding service.
        text_content = (doc.get("_source") or {}).get("content") or ""
        if text_content:
            embedding = fetch_embeddings(text_content)
        else:
            # Nothing to embed: skip the HTTP round-trip. Passing None below
            # records the document as failed.
            logger.warning("Document %s has empty content; skipping embedding request", doc_id)
            embedding = None
        update_document_in_elasticsearch(
            es_client, doc_id, index=index, embedding=embedding
        )
        processed += 1
        if not embedding:
            without_embedding += 1
    logger.info(
        "Processed %d documents (%d without embedding)", processed, without_embedding
    )
if __name__ == "__main__":
    # Script entry point: build the client from the default config file, then
    # embed all pending documents, fetched in pages of 25.
    client = init_es_client(config_path='./conf/config.ini')
    process_documents(client, batch_size=25)