"""Backfill embeddings for Elasticsearch documents that do not have one yet.

The script scrolls over documents lacking an ``embedding`` field, asks a local
HTTP embedding service for a vector for each document's ``content``, and
records the outcome on the document through a scripted update.
"""

from elasticsearch import Elasticsearch, helpers
import requests
import configparser
import warnings
import time
import random
import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# NOTE(review): this silences *every* warning process-wide, including the
# TLS warnings triggered by verify_certs=False below. Consider narrowing it.
warnings.filterwarnings("ignore")

EMBEDDING_SERVICE_URL = "http://localhost:11434/api/embeddings"

# How long Elasticsearch keeps the scroll context alive between requests.
_SCROLL_KEEP_ALIVE = "1m"

# Painless update script. Built from adjacent literals so the resulting string
# stays exactly what the update call has always sent. It is a no-op for
# documents that already carry a non-null ``embedding_processed_at``;
# otherwise it stores the embedding, the processing timestamp, the status and
# (only when non-null) the error message.
_UPDATE_SCRIPT = (
    ' if (ctx._source.containsKey("embedding_processed_at")'
    ' && ctx._source.embedding_processed_at != null) {'
    ' ctx.op = "noop";'
    ' } else {'
    ' ctx._source.embedding = params.embedding;'
    ' ctx._source.embedding_processed_at = params.timestamp;'
    ' ctx._source.processing_status = params.status;'
    ' if (params.error_message != null) {'
    ' ctx._source.error_message = params.error_message;'
    ' }'
    ' } '
)


def init_es_client(config_path='./conf/config.ini'):
    """Create an Elasticsearch client from an INI configuration file.

    The file must contain an ``[elasticsearch]`` section with the options
    ``ES_HOST``, ``ES_USER`` and ``ES_PASSWORD``.

    Args:
        config_path: Path to the INI file.

    Returns:
        A configured ``Elasticsearch`` client.

    Raises:
        configparser.NoSectionError / configparser.NoOptionError: if the file
            is missing (``ConfigParser.read`` ignores unreadable files) or
            lacks a required section/option.
    """
    config = configparser.ConfigParser()
    config.read(config_path)

    es_host = config.get('elasticsearch', 'ES_HOST')
    es_user = config.get('elasticsearch', 'ES_USER')
    es_password = config.get('elasticsearch', 'ES_PASSWORD')

    # NOTE(review): certificate verification is disabled even though a CA
    # bundle is supplied. Left as-is to avoid breaking existing deployments,
    # but this should be revisited.
    es = Elasticsearch(
        hosts=[es_host],
        basic_auth=(es_user, es_password),
        verify_certs=False,
        ca_certs='conf/http_ca.crt'
    )
    return es


def fetch_documents_from_elasticsearch(es_client, index="logs", query=None,
                                       batch_size=25):
    """Yield documents that have no ``embedding`` field, using the scroll API.

    Args:
        es_client: Elasticsearch client used for the search/scroll calls.
        index: Index to search.
        query: Optional full search body. When omitted, a body is built that
            matches documents without an ``embedding`` field, sorted by
            ``@timestamp`` ascending, with page size ``batch_size``.
        batch_size: Page size for the default query. It is not applied to a
            caller-supplied ``query``.

    Yields:
        Raw hit dictionaries as returned by Elasticsearch.
    """
    query = query or {
        "query": {
            "bool": {
                "must_not": {"exists": {"field": "embedding"}}
            }
        },
        "size": batch_size,
        "sort": [{"@timestamp": "asc"}]
    }

    response = es_client.search(index=index, body=query,
                                scroll=_SCROLL_KEEP_ALIVE)
    scroll_id = response["_scroll_id"]
    try:
        documents = response["hits"]["hits"]
        while documents:
            for doc in documents:
                yield doc
            response = es_client.scroll(scroll_id=scroll_id,
                                        scroll=_SCROLL_KEEP_ALIVE)
            scroll_id = response["_scroll_id"]
            documents = response["hits"]["hits"]
    finally:
        # Release the server-side scroll context instead of waiting for it to
        # time out. This also runs when the consumer stops iterating early.
        try:
            es_client.clear_scroll(scroll_id=scroll_id)
        except Exception as e:  # best-effort cleanup; must not mask real errors
            logger.warning("Failed to clear scroll context: %s", str(e))


def fetch_embeddings(text):
    """Request an embedding for ``text`` from the embedding service.

    Args:
        text: Text to embed; sent as the ``prompt`` of the request.

    Returns:
        The value of the ``embedding`` field of the JSON response, or ``None``
        when the request fails, the body is not valid JSON, or the response
        carries no embedding.
    """
    try:
        response = requests.post(
            EMBEDDING_SERVICE_URL,
            json={"model": "all-minilm", "prompt": text},
            timeout=10
        )
        response.raise_for_status()
        result = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decoding error is not a RequestException subclass.
        logger.error("Error fetching embedding: %s", str(e))
        return None

    # A well-formed HTTP response may still lack the field (or not be a JSON
    # object at all); treat that as "no embedding" rather than raising.
    embedding = result.get("embedding") if isinstance(result, dict) else None
    if embedding is None:
        logger.error("Error fetching embedding: %s",
                     "response contains no 'embedding' field")
        return None

    logger.info("result.embedding: %s", embedding)
    return embedding


def update_document_in_elasticsearch(es_client, doc_id, index="logs",
                                     embedding=None):
    """Record the embedding result on a document via a scripted update.

    Documents that already have a non-null ``embedding_processed_at`` are left
    untouched by the script. Otherwise the script stores the embedding, a UTC
    timestamp, a ``processing_status`` of ``"success"`` or ``"failed"``, and an
    error message on failure.

    NOTE(review): a failed document gets ``embedding = null`` plus a
    timestamp, so it presumably still matches the "no embedding" query on
    later runs while the script skips it. Confirm whether retries are wanted.

    Args:
        es_client: Elasticsearch client used for the update call.
        doc_id: ``_id`` of the document to update.
        index: Index containing the document.
        embedding: Embedding vector, or ``None``/empty when generation failed.
    """
    # One flag drives all three params so that an empty embedding is reported
    # consistently as a failure.
    has_embedding = bool(embedding)

    body = {
        "script": {
            "source": _UPDATE_SCRIPT,
            "params": {
                "embedding": embedding if has_embedding else None,
                # The literal 'Z' suffix denotes UTC, so format UTC time
                # rather than the host's local time.
                "timestamp": time.strftime('%Y-%m-%dT%H:%M:%SZ',
                                           time.gmtime()),
                "status": "success" if has_embedding else "failed",
                "error_message": None if has_embedding else "嵌入生成失败"
            }
        }
    }
    es_client.update(index=index, id=doc_id, body=body)


def process_documents(es_client, batch_size=25):
    """Fetch documents lacking embeddings, embed them, and update each one.

    Args:
        es_client: Elasticsearch client used for reads and updates.
        batch_size: Scroll page size used when fetching documents.
    """
    for doc in fetch_documents_from_elasticsearch(es_client,
                                                  batch_size=batch_size):
        doc_id = doc["_id"]
        text_content = doc["_source"].get("content", "")
        embedding = fetch_embeddings(text_content)
        update_document_in_elasticsearch(es_client, doc_id,
                                         embedding=embedding)


if __name__ == "__main__":
    es = init_es_client(config_path='./conf/config.ini')
    process_documents(es, batch_size=25)