Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions twikit/castle_token/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .castle_token import CastleToken

__all__ = ['CastleToken']
102 changes: 102 additions & 0 deletions twikit/castle_token/castle_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import secrets
import time
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from ..client.client import Client


class CastleToken:
"""
Handles Castle Token generation for Twitter API requests.
The token is cached for 1 minute to avoid unnecessary API calls.

Parameters
----------
client : Client
The Twitter client instance
api_key : str | None, default=None
Optional API key for castle.botwitter.com
- Without API key: 3 requests/second, 100 requests/hour (default rate limits)
- With API key: Custom rate limits, higher quotas, priority support
"""

def __init__(self, client: 'Client', api_key: str | None = None) -> None:
self.client = client
self.api_key = api_key
self._castle_token: str | None = None
self._cuid: str | None = None
self._token_timestamp: float | None = None

def _generate_cuid(self) -> str:
"""
Generate a 32-character hexadecimal string for use as cuid.
Example: 169c90ba59a6f01cc46e69d2669e080b
"""
return secrets.token_hex(16)

async def generate_castle_token(self) -> str:
"""
Generate a new Castle token by:
1. Generating a 32-character hex string (cuid)
2. Setting it as the __cuid cookie
3. Sending a POST request to https://castle.botwitter.com/generate-token
4. Returning the Castle token from the response

Rate Limits:
- Default (no API key): 3 requests/second, 100 requests/hour

Returns
-------
str
The generated Castle token
"""
# Generate cuid
self._cuid = self._generate_cuid()

# Set __cuid cookie
self.client.http.cookies.set('__cuid', self._cuid)
Comment thread
heysurfer marked this conversation as resolved.

# Prepare request data
payload = {
'userAgent': self.client._user_agent,
'cuid': self._cuid
}

# Prepare headers with optional API key authentication
headers = {}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'

# Send POST request to castle token API
response = await self.client.http.post(
'https://castle.botwitter.com/generate-token',
json=payload,
headers=headers
)

# Extract and cache the castle token from response
response_data = response.json()
self._castle_token = response_data.get('token', '')
self._token_timestamp = time.time()
Comment thread
heysurfer marked this conversation as resolved.

return self._castle_token
Comment thread
heysurfer marked this conversation as resolved.

async def get_castle_token(self) -> str:
"""
Get the cached Castle token or generate a new one if not cached or expired.
Token cache expires after 60 seconds.

Returns
-------
str
The Castle token
"""
if self._castle_token is None or self._token_timestamp is None:
return await self.generate_castle_token()

# Check if token is older than 60 seconds
if time.time() - self._token_timestamp > 60:
return await self.generate_castle_token()

return self._castle_token
12 changes: 12 additions & 0 deletions twikit/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from ..x_client_transaction.utils import handle_x_migration
from ..x_client_transaction import ClientTransaction
from ..castle_token import CastleToken
from .gql import GQLClient
from .v11 import V11Client

Expand All @@ -77,6 +78,12 @@ class Client:
(e.g., 'http://0.0.0.0:0000').
captcha_solver : :class:`.Capsolver` | None, default=None
See :class:`.Capsolver`.
user_agent : :class:`str` | None, default=None
Custom user agent string for requests.
castle_api_key : :class:`str` | None, default=None
Optional API key for castle.botwitter.com Castle Token service.
Without API key: 3 requests/second, 100 requests/hour (default).
With API key: Custom rate limits, higher quotas, priority support.

Examples
--------
Expand All @@ -87,6 +94,9 @@ class Client:
... auth_info_2='email@example.com',
... password='00000000'
... )

>>> # With Castle API key for better rate limits
>>> client = Client(castle_api_key='premium_key_abc123def456ghi789')
Comment thread
heysurfer marked this conversation as resolved.
"""

def __init__(
Expand All @@ -95,6 +105,7 @@ def __init__(
proxy: str | None = None,
captcha_solver: Capsolver | None = None,
user_agent: str | None = None,
castle_api_key: str | None = None,
**kwargs
) -> None:
if 'proxies' in kwargs:
Expand All @@ -111,6 +122,7 @@ def __init__(
if captcha_solver is not None:
captcha_solver.client = self
self.client_transaction = ClientTransaction()
self.castle_token = CastleToken(self, api_key=castle_api_key)

self._token = TOKEN
self._user_id = None
Expand Down
6 changes: 6 additions & 0 deletions twikit/client/v11.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ async def onboarding_task(self, guest_token, token, subtask_inputs, data = None,
if subtask_inputs is not None:
data['subtask_inputs'] = subtask_inputs

# Inject castle_token if settings_list is present
for subtask_input in subtask_inputs:
if isinstance(subtask_input, dict) and 'settings_list' in subtask_input:
castle_token = await self.base.castle_token.get_castle_token()
subtask_input['settings_list']['castle_token'] = castle_token
Comment thread
heysurfer marked this conversation as resolved.

headers = {
Comment thread
heysurfer marked this conversation as resolved.
'x-guest-token': guest_token,
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
Expand Down