-
Notifications
You must be signed in to change notification settings - Fork 2
Support socket mode slack bot #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| { | ||
| "display_information": { | ||
| "name": "seuron", | ||
| "description": "SEUnglab neuRON pipeline task manager" | ||
| }, | ||
| "features": { | ||
| "bot_user": { | ||
| "display_name": "seuronbot", | ||
| "always_online": true | ||
| } | ||
| }, | ||
| "oauth_config": { | ||
| "scopes": { | ||
| "bot": [ | ||
| "chat:write", | ||
| "chat:write.customize", | ||
| "files:read", | ||
| "files:write", | ||
| "users:read", | ||
| "reactions:read", | ||
| "reactions:write", | ||
| "channels:history", | ||
| "groups:history", | ||
| "im:history", | ||
| "mpim:history", | ||
| "channels:read" | ||
| ] | ||
| } | ||
| }, | ||
| "settings": { | ||
| "event_subscriptions": { | ||
| "bot_events": [ | ||
| "message.channels", | ||
| "message.groups", | ||
| "message.im", | ||
| "message.mpim", | ||
| "reaction_added" | ||
| ] | ||
| }, | ||
| "interactivity": { | ||
| "is_enabled": true | ||
| }, | ||
| "org_deploy_enabled": false, | ||
| "socket_mode_enabled": true | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,15 +5,23 @@ | |||||||||||
| import itertools | ||||||||||||
| import difflib | ||||||||||||
| import concurrent.futures | ||||||||||||
| import logging | ||||||||||||
| import time | ||||||||||||
| import tenacity | ||||||||||||
| from threading import Event | ||||||||||||
| from slack_sdk.rtm_v2 import RTMClient | ||||||||||||
| from slack_sdk.socket_mode import SocketModeClient | ||||||||||||
| from slack_sdk.socket_mode.request import SocketModeRequest | ||||||||||||
| from slack_sdk.socket_mode.response import SocketModeResponse | ||||||||||||
| from slack_sdk.web import WebClient | ||||||||||||
| from bot_info import botid, workerid, broker_url | ||||||||||||
| from bot_utils import replyto, extract_command, update_slack_thread, create_run_token, send_message | ||||||||||||
| from airflow_api import check_running, set_variable, get_variable | ||||||||||||
| from kombu_helper import get_message, visible_messages | ||||||||||||
| from docker_helper import get_registry_data | ||||||||||||
|
|
||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| _help_trigger = ["help"] | ||||||||||||
|
|
||||||||||||
|
|
@@ -71,21 +79,32 @@ class SeuronBot: | |||||||||||
|
|
||||||||||||
| hello_listeners = [] | ||||||||||||
|
|
||||||||||||
| def __init__(self, slack_token=None): | ||||||||||||
| def __init__(self, slack_token=None, app_token=None): | ||||||||||||
| self.task_owner = "seuronbot" | ||||||||||||
| self.slack_token = slack_token | ||||||||||||
| self.app_token = app_token | ||||||||||||
| self.web_client = WebClient(token=slack_token) | ||||||||||||
|
|
||||||||||||
| if self.app_token: | ||||||||||||
| logger.info("Initializing bot in Socket Mode") | ||||||||||||
| self.socket_client = SocketModeClient( | ||||||||||||
| app_token=self.app_token, | ||||||||||||
| web_client=self.web_client, | ||||||||||||
| ) | ||||||||||||
| self.socket_client.socket_mode_request_listeners.append( | ||||||||||||
| self._handle_socket_mode_request | ||||||||||||
| ) | ||||||||||||
| else: | ||||||||||||
| logger.info("Initializing bot in RTM mode") | ||||||||||||
| self.rtmclient = RTMClient(token=slack_token) | ||||||||||||
| self.rtmclient.on("message")(functools.partial(self.process_message.__func__, self)) | ||||||||||||
| self.rtmclient.on("reaction_added")(functools.partial(self.process_reaction.__func__, self)) | ||||||||||||
| self.rtmclient.on("hello")(functools.partial(self.process_hello.__func__, self)) | ||||||||||||
|
|
||||||||||||
| self.rtmclient = RTMClient(token=slack_token) | ||||||||||||
| self.rtmclient.on("message")(functools.partial(self.process_message.__func__, self)) | ||||||||||||
| self.rtmclient.on("reaction_added")(functools.partial(self.process_reaction.__func__, self)) | ||||||||||||
| self.rtmclient.on("hello")(functools.partial(self.process_hello.__func__, self)) | ||||||||||||
| self.executor = concurrent.futures.ThreadPoolExecutor() | ||||||||||||
|
|
||||||||||||
| def update_task_owner(self, msg): | ||||||||||||
| sc = self.rtmclient.web_client | ||||||||||||
| rc = sc.users_info( | ||||||||||||
| user=msg['user'] | ||||||||||||
| ) | ||||||||||||
| rc = self.web_client.users_info(user=msg['user']) | ||||||||||||
| if rc["ok"]: | ||||||||||||
| self.task_owner = rc["user"]["profile"]["display_name"] | ||||||||||||
|
|
||||||||||||
|
|
@@ -137,7 +156,7 @@ def filter_msg(self, msg): | |||||||||||
| if re.search(r"^{}[\s,:]".format(workerid), text, re.IGNORECASE): | ||||||||||||
| return True | ||||||||||||
|
|
||||||||||||
| def process_message(self, client: RTMClient, event: dict): | ||||||||||||
| def process_message(self, client, event: dict): | ||||||||||||
| if self.filter_msg(event) or event.get("from_jupyter", False): | ||||||||||||
| handled = False | ||||||||||||
| check_image_updates(event) | ||||||||||||
|
|
@@ -154,15 +173,32 @@ def process_message(self, client: RTMClient, event: dict): | |||||||||||
| if not event.get("from_jupyter", False): | ||||||||||||
| self.update_task_owner(event) | ||||||||||||
|
|
||||||||||||
| def process_reaction(self, client: RTMClient, event: dict): | ||||||||||||
| def process_reaction(self, client, event: dict): | ||||||||||||
| print("reaction added") | ||||||||||||
| print(json.dumps(event, indent=4)) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def process_hello(self, client: RTMClient, event: dict): | ||||||||||||
| def process_hello(self, client, event: dict): | ||||||||||||
| for listener in self.hello_listeners: | ||||||||||||
| listener() | ||||||||||||
|
Comment on lines
181
to
182
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 It's better to use the
Suggested change
|
||||||||||||
|
|
||||||||||||
| def _handle_socket_mode_request(self, client: SocketModeClient, req: SocketModeRequest): | ||||||||||||
| """Route Socket Mode envelope to the appropriate event handler.""" | ||||||||||||
| response = SocketModeResponse(envelope_id=req.envelope_id) | ||||||||||||
| client.send_socket_mode_response(response) | ||||||||||||
|
|
||||||||||||
| if req.type == "events_api": | ||||||||||||
| event = req.payload.get("event", {}) | ||||||||||||
| event_type = event.get("type", "") | ||||||||||||
|
|
||||||||||||
| if event_type == "message": | ||||||||||||
| self.process_message(client, event) | ||||||||||||
| elif event_type == "reaction_added": | ||||||||||||
| self.process_reaction(client, event) | ||||||||||||
| else: | ||||||||||||
| logger.debug("Unhandled event type: %s", event_type) | ||||||||||||
| elif req.type == "hello": | ||||||||||||
| self.process_hello(client, {}) | ||||||||||||
|
|
||||||||||||
| @classmethod | ||||||||||||
| def on_hello(cls): | ||||||||||||
| def __call__(*args, **kwargs): | ||||||||||||
|
|
@@ -219,7 +255,7 @@ def new_message_listener(context): | |||||||||||
| wait=tenacity.wait_random_exponential(multiplier=1, max=60), | ||||||||||||
| ) | ||||||||||||
| def fetch_bot_messages(self, queue="bot-message-queue"): | ||||||||||||
| client = self.rtmclient.web_client | ||||||||||||
| client = self.web_client | ||||||||||||
| while True: | ||||||||||||
| msg_payload = get_message(broker_url, queue, timeout=30) | ||||||||||||
| if not msg_payload: | ||||||||||||
|
|
@@ -255,7 +291,13 @@ def start(self): | |||||||||||
| futures.append(self.executor.submit(self.fetch_bot_messages)) | ||||||||||||
| futures.append(self.executor.submit(self.fetch_jupyter_messages)) | ||||||||||||
| try: | ||||||||||||
| self.rtmclient.start() | ||||||||||||
| if self.app_token: | ||||||||||||
| self.socket_client.connect() | ||||||||||||
| logger.info("Socket Mode client connected") | ||||||||||||
| self.process_hello(self.socket_client, {}) | ||||||||||||
| Event().wait() | ||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟢 Consider adding a brief comment here to clarify that this blocks the main thread to keep the Socket Mode connection alive while background tasks continue in their threads.
Suggested change
|
||||||||||||
| else: | ||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
🟡 The manual call to `self.process_hello` here is likely redundant because `_handle_socket_mode_request` already handles the `hello` event type sent by Slack upon connection. This could lead to the hello listeners being executed twice.
Suggested change
|
||||||||||||
| self.rtmclient.start() | ||||||||||||
| except Exception: | ||||||||||||
| pass | ||||||||||||
| logger.exception("Error in bot main loop") | ||||||||||||
| concurrent.futures.wait(futures) | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.