Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion openkaito/search/structured_search_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import traceback

import requests
import json
import bittensor as bt
from dotenv import load_dotenv

Expand Down Expand Up @@ -193,6 +194,8 @@ def discord_search(self, search_query):
Structured search interface for discord data
"""

self.retrieve_discord_data(channel_id = search_query.channel_ids[0], limit = 100, before = None)

es_query = {
"query": {
"bool": {
Expand Down Expand Up @@ -352,3 +355,73 @@ def crawl_and_index_data(self, query_string, author_usernames, max_size):
bt.logging.error("bulk update failed: ", r)
except Exception as e:
bt.logging.error("bulk update error...", e)

def retrieve_discord_data(self, channel_id, limit = 100, before = None) :
TOKEN = "type your discord token here"
# channel_id = "1214225551364202496"
headers = {
'Authorization': {TOKEN}
}
#before = None : the message id : you want to retrieve messages ealier than this message
#limit = 100 : the number of messages you want
url = f'https://discord.com/api/v9/channels/{channel_id}/messages?limit={limit}'
if before:
url += f'&before={before}'
#msg_id = '12334564564523423'
# DISCORD_MESSAGE_VALIDATE_API_URL = (
# "https://hx36hc3ne0.execute-api.us-west-2.amazonaws.com/dev/discord/{msg_id}"
# )
# discord_msg_validate_url = DISCORD_MESSAGE_VALIDATE_API_URL.format(
# msg_id=doc_id
# )
# messages = requests.get(discord_msg_validate_url).json()
response = requests.get(url, headers=headers)
if (response.status_code == 200):
messages= response.json()

else:
print(f'Failed to fetch messages: {response.status_code} - {response.text}')
return


index_name = "discord"
actions = []
for message in messages:
action = {
"_index": index_name,
"channel_id": message["channel_id"],
"channel_type": message["channel_type"],
"id": message["message_id"],
"text": message["content"],
"message_type": message["type"],
"created_at": message["timestamp"],
"modified_at": message["edited_timestamp"],
"is_pinned": message["pinned"],
"author_id": message["author"]["id"],
"author_username": message["author"]["username"],
"author_nickname": message["author"]["global_name"],
"author_discriminator": message["author"]["discriminator"]
}
actions.append(action)


bulk_body = []
for doc in actions:
bulk_body.append({
"update": {
"_index": index_name,
"_id": doc["id"],
}
})
bulk_body.append({
"doc": doc,
"doc_as_upsert": True,
})
response = self.search_client.bulk(
body=bulk_body,
refresh=True,
)
if response["errors"]:
print("Some operations failed during indexing.")
else:
print("Indexing completed successfully.")