# Extracted from a patch to openkaito/search/structured_search_engine.py.
#
# This is a method (it takes `self`) added at the end of that module, after
# `crawl_and_index_data`. It relies on the module-level imports `os`,
# `requests` and `bittensor as bt`, and on `self.search_client` exposing a
# `bulk()` method (presumably an Elasticsearch client -- confirm against the
# class constructor).
#
# The same patch calls it at the top of `discord_search`:
#     self.retrieve_discord_data(
#         channel_id=search_query.channel_ids[0], limit=100, before=None
#     )
def retrieve_discord_data(
    self, channel_id, limit=100, before=None, *, token=None, timeout=10
):
    """Fetch recent messages of one Discord channel and upsert them into the index.

    The messages are requested from the Discord HTTP API (v9) and written to
    the ``discord`` index through ``self.search_client.bulk`` as
    ``update`` / ``doc_as_upsert`` actions keyed by the message id, so
    re-running the method for the same channel does not create duplicates.

    The method is best-effort: every failure (missing token, network error,
    non-200 response, bulk failure) is logged and the method returns without
    raising, so a caller that merely wants to refresh the index before
    searching is not interrupted.

    Args:
        channel_id: Discord channel id whose messages are fetched.
        limit: Number of messages to request. Discord accepts 1-100, so the
            value is clamped to that range.
        before: Optional message id; only messages older than it are
            returned. Falsy values are ignored.
        token: Value sent verbatim in the ``Authorization`` header. When
            omitted, the ``DISCORD_TOKEN`` environment variable is used.
            Discord's documented format for bot tokens is ``"Bot <token>"``.
        timeout: Seconds to wait for the Discord API before giving up.

    Returns:
        None.
    """
    # Never keep credentials in source; take them from the caller or the
    # environment instead.
    token = token or os.environ.get("DISCORD_TOKEN")
    if not token:
        bt.logging.error(
            "retrieve_discord_data: no Discord token configured "
            "(pass token= or set DISCORD_TOKEN)"
        )
        return

    # The header value must be a string (the original wrapped it in a set).
    headers = {"Authorization": token}
    params = {"limit": max(1, min(int(limit), 100))}
    if before:
        params["before"] = before
    url = f"https://discord.com/api/v9/channels/{channel_id}/messages"

    try:
        response = requests.get(
            url, headers=headers, params=params, timeout=timeout
        )
    except requests.RequestException as e:
        bt.logging.error(f"Failed to fetch messages: {e}")
        return

    if response.status_code != 200:
        bt.logging.error(
            f"Failed to fetch messages: {response.status_code} - {response.text}"
        )
        return

    try:
        messages = response.json()
    except ValueError as e:
        bt.logging.error(f"Failed to decode messages response: {e}")
        return
    if not isinstance(messages, list):
        bt.logging.error("Unexpected messages payload: expected a JSON array")
        return

    index_name = "discord"
    bulk_body = []
    for message in messages:
        # Discord's message object carries its id under "id"; "message_id" is
        # kept as a fallback because the original code read that key.
        message_id = message.get("id", message.get("message_id"))
        if message_id is None:
            continue
        author = message.get("author") or {}

        # NOTE: "_index" is intentionally not part of the document; it is
        # index metadata and belongs only in the bulk action line below.
        doc = {
            "channel_id": message.get("channel_id", channel_id),
            "id": message_id,
            "text": message.get("content"),
            "message_type": message.get("type"),
            "created_at": message.get("timestamp"),
            "modified_at": message.get("edited_timestamp"),
            "is_pinned": message.get("pinned"),
            "author_id": author.get("id"),
            "author_username": author.get("username"),
            "author_nickname": author.get("global_name"),
            "author_discriminator": author.get("discriminator"),
        }
        # The messages endpoint does not report a channel type; only write the
        # field when the payload has one so an existing value is not nulled.
        if "channel_type" in message:
            doc["channel_type"] = message["channel_type"]

        bulk_body.append({"update": {"_index": index_name, "_id": message_id}})
        bulk_body.append({"doc": doc, "doc_as_upsert": True})

    # A bulk request with an empty body is rejected, so stop here.
    if not bulk_body:
        bt.logging.info("No discord messages to index.")
        return

    try:
        result = self.search_client.bulk(body=bulk_body, refresh=True)
    except Exception as e:  # best-effort, mirrors crawl_and_index_data
        bt.logging.error(f"bulk update error: {e}")
        return

    if result["errors"]:
        bt.logging.error("Some operations failed during indexing.")
    else:
        bt.logging.info("Indexing completed successfully.")