diff --git a/.dockerignore b/.dockerignore index 256108d..8898e7a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -12,3 +12,7 @@ bot/docs config.yaml .env + +accounts/ +backups/ +data-dir/ diff --git a/Dockerfile b/Dockerfile index c406581..bd21436 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,9 +5,10 @@ COPY requirements.txt . COPY bot/requirements.txt bot/requirements.txt RUN pip install --no-cache-dir -r requirements.txt \ - && if [ -f bot/requirements.txt ]; then pip install --no-cache-dir -r bot/requirements.txt; fi + && if [ -f bot/requirements.txt ]; then pip install --no-cache-dir -r bot/requirements.txt; fi \ + && mkdir -p modules COPY . . -ENTRYPOINT ["python", "monitor.py"] +ENTRYPOINT ["python", "main.py"] CMD [] diff --git a/README.md b/README.md index efbcd9c..07557e4 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,33 @@ -# [Status App Community Monitoring](https://status.app/) - -Monitoring tool for Status App communities. **No personal data is collected from users.** - - -| Field | Hashed | Description | -|:----------------------|:---------|:------------------------------------------------------------| -| **id** | **Yes** | The message's ID | -| **whisper_timestamp** | No | The whisper timestamp of the message | -| **from** | **Yes** | The public key of the user | -| **message_type** | No | The message type | -| **seen** | No | True if the message has been seen otherwise False | -| **chat_id** | No | The chat ID is a combination of community ID and channel ID | -| **community_id** | No | The ID of the community | -| **response_to** | **Yes** | Ithe public key of the user who the response is for | -| **timestamp** | No | The timestamp of the message | -| **deleted** | No | True if the message was deleted otherwise False | - -Status Bot account information can be found in [`config.yaml`](./config.yaml). - -## How it works - -```mermaid -graph LR - subgraph Communities[Status App] - subgraph Status[Status Community] - StatusMessages[Messages] - StatusInfo[Information] - end - subgraph Logos[Logos Community] - LogosMessages[Messages] - LogosInfo[Information] - end - - end - - subgraph Bot[Docker Container] - RawDataLocal[(Raw Data)] - Script[monitor.py] - end - - subgraph IFT[IFT Infrastructure] - RawDataIFT[(Raw Data)] - ProcessedDataIFT[(Processed Data)] - - end - - Communities <--> |class Account| Script - Script --> |SHA256| RawDataLocal - RawDataLocal --> |Airbyte| RawDataIFT - RawDataIFT --> |dbt| ProcessedDataIFT -``` +# Status Bot -# Setup +## Description -## Environment Variables +This repository allow to run a Bot for the [Status App](https://status.app). The bot can be used for multiple reason: +- [Interracting with Status Chats](./docs/usage/messaging.md) +- [Monitoring Community](./docs/usage/monitoring.md) -- `POSTGRES_USERNAME` - Postgres username. -- `POSTGRES_PASSWORD` - Postgres password. -- `POSTGRES_DATABASE` - The database name in the Postgres connection. -- `POSTGRES_HOST` - The Postgres host name that will be remotely connected to. -- `POSTGRES_PORT` - The Postgres port that will be remotely connected to. -- `STATUS_DISPLAY_NAME` - The Status display name that will be used to create an account. -- `STATUS_PASSWORD` - The Status password that will be used to create an account. -- `STATUS_MNEMONIC` - The mnemonic used to recover the account. If passed a `.bkp` file will be loaded as well. Use this when you want to login to a bot account via Status App, join a community / leave community and export the `.bkp` file. -- `STATUS_INFURA_TOKEN` - [Infura token](https://www.infura.io/) is required for **token gated communities** -- `STATUS_COINGECKO_API_KEY` - [Coingecko API Key](https://www.coingecko.com/) is required for **token gated communities** -## Docker deployement +## Setup -You can use the `docker-compose.yaml` to run the project. +The recommanded deployement option is with docker compose. -Example of `.env` file to use -``` -# Status Backend connection -STATUS_DISPLAY_NAME = "bot-status" -STATUS_PASSWORD = "ChangeThisPassword" -STATUS_MNEMONIC= "test test test test test test test test test test test test" - -# Necessary for communities that have tokens -STATUS_INFURA_TOKEN = "Token from https://www.infura.io/" -STATUS_COINGECKO_API_KEY = "Token from https://www.coingecko.com/" - -# Database config -POSTGRES_HOST=database -POSTGRES_PORT=5432 -POSTGRES_DATABASE=status-bot -POSTGRES_USERNAME=status -POSTGRES_PASSWORD=ChangeThisOneAlso +### Docker deployment + +The [docker compose](./docker-compose.yaml) deployement will spin 3 containers: +- Status Backend: to interract with the Logos Delivery network used by Status App. +- Status Bot: the instance of the bot +- A Postgres database + +You can run it with: + +```bash +docker-compose up -d --build ``` +it will require a `.env` file for secrets and `config.yaml`. The configuration is detailed in [the documentation](./docs/usage//configuration.md). -## Python +### Python + +You can also run the bot with python, it will require a Status Backend instance available. 1. Setup environment. [Conda](https://www.anaconda.com/) example: ```bash @@ -98,14 +36,10 @@ conda create -n status-monitoring python=3.12 **Note**: Code has been tested with **Python 3.12**. -2. Install `monitor.py` and `bot` requirements +2. Install requirements ```bash -# To run Status bot -pip install -r ./bot/requirements.txt - -# To run monitor.py -pip install -r ./requirements.txt +pip install -r requirements.txt ``` **Note**: If you are on Windows, you will have to install `psycopg2` instead of `psycopg2-binary`. @@ -114,13 +48,11 @@ pip install -r ./requirements.txt If you have already created a Status account and want to use it with it's current data, please make sure you export the `.bkp` file and put it in folder **backups** and have the following `.env` variables: -- `STATUS_DISPLAY_NAME` -- `STATUS_PASSWORD` -- `STATUS_MNEMONIC` +- `BOT_DISPLAY_NAME` +- `BOT_PASSWORD` +- `BOT_MNEMONIC_PHRASE` -## Files -- `monitor.py` - Status community message monitoring. It will download and upload messages in parallel. # Guidelines diff --git a/bot/__init__.py b/bot/__init__.py index d607e0e..548b641 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -1,2 +1,4 @@ from .account import Account from .logger import Logger +from .database import Database +from .config import Config diff --git a/bot/account.py b/bot/account.py index 8a1fd8a..12270e0 100644 --- a/bot/account.py +++ b/bot/account.py @@ -158,9 +158,14 @@ def login(self, password: str, key_uid: Optional[str] = None, display_name: Opti # Wallet usage if infura_token: params["infuraToken"] = infura_token + else: + self.logger.info("No infura token") if coingecko_api_key: params["coingeckoApiKey"] = coingecko_api_key + else: + self.logger.info("No coingecko token") + if infura_token and coingecko_api_key: self.__is_wallet_set = True @@ -181,7 +186,6 @@ def login(self, password: str, key_uid: Optional[str] = None, display_name: Opti self.__info = { "public_key": event["public-key"], "url": None, - "emojis": event["emojiHash"], "key_uid": event["key-uid"], "compressed_key": event["compressedKey"], "mnemonic": event.get("mnemonic", mnemonic), @@ -388,7 +392,6 @@ def contacts(self) -> dict[str, dict]: "url": self.__call_rpc("urls", "shareUserURLWithData", [contact["id"]]).get("result"), "chat_id": contact["id"], "key_uid": contact["compressedKey"], - "emojis": contact["emojiHash"], "contact_state": self.__mappings["contact_request"][contact["contactRequestState"]], "external_contact_state": self.__mappings["contact_request"][contact["contactRequestRemoteState"]], "has_added_us": contact["hasAddedUs"], @@ -523,6 +526,7 @@ def balance(self) -> pd.DataFrame: params = [[self.info["wallet_address"]], True] results = self.__call_rpc("wallets", "fetchOrGetCachedWalletBalances", params).get("result", {}).get(self.info["wallet_address"].lower(), []) if not results: + self.logger.warning(f"No result in RPC response") return empty.copy() balance = pd.DataFrame(results) @@ -990,8 +994,8 @@ def __load_backup(self): self.__signal.get("messages.new") self.logger.info(f"Successfully loaded file!") break - - self.logger.warning(error) + else: + self.logger.warning(f"Error while loading the backup: {error}") def __call_rpc(self, prefix: str, method_name: str, params: Optional[Union[list, dict]] = None) -> dict: """ diff --git a/bot/config.py b/bot/config.py new file mode 100644 index 0000000..31724e9 --- /dev/null +++ b/bot/config.py @@ -0,0 +1,86 @@ +from pydantic import BaseModel +from pydantic_settings import BaseSettings, SettingsConfigDict +from pydantic_settings.sources import YamlConfigSettingsSource + + +class BackendConfig(BaseModel): + domain: str = "localhost" + port: int = 8080 + is_secure: bool = False + + +class BotConfig(BaseModel): + display_name: str = "" + public_key: str = "" + password: str = "" + mnemonic_phrase: str = "" + init_account: bool = False + compressed_key: str = "" + infura_token: str = "" + coingecko_api_key: str = "" + +class DatabaseConfig(BaseModel): + type: str = "postgres" + host: str = "database" + port: int = 5432 + user: str = "" + password: str = "" + name: str = "" + schema: str = "public" + tables: dict = {} + + +class ApiConfig(BaseModel): + enable: bool = True + host: str = "0.0.0.0" + port: int = 8081 + api_key: str = "" + + +class MetricsConfig(BaseModel): + enabled: bool = False + host: str = "0.0.0.0" + port: int = 8000 + + +class FilesConfig(BaseModel): + current_state: str = "dates.pkl" + + +class ModulesConfig(BaseModel): + directories: list[str] = ["./modules", "bot/modules"] + enabled: list[str] = [] + settings: dict = {} + + +class Config(BaseSettings): + model_config = SettingsConfigDict( + env_nested_delimiter="__", + extra="ignore", + ) + + sleep: int = 10 + files: FilesConfig = FilesConfig() + bot: BotConfig = BotConfig() + backend: BackendConfig = BackendConfig() + api: ApiConfig = ApiConfig() + modules: ModulesConfig = ModulesConfig() + metrics: MetricsConfig = MetricsConfig() + database: DatabaseConfig = DatabaseConfig() + + _yaml_file = "./config.yaml" + + @classmethod + def settings_customise_sources( + cls, + settings_cls, + init_settings, + env_settings, + dotenv_settings, + file_secret_settings, + ): + return ( + YamlConfigSettingsSource(settings_cls, yaml_file=cls._yaml_file), + env_settings, + dotenv_settings, + ) diff --git a/bot/database.py b/bot/database.py new file mode 100644 index 0000000..8d015c2 --- /dev/null +++ b/bot/database.py @@ -0,0 +1,82 @@ +import logging + +import pandas as pd +from typing import Optional +from sqlalchemy import create_engine, inspect, text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.exc import IntegrityError, NoSuchTableError + +from .models import Base + + +class Database: + + def __init__(self, db_type: str, host: str, port: int, user: str, password: str, name: str, schema: str): + self._type = db_type + self._schema = schema + self._url = self._build_url(db_type, host, port, user, password, name) + self._engine = create_engine(self._url) + self._logger = logging.getLogger("status_bot.database") + + def init_tables(self): + Base.metadata.create_all(self._engine) + self._logger.info("Database tables initialized") + + def _build_url(self, db_type: str, host: str, port: int, user: str, password: str, name: str) -> str: + if db_type == "postgres": + return f"postgresql://{user}:{password}@{host}:{port}/{name}" + elif db_type == "sqlite": + return f"sqlite:///{name}" + raise ValueError(f"Unsupported database type: {db_type}") + + def insert(self, data: pd.DataFrame, table_name: str, json_columns: Optional[list] = None): + if len(data) == 0: + return + + if self._type == "postgres": + with self._engine.begin() as conn: + conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {self._schema}")) + + data.columns = [column.lower() for column in data.columns] + + params = { + "name": table_name, + "con": self._engine, + "schema": self._schema, + "if_exists": "append", + "index": False, + } + if json_columns and self._type == "postgres": + params["dtype"] = {col: JSONB for col in json_columns} + + existing_columns = self.get_columns(table_name) + if existing_columns: + for column in data.columns: + if column not in existing_columns: + with self._engine.begin() as conn: + conn.execute( + text(f"ALTER TABLE {self._schema}.{table_name} ADD COLUMN {column} TEXT") + ) + + try: + data.to_sql(**params) + except IntegrityError: + self._logger.warning(f"Duplicate rows skipped in {table_name}") + + def execute(self, query: str): + with self._engine.begin() as conn: + conn.execute(text(query)) + + def to_pandas(self, query: str) -> pd.DataFrame: + return pd.read_sql(query, self._engine) + + def get_columns(self, table_name: str) -> list[str]: + insp = inspect(self._engine) + try: + columns = insp.get_columns(table_name, schema=self._schema) + except NoSuchTableError: + return [] + return [col["name"] for col in columns] + + def close(self): + self._engine.dispose() diff --git a/bot/metrics.py b/bot/metrics.py new file mode 100644 index 0000000..5147ddc --- /dev/null +++ b/bot/metrics.py @@ -0,0 +1,34 @@ +import logging + +from bot.config import MetricsConfig +from bot.modules.manager import ModuleManager +from prometheus_client import start_http_server, Gauge, Counter + + +def start_prometheus(metrics_config: MetricsConfig, manager: ModuleManager, logger: logging.Logger): + if not metrics_config.enabled: + logger.info("Prometheus metrics exporter disabled") + return + + health = Gauge("status_bot_health", "Bot health status") + version = Gauge("status_bot_version", "Bot version", ["version"]) + module_loaded = Gauge( + "status_bot_module_loaded", "Module loaded", ["module"] + ) + module_errors = Counter( + "status_bot_module_errors_total", "Module errors", ["module"] + ) + module_restarts = Counter( + "status_bot_module_restarts_total", "Module restarts", ["module"] + ) + + health.set(1) + version.labels(version="0.1.0").set(1) + + for module_name in manager.module_names: + module_loaded.labels(module=module_name).set(1) + + host = metrics_config.host + port = metrics_config.port + start_http_server(port, host) + logger.info(f"Prometheus metrics exporter server started on {host}:{port}") diff --git a/bot/models/__init__.py b/bot/models/__init__.py new file mode 100644 index 0000000..483df24 --- /dev/null +++ b/bot/models/__init__.py @@ -0,0 +1,3 @@ +from .base import Base +from .message import ReceivedMessage +from .chat import ReceivedChat diff --git a/bot/models/base.py b/bot/models/base.py new file mode 100644 index 0000000..fa2b68a --- /dev/null +++ b/bot/models/base.py @@ -0,0 +1,5 @@ +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + pass diff --git a/bot/models/chat.py b/bot/models/chat.py new file mode 100644 index 0000000..70aeb74 --- /dev/null +++ b/bot/models/chat.py @@ -0,0 +1,12 @@ +from sqlalchemy import Column, DateTime, String + +from .base import Base + + +class ReceivedChat(Base): + __tablename__ = "received_chats" + + id = Column(String, primary_key=True) + type = Column(String, nullable=True) + name = Column(String, nullable=True) + received_timestamp = Column(DateTime, nullable=True) diff --git a/bot/models/message.py b/bot/models/message.py new file mode 100644 index 0000000..5a2ccaa --- /dev/null +++ b/bot/models/message.py @@ -0,0 +1,33 @@ +from sqlalchemy import Boolean, Column, DateTime, Integer, JSON, String, BigInteger + +from .base import Base + + +class ReceivedMessage(Base): + __tablename__ = "received_messages" + + id = Column(String, primary_key=True) + whisper_timestamp = Column(DateTime, nullable=True) + from_ = Column("from", String, nullable=True) + alias = Column(String, nullable=True) + seen = Column(Boolean, nullable=True) + quoted_message = Column(JSON, nullable=True) + rtl = Column(Boolean, nullable=True) + parsed_text = Column(JSON, nullable=True) + line_count = Column(Integer, nullable=True) + text = Column(String, nullable=True) + chat_id = Column(String, nullable=True) + local_chat_id = Column(String, nullable=True) + clock = Column(BigInteger, nullable=True) + replace = Column(String, nullable=True) + response_to = Column(String, nullable=True) + ens_name = Column(String, nullable=True) + display_name = Column(String, nullable=True) + gap_parameters = Column(JSON, nullable=True) + timestamp = Column(DateTime, nullable=True) + content_type = Column(Integer, nullable=True) + message_type = Column(Integer, nullable=True) + contact_request_state = Column(Integer, nullable=True) + compressed_key = Column(String, nullable=True) + emoji_hash = Column(JSON, nullable=True) + received_timestamp = Column(DateTime, nullable=True) diff --git a/bot/modules/__init__.py b/bot/modules/__init__.py new file mode 100644 index 0000000..aa7ce1d --- /dev/null +++ b/bot/modules/__init__.py @@ -0,0 +1,4 @@ +from .base import BaseModule, ModuleType, ModuleConfig, ModuleContext +from .manager import ModuleManager + +__all__ = ["BaseModule", "ModuleType", "ModuleConfig", "ModuleContext", "ModuleManager"] diff --git a/bot/modules/api_server.py b/bot/modules/api_server.py new file mode 100644 index 0000000..a091455 --- /dev/null +++ b/bot/modules/api_server.py @@ -0,0 +1,62 @@ +import threading + +import uvicorn +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse + +from bot.modules.base import BaseModule, ModuleType + + +class APIServerModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.SERVICE + + def on_start(self): + api_config = self.ctx.shared_state["config"].api + self._host = api_config.host + self._port = api_config.port + self._app: FastAPI = self.ctx.shared_state["fastapi_app"] + self._server = None + self._add_auth_middleware(api_config.api_key) + + def _add_auth_middleware(self, api_key: str): + if not api_key: + return + + exempt = {"/health", "/docs", "/redoc", "/openapi.json"} + + @self._app.middleware("http") + async def require_api_key(request: Request, call_next): + if request.url.path in exempt: + return await call_next(request) + if request.headers.get("X-API-Key") != api_key: + return JSONResponse( + status_code=401, + content={"detail": "Invalid or missing API key"}, + ) + return await call_next(request) + + def execute(self): + if not self.ctx.shared_state["config"].api.enable: + self.ctx.logger.info("API server is disabled, skipping startup") + return + + config = uvicorn.Config( + self._app, + host=self._host, + port=self._port, + log_level="info", + ) + server = uvicorn.Server(config) + self._server = server + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + self.ctx.stop_event.wait() + server.should_exit = True + thread.join(timeout=10) + + def on_stop(self): + if self._server: + self._server.should_exit = True diff --git a/bot/modules/base.py b/bot/modules/base.py new file mode 100644 index 0000000..2ee68f5 --- /dev/null +++ b/bot/modules/base.py @@ -0,0 +1,73 @@ +from abc import ABC, abstractmethod +from typing import Any, Optional +from dataclasses import dataclass, field +from enum import Enum +import threading +import logging + + +class ModuleType(Enum): + PERIODIC = "periodic" + EVENT = "event" + SERVICE = "service" + + +@dataclass +class ModuleConfig: + name: str + enabled: bool = True + interval: int = 60 + max_retries: int = 3 + backoff_seconds: int = 30 + settings: dict = None + + def __post_init__(self): + if self.settings is None: + self.settings = {} + + +@dataclass +class ModuleContext: + account: Any + config: ModuleConfig + logger: logging.Logger + db: Optional[Any] = None + shared_state: dict = field(default_factory=dict) + stop_event: Optional[threading.Event] = None + + +class BaseModule(ABC): + + def __init__(self, ctx: ModuleContext): + self._ctx = ctx + self._running = False + + @property + def ctx(self) -> ModuleContext: + return self._ctx + + @property + @abstractmethod + def module_type(self) -> ModuleType: + ... + + @property + def name(self) -> str: + return self._ctx.config.name + + @abstractmethod + def execute(self) -> Any: + ... + + def on_start(self) -> None: + pass + + def on_stop(self) -> None: + pass + + def on_event(self, event: dict) -> Any: + return None + + @property + def is_running(self) -> bool: + return self._running diff --git a/bot/modules/manager.py b/bot/modules/manager.py new file mode 100644 index 0000000..6454ade --- /dev/null +++ b/bot/modules/manager.py @@ -0,0 +1,212 @@ +import os +import importlib.util +import threading +import logging +from typing import Optional, Type +from pathlib import Path + +from .base import BaseModule, ModuleConfig, ModuleContext, ModuleType +from bot.config import ModulesConfig + + +class ModuleManager: + + def __init__(self, modules_config: ModulesConfig, account, db, logger: logging.Logger, shared_state: dict = None): + self._modules_config = modules_config + self._account = account + self._db = db + self._logger = logger + self._modules: dict[str, BaseModule] = {} + self._module_classes: dict[str, Type[BaseModule]] = {} + self._threads: dict[str, threading.Thread] = {} + self._stop_event = threading.Event() + self._shared_state = shared_state or {} + + @property + def modules(self) -> dict[str, BaseModule]: + return self._modules + + @property + def module_names(self) -> list[str]: + return list(self._modules.keys()) + + def discover_modules(self) -> None: + builtin_dir = str(Path(__file__).parent.resolve()) + self._discover_from_directory(builtin_dir) + + for directory in self._modules_config.directories: + if directory == builtin_dir: + continue + self._discover_from_directory(directory) + + self._logger.info( + f"Discovered {len(self._module_classes)} module class(es): " + f"{list(self._module_classes.keys())}" + ) + + def _discover_from_directory(self, directory: str) -> None: + base_path = Path(directory) + if not base_path.exists(): + self._logger.warning(f"Module directory not found: {directory}") + return + + for file_path in sorted(base_path.glob("*.py")): + if file_path.name.startswith("_"): + continue + self._load_module_from_file(file_path) + + def _load_module_from_file(self, file_path: Path) -> None: + module_name = file_path.stem + + if module_name in ("base", "manager", "utils"): + return + + bot_modules_dir = Path(__file__).parent.resolve() + if file_path.parent.resolve() == bot_modules_dir: + pkg_name = f"bot.modules.{module_name}" + else: + pkg_name = f"modules.{module_name}" + + spec = importlib.util.spec_from_file_location(pkg_name, file_path) + if spec is None or spec.loader is None: + return + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, BaseModule) + and attr is not BaseModule + ): + self._module_classes[module_name] = attr + self._logger.debug(f"Found module class: {module_name}.{attr_name}") + + def load_modules(self) -> None: + enabled = set(self._modules_config.enabled) + settings = self._modules_config.settings + + if not enabled: + self._logger.info("No modules enabled in config") + return + + for module_name in enabled: + if module_name not in self._module_classes: + self._logger.error( + f"Module '{module_name}' not found. " + f"Available: {list(self._module_classes.keys())}" + ) + continue + + module_class = self._module_classes[module_name] + module_settings = settings.get(module_name, {}) + + module_config = ModuleConfig( + name=module_name, + enabled=True, + interval=module_settings.get("interval", 60), + max_retries=module_settings.get("max_retries", 3), + backoff_seconds=module_settings.get("backoff_seconds", 30), + settings=module_settings, + ) + + ctx = ModuleContext( + account=self._account, + config=module_config, + logger=self._logger, + db=self._db, + shared_state=self._shared_state, + stop_event=self._stop_event, + ) + + try: + module = module_class(ctx) + self._modules[module_name] = module + self._logger.info(f"Loaded module: {module_name}") + except Exception as e: + self._logger.error(f"Failed to load module '{module_name}': {e}") + + def start_all(self) -> None: + for name, module in self._modules.items(): + t = threading.Thread( + target=self._run_module_wrapper, + args=(module,), + daemon=True, + name=f"module-{name}", + ) + self._threads[name] = t + t.start() + self._logger.info(f"Started module thread: {name}") + + def stop_all(self) -> None: + self._logger.info("Stopping all modules...") + self._stop_event.set() + for name, t in self._threads.items(): + self._logger.debug(f"Waiting for module '{name}' to stop...") + t.join(timeout=5) + if t.is_alive(): + self._logger.warning(f"Module '{name}' did not stop in time") + self._logger.info("All modules stopped") + + def _run_module_wrapper(self, module: BaseModule) -> None: + retries = 0 + max_retries = module.ctx.config.max_retries + backoff = module.ctx.config.backoff_seconds + + while retries <= max_retries and not self._stop_event.is_set(): + try: + module._running = True + module.on_start() + + if module.module_type == ModuleType.PERIODIC: + self._run_periodic(module) + elif module.module_type == ModuleType.EVENT: + self._run_event(module) + elif module.module_type == ModuleType.SERVICE: + module.execute() + + break + + except Exception as e: + retries += 1 + self._logger.error( + f"Module '{module.name}' failed ({retries}/{max_retries}): {e}", + exc_info=True, + ) + if retries <= max_retries: + wait = backoff * (2 ** (retries - 1)) + self._logger.info(f"Restarting '{module.name}' in {wait}s...") + self._stop_event.wait(wait) + else: + self._logger.error( + f"Module '{module.name}' permanently failed after {max_retries} retries" + ) + finally: + module._running = False + try: + module.on_stop() + except Exception: + pass + + def _run_periodic(self, module: BaseModule) -> None: + interval = module.ctx.config.interval + while not self._stop_event.is_set(): + module.execute() + self._stop_event.wait(interval) + + def _run_event(self, module: BaseModule) -> None: + for event in self._account.signal.listen("messages.new", stop_event=self._stop_event): + if self._stop_event.is_set(): + break + try: + module.on_event(event) + except Exception as e: + self._logger.error( + f"Error in event module '{module.name}': {e}", + exc_info=True, + ) + + def has_alive_modules(self) -> bool: + return any(t.is_alive() for t in self._threads.values()) diff --git a/bot/modules/messaging.py b/bot/modules/messaging.py new file mode 100644 index 0000000..546aa6c --- /dev/null +++ b/bot/modules/messaging.py @@ -0,0 +1,110 @@ +from datetime import datetime +from typing import Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from bot.modules.base import BaseModule, ModuleType + + +class AddContactRequest(BaseModel): + public_key: str + display_name: Optional[str] = None + + +class SendMessageRequest(BaseModel): + text: str + + +class WebhookMessage(BaseModel): + content: str + + +class MessagingModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.SERVICE + + def on_start(self): + app: FastAPI = self.ctx.shared_state["fastapi_app"] + self._setup_routes(app) + + def _setup_routes(self, app: FastAPI): + account = self.ctx.account + + @app.get("/health") + def health(): + return {"status": "healthy"} + + @app.get("/api/v1/contacts") + def get_contacts(): + return account.contacts + + @app.post("/api/v1/contacts", status_code=201) + def add_contact(payload: AddContactRequest): + try: + account.add_contact(payload.public_key, payload.display_name) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + return {"status": "ok"} + + @app.delete("/api/v1/contacts/{public_key}") + def remove_contact(public_key: str): + if not account.remove_contact(public_key): + raise HTTPException( + status_code=404, + detail="Contact not found or already removed", + ) + return {"status": "ok"} + + @app.get("/api/v1/chats") + def get_chats(): + return account.chats + + @app.get("/api/v1/chats/{chat_id}/messages") + def get_messages( + chat_id: str, + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + ): + start = None + end = None + if start_timestamp: + try: + start = datetime.fromisoformat(start_timestamp) + except ValueError: + raise HTTPException( + status_code=400, + detail="Invalid start_timestamp, use ISO format", + ) + if end_timestamp: + try: + end = datetime.fromisoformat(end_timestamp) + except ValueError: + raise HTTPException( + status_code=400, + detail="Invalid end_timestamp, use ISO format", + ) + return account.get_messages(chat_id, start, end) + + @app.post("/api/v1/chats/{chat_id}/messages", status_code=201) + def send_message(chat_id: str, payload: SendMessageRequest): + if not payload.text: + raise HTTPException(status_code=400, detail="Message text is required") + account.send_message(chat_id, payload.text) + return {"status": "ok"} + + @app.post("/api/v1/webhook/messages/{chat_id}") + def webhook_send_message(chat_id: str, payload: WebhookMessage): + if not payload.content: + raise HTTPException(status_code=400, detail="Content is required") + account.send_message(chat_id, payload.content) + return {"status": "ok"} + + @app.get("/api/v1/communities") + def get_communities(): + return account.communities + + def execute(self): + self.ctx.stop_event.wait() diff --git a/bot/modules/monitoring.py b/bot/modules/monitoring.py new file mode 100644 index 0000000..310be8c --- /dev/null +++ b/bot/modules/monitoring.py @@ -0,0 +1,213 @@ +import datetime +import os +from pathlib import Path + +import pandas as pd + +from bot.modules.base import BaseModule, ModuleType +from bot.modules.utils import save_file, to_midnight, to_sha256_hash + + +def extract_community_channels(account, community: dict, latest_dates: dict[str, pd.Timestamp]) -> pd.DataFrame: + bridge_key = "bridge_message" + columns = { + "id": True, + "whisper_timestamp": False, + "from": True, + "seen": False, + "chat_id": False, + "community_id": False, + "message_type": False, + "response_to": True, + "timestamp": False, + "deleted": False, + "extracted_timestamp": False, + } + + final = [] + for channel in community["channels"]: + now = datetime.datetime.now() + start_timestamp = latest_dates.get(channel["chat_id"]) + if start_timestamp: + start_timestamp += datetime.timedelta(seconds=1) + else: + start_timestamp = to_midnight(now - datetime.timedelta(days=30)) + + account.logger.info( + f"Starting message extraction for # {channel['name']} [{start_timestamp} - {now}]" + ) + messages = account.get_messages(channel["chat_id"], start_timestamp, now) + messages = pd.DataFrame(messages) + if len(messages) == 0: + account.logger.info("No messages found") + continue + + account.logger.info(f"Extracted {len(messages)} message(s)") + messages = messages.assign( + community_id=community["id"], + extracted_timestamp=now, + ) + final.append(messages) + + extracted_data = pd.concat(final, ignore_index=True) if final else pd.DataFrame() + if len(extracted_data) == 0: + return extracted_data + + existing_columns = extracted_data.columns.to_list() + for column, should_hash in columns.items(): + if column not in existing_columns: + loc = len(extracted_data.columns.to_list()) + extracted_data.insert(loc, column, None) + continue + + if should_hash: + extracted_data[column] = extracted_data[column].astype(str).apply(to_sha256_hash) + + if bridge_key in extracted_data.columns: + extracted_data["source"] = extracted_data[bridge_key].apply( + lambda value: value["bridgeName"] if not pd.isna(value) else "status" + ) + else: + extracted_data["source"] = "status" + + extracted_data = extracted_data[list(columns.keys()) + ["source"]].assign( + deleted=extracted_data["deleted"].fillna(False), + seen=extracted_data["seen"].fillna(False), + ) + account.logger.info("Sensitive data has been hashed") + + return extracted_data + + +class MonitoringModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.PERIODIC + + def on_start(self) -> None: + account = self.ctx.account + balance = account["GBP"] + query = ( + (balance["symbol"] == "SNT") + & (balance["fiat_value"] > 0) + & (balance["chain_id"] == 1) + ) + if query.sum() != 1: + raise RuntimeError("Wallet balance check failed — Infura or Coingecko issue") + + def execute(self) -> None: + config = self.ctx.shared_state.get("config") + if config is None: + self.ctx.logger.error("MonitoringModule: config not found in shared_state") + return + + project_root = self.ctx.shared_state.get("project_root") + if not project_root: + project_root = os.path.dirname(os.path.abspath(__file__)) + + account = self.ctx.account + logger = self.ctx.logger + + upload_folder = self.ctx.config.settings.get("upload_folder", "uploads") + upload_path = os.path.join(project_root, upload_folder) + current_state_path = os.path.join(project_root, config.files.current_state) + + self._download(account, upload_path, current_state_path, config) + + if self.ctx.db is not None: + self._store(upload_path, current_state_path, config, logger) + else: + logger.info("No database configured, skipping store step") + + def _download(self, account, upload_path: str, current_state_path: str, config) -> None: + latest_dates: dict[str, pd.Timestamp] = ( + pd.read_pickle(current_state_path) if os.path.exists(current_state_path) else {} + ) + + get_file_name = lambda: str(to_midnight(datetime.datetime.now()).timestamp()).replace(".", "") + communities = account.communities + if not communities: + account.logger.warning("No communities found...") + return + + for community in communities: + if not community["is_member"]: + continue + + community_folder_name = community["name"].replace(" ", "-") + messages_folder = os.path.join(upload_path, "messages", community_folder_name) + community_info_folder = os.path.join(upload_path, "community", community_folder_name) + + account.logger.info(f"Extracting data for {community['name']}") + community["extracted_timestamp"] = datetime.datetime.now() + + file_path = os.path.join(community_info_folder, get_file_name() + ".pkl") + if not os.path.exists(file_path): + save_file(file_path, community) + account.logger.info(f"Created {file_path}") + + file_path = os.path.join(messages_folder, get_file_name() + ".csv") + if not os.path.exists(file_path): + messages = extract_community_channels(account, community, latest_dates) + if len(messages) > 0: + save_file(file_path, messages) + account.logger.info(f"Created {file_path}") + + def _store(self, upload_path: str, current_state_path: str, config, logger) -> None: + path = Path(upload_path) + table_name_mapping: dict[str, str] = config.database.tables + + upload: dict[str, list] = {} + latest_dates: dict[str, pd.Timestamp] = ( + pd.read_pickle(current_state_path) if os.path.exists(current_state_path) else {} + ) + completed = [] + + files = list(path.rglob("*.pkl")) + list(path.rglob("*.csv")) + logger.info(f"There are {len(files)} file(s) to upload") + for file_path in files: + table_name = table_name_mapping.get(file_path.parent.parent.name) + if not table_name: + continue + + file_name = str(file_path.name) + data = pd.read_pickle(file_path) if file_name.endswith(".pkl") else pd.read_csv(file_path) + if isinstance(data, dict): + data = pd.DataFrame([data]) + + for column in data.columns: + if "timestamp" not in column: + continue + data[column] = pd.to_datetime(data[column]) + + if table_name not in upload: + upload[table_name] = [] + + if "timestamp" in data.columns: + latest_dates.update(data.groupby("chat_id")["timestamp"].max().to_dict()) + + upload[table_name].append(data) + completed.append(str(file_path)) + + save_file(current_state_path, latest_dates) + logger.info(f"Updated {current_state_path}") + + connector = self.ctx.db + for table_name, data in upload.items(): + if len(data) == 0: + continue + + df = pd.concat(data, ignore_index=True).assign(batch_timestamp=datetime.datetime.now()) + json_columns = [ + column + for column in df.columns + if len(df[column].dropna()) > 0 + and isinstance(df[column].dropna().reset_index(drop=True).iloc[0], (dict, list)) + ] + connector.insert(df, table_name, json_columns) + logger.info(f"Uploaded {len(df)} record(s) to {table_name}") + + for file_path in completed: + os.remove(file_path) + logger.info(f"Deleted {file_path}") diff --git a/bot/modules/receiver.py b/bot/modules/receiver.py new file mode 100644 index 0000000..3f84ed4 --- /dev/null +++ b/bot/modules/receiver.py @@ -0,0 +1,76 @@ +import datetime + +import pandas as pd + +from bot.modules.base import BaseModule, ModuleType +from bot.modules.utils import to_sha256_hash + + +class ReceiverModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.EVENT + + def on_start(self): + settings = self.ctx.config.settings + self._messages_table = settings.get("messages_table", "received_messages") + self._chats_table = settings.get("chats_table", "received_chats") + self._hash_columns = {"id", "from", "response_to"} + + if self.ctx.db is None: + self.ctx.logger.warning("Receiver: no database configured, disabling") + self._disabled = True + return + + self._disabled = False + + def execute(self): + pass + + def on_event(self, event: dict): + if self._disabled: + return + + event_data = event.get("event", {}) + + messages = event_data.get("messages", []) + if messages: + + self.ctx.logger.info(f"message received {messages}") + self._process_and_insert(messages, self._messages_table) + + chats = event_data.get("chats", []) + if chats: + self._process_and_insert(chats, self._chats_table) + + def _process_and_insert(self, raw_data: list[dict], table_name: str): + if not raw_data: + return + + df = pd.DataFrame(raw_data) + df.columns = [col.lower() for col in df.columns] + + for col in self._hash_columns: + if col in df.columns: + df[col] = df[col].astype(str).apply(to_sha256_hash) + + df["received_timestamp"] = datetime.datetime.now() + + json_columns = self._detect_json_columns(df) + + self.ctx.db.insert(df, table_name, json_columns) + self.ctx.logger.info( + f"Receiver: stored {len(df)} record(s) in {table_name}" + ) + + @staticmethod + def _detect_json_columns(df: pd.DataFrame) -> list[str]: + json_columns = [] + for col in df.columns: + non_null = df[col].dropna() + if len(non_null) > 0: + first = non_null.reset_index(drop=True).iloc[0] + if isinstance(first, (dict, list)): + json_columns.append(col) + return json_columns diff --git a/bot/modules/utils.py b/bot/modules/utils.py new file mode 100644 index 0000000..efd9c9d --- /dev/null +++ b/bot/modules/utils.py @@ -0,0 +1,28 @@ +import datetime +import os +import pickle +from hashlib import sha256 + +import pandas as pd +from typing import Any + + +def to_sha256_hash(value: str) -> str: + return sha256(value.encode()).hexdigest() + + +def to_midnight(timestamp: datetime.datetime) -> datetime.datetime: + return timestamp.replace(minute=0, second=0, hour=0, microsecond=0) + + +def save_file(file_path: str, data: Any): + folder = os.path.dirname(file_path) + if len(folder) > 0: + os.makedirs(folder, exist_ok=True) + + if isinstance(data, pd.DataFrame): + data.to_csv(file_path, index=False) + return + + with open(file_path, "wb") as f: + pickle.dump(data, f) diff --git a/bot/signal.py b/bot/signal.py index 46b070b..558a56b 100644 --- a/bot/signal.py +++ b/bot/signal.py @@ -120,12 +120,13 @@ def __close_thread(self): if self.__thread.is_alive(): self.__thread.join(1) - def listen(self, signal_type: str): + def listen(self, signal_type: str, stop_event: Optional[threading.Event] = None): """ Listen for a specific Signal forever Parameters: - `signal_type` - the "type" as it is in Status Backend + - `stop_event` - optional threading.Event for graceful shutdown """ self.__signal_type = signal_type ws = websocket.WebSocketApp( @@ -139,7 +140,11 @@ def listen(self, signal_type: str): self.__thread.start() while True: try: - data = self.__queue.get() + data = self.__queue.get(timeout=1) + except queue.Empty: + if stop_event and stop_event.is_set(): + break + continue except KeyboardInterrupt: break if self.__error_message: diff --git a/config.yaml b/config.yaml index 35f763b..b547f5a 100644 --- a/config.yaml +++ b/config.yaml @@ -1,23 +1,44 @@ -postgres: - schema: "status_app_monitoring" - tables: - messages: "raw_messages" - community: "raw_community_info" +database: + type: postgres + host: "database" + port: 5432 + schema: "status_app_monitoring" + name: "status-bot" + tables: + messages: "raw_messages" + community: "raw_community_info" # Value must be in MINUTES sleep: 10 files: - # Get the latest community chat dates for next run - current_state: "dates.pkl" + # Get the latest community chat dates for next run + current_state: "dates.pkl" bot: - # Public information for the bot - public_key: "0x041658626a9e1303b631f6d0fb1e047211d5603b977454f7d5d29fe583c3d6c1bd3d8e395d67f6c44b5bc659aae912040e9dd8164b5107368a29029cb53389d8b0" - compressed_key: "zQ3shNv1tnajHo5FvCvP662cWcbBfS5ZejB4TWaH9iAuFCZZe" - # Parameters based on Account class - params: - # localhost - to run code locally with Status Backend Dockerfile - # status-backend - to run code with docker-compose.yaml - # domain: "status-backend" - domain: "status-backend" - port: 8080 - is_secure: false + init_account: true + display_name: 'e-raccoon' + public_key: "0xExample" + compressed_key: "example-compressed-key" + +backend: + domain: "status-backend" + port: 8080 + is_secure: false + +api: + enable: true + host: "0.0.0.0" + port: 8081 + # api_key: "your-secret-key" + +modules: + directories: [] + enabled: ["messaging", "receiver"] + settings: + receiver: + messages_table: "received_messages" + chats_table: "received_chats" + +prometheus: + enabled: true + host: "0.0.0.0" + port: 8000 diff --git a/docker-compose.yaml b/docker-compose.yaml index b30370b..39825dd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,22 +24,28 @@ services: build: context: . dockerfile: Dockerfile - container_name: status-monitor + container_name: status-bot depends_on: - backend - database volumes: - ./backups:/backups + - ./config.yaml:/app/config.yaml env_file: - .env + ports: + - 8081:8081 + - 8000:8000 networks: - status-bridge database: image: postgres:15 container_name: database - env_file: - - .env + environment: + POSTGRES_DB: 'status-bot' + POSTGRES_USER: 'status' + POSTGRES_PASSWORD: 'ChangeMeIfYouCare' ports: - 5432:5432 volumes: diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index b2d1e9f..e2a3633 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -1,3 +1,14 @@ # Summary -[Account](./account.md) +## Development + +- [Account](./development//account.md) +- [Modules](./development/modules.md) + + +## Usage + +- [Configuration](./usage/configuration.md) +- [Messaging API](./usage/messaging.md) +- [Community Monitoring](./usage/monitoring.md) +- [Metrics](./usage/metrics.md) diff --git a/docs/account.md b/docs/development/account.md similarity index 100% rename from docs/account.md rename to docs/development/account.md diff --git a/docs/development/modules.md b/docs/development/modules.md new file mode 100644 index 0000000..13a7136 --- /dev/null +++ b/docs/development/modules.md @@ -0,0 +1,139 @@ +# Modules + +Modules are plugins that extend the bot with custom logic. Each module runs in its own daemon thread and follows a defined lifecycle. + +--- + +## Module types + +| Type | Behaviour | Use case | +|------|-----------|----------| +| `PERIODIC` | Calls `execute()` in a loop, sleeping `interval` seconds between runs | Scheduled data extraction, polling | +| `EVENT` | Iterates over WebSocket signal events, calling `on_event()` for each one | Real-time message reactions, auto-reply | +| `SERVICE` | Calls `execute()` once. Expected to block until shutdown (e.g. runs a server) | HTTP API server, long-running workers | + +--- + +## BaseModule API + +```python +from bot.modules.base import BaseModule, ModuleType + +class MyModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.PERIODIC + + def on_start(self): + ... # called once when the module starts + + def execute(self): + ... # main logic (called periodically or once) + + def on_stop(self): + ... # called once when the module stops + + def on_event(self, event: dict): + ... # handle a signal event (EVENT type only) +``` + +### ModuleContext + +Every module receives a `ModuleContext` via the constructor, accessible as `self.ctx`: + +| Field | Type | Description | +|-------|------|-------------| +| `account` | `Account` | Logged-in Status account | +| `config` | `ModuleConfig` | This module's configuration | +| `logger` | `Logger` | Shared logger | +| `db` | `Postgres / None` | Optional Postgres connection | +| `shared_state` | `dict` | Cross-module shared data | +| `stop_event` | `threading.Event` | Set when the bot is shutting down | + +### ModuleConfig + +Each module receives only its own settings from `config.yaml`: + +```python +self.ctx.config.name # module identifier +self.ctx.config.interval # seconds between PERIODIC runs +self.ctx.config.max_retries # restart attempts before permanent failure +self.ctx.config.settings # dict of module-specific settings +``` + +--- + +## Adding API routes + +Modules that want to expose HTTP endpoints can register routes on the shared FastAPI app instead of starting their own server. + +The central `api_server` module owns the uvicorn lifecycle. Other modules register routes during `on_start()`: + +```python +class MyAPIModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.SERVICE + + def on_start(self): + app = self.ctx.shared_state["fastapi_app"] + self._setup_routes(app) + + def _setup_routes(self, app): + @app.get("/api/v1/my-endpoint") + def my_handler(): + return {"hello": "world"} + + def execute(self): + self.ctx.stop_event.wait() # block until shutdown +``` + +The `api_server` module is auto-loaded whenever any API module is enabled and `api.enable` is `true`. + +--- + +## Signals and EVENT modules + +EVENT modules react to Status WebSocket signals: + +```python +class AutoReplyModule(BaseModule): + + @property + def module_type(self) -> ModuleType: + return ModuleType.EVENT + + def on_start(self): + self._commands = self.ctx.config.settings.get("commands", {}) + + def on_event(self, event: dict): + messages = event.get("event", {}).get("messages", []) + for msg in messages: + self.ctx.account.send_message(msg["chatId"], "Message received") +``` + +The signal listener respects `stop_event` for graceful shutdown. + +--- + +## Utility functions + +`bot/modules/utils.py` provides shared helpers: + +| Function | Description | +|----------|-------------| +| `to_sha256_hash(value)` | Returns the SHA-256 hex digest of a string | +| `to_midnight(timestamp)` | Truncates a datetime to the start of its day | +| `save_file(file_path, data)` | Saves a DataFrame as CSV or pickles any other object | + +--- + +## Best practices + +- **Error isolation**: One module crash never affects others. The `ModuleManager` restarts failed modules with exponential backoff. +- **Graceful shutdown**: Always check `self.ctx.stop_event.is_set()` or use `self.ctx.stop_event.wait()` in long-running loops. +- **No blocking in EVENT modules**: EVENT handlers process one event at a time — keep `on_event()` fast. +- **Thread safety**: Modules run in separate threads. Use `shared_state` with caution for shared mutable data. +- **Configuration**: Read module-specific settings from `self.ctx.config.settings` — each module gets its own config namespace. diff --git a/docs/usage/configuration.md b/docs/usage/configuration.md new file mode 100644 index 0000000..2b4a87c --- /dev/null +++ b/docs/usage/configuration.md @@ -0,0 +1,180 @@ +# Configuration + +The bot is configured through a YAML file (`config.yaml`) and environment variables. +Environment variables override values from the YAML file. + +## Loading order + +1. `config.yaml` — base configuration +2. Shell environment variables — override YAML values +3. `.env` file — override YAML values (loaded from the same directory as `config.yaml`) + +## Usage + +```bash +python main.py --config /path/to/config.yaml +``` + +The `--config` argument defaults to `./config.yaml`. + +--- + +## Reference + +### `sleep` + +| Type | Default | Description | +|------|---------|-------------| +| `int` | `10` | Sleep interval in minutes between monitoring cycles | + +### `files` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `current_state` | `str` | `"dates.pkl"` | File path for tracking latest message timestamps per chat | + +```yaml +files: + current_state: "dates.pkl" +``` + +--- + +### `bot` + +| Field | Type | Default | Env var | Description | +|-------|------|---------|---------|-------------| +| `display_name` | `str` | `""` | `BOT_DISPLAY_NAME` | Status display name used to log in | +| `public_key` | `str` | `""` | — | Expected public key for verification | +| `password` | `str` | `""` | `BOT_PASSWORD` | Status account password | +| `mnemonic_phrase` | `str` | `""` | `BOT_MNEMONIC_PHRASE` | 12-word recovery phrase (used when `init_account: true`) | +| `init_account` | `bool` | `false` | `BOT_INIT_ACCOUNT` | If `false`, the account must already exist. If `true`, creates or restores the account using `mnemonic_phrase` | +| `compressed_key` | `str` | `""` | — | Expected compressed key for verification after login | +| `infura_token` | `str` | `""` | `BOT_INFURA_TOKEN` | [Infura token](https://www.infura.io/) required for token-gated communities | +| `coingecko_api_key` | `str` | `""` | `BOT_COINGECKO_API_KEY` | [CoinGecko API key](https://www.coingecko.com/) required for token-gated communities | + + +```yaml +bot: + display_name: 'my-bot' + public_key: '0x...' + password: 'ChangeMe' + mnemonic_phrase: 'word1 word2 ... word12' + init_account: false + compressed_key: 'zQ3...' +``` + +### `backend` + +Parameter to connect to the Status Backend instance. + +| Field | Type | Default | Env var | Description | +|-------|------|---------|---------|-------------| +| `domain` | `str` | `"localhost"` | `BOT_PARAMS_DOMAIN` | Status Backend hostname (`localhost` for local, `status-backend` for Docker) | +| `port` | `int` | `8080` | `BOT_PARAMS_PORT` | Status Backend API port | +| `is_secure` | `bool` | `false` | `BOT_PARAMS_IS_SECURE` | Use HTTPS instead of HTTP | +```yaml +backend: + domain: "status-backend" + port: 8080 + is_secure: false +``` + +--- + +### `api` + +Configuration for the WebServer avaialable to the modules. + +| Field | Type | Default | Env var | Description | +|-------|------|---------|---------|-------------| +| `enable` | `bool` | `true` | `API_ENABLE` | Enable the REST API server | +| `host` | `str` | `"0.0.0.0"` | `API_HOST` | API server bind address | +| `port` | `int` | `8081` | `API_PORT` | API server port | +| `api_key` | `str` | `""` | `API_API_KEY` | API key for request authentication (empty = disabled) | + +```yaml +api: + enable: true + host: "0.0.0.0" + port: 8081 + # api_key: "your-secret-key" +``` + +--- + +### `database` + +Supports **Postgres** and **SQLite** (SQLite via `type: sqlite`, where `name` is the file path). + +| Field | Type | Default | Env var | Description | +|-------|------|---------|---------|-------------| +| `type` | `str` | `"postgres"` | `DATABASE_TYPE` | Database engine (`postgres` or `sqlite`) | +| `host` | `str` | `"database"` | `DATABASE_HOST` | Database server hostname (Postgres) | +| `port` | `int` | `5432` | `DATABASE_PORT` | Database server port (Postgres) | +| `user` | `str` | `""` | `DATABASE_USER` | Database username (Postgres) | +| `password` | `str` | `""` | `DATABASE_PASSWORD` | Database password (Postgres) | +| `name` | `str` | `""` | `DATABASE_NAME` | Database name (Postgres) or file path (SQLite) | +| `schema` | `str` | `"public"` | `DATABASE_SCHEMA` | Database schema (Postgres) | +| `tables` | `dict` | `{}` | — | Mapping of data type to table name | + +Postgres example: +```yaml +database: + type: postgres + host: database + port: 5432 + user: 'myuser' + password: 'ChangeMe' + name: 'status-bot' + schema: "status_app_monitoring" + tables: + messages: "raw_messages" + community: "raw_community_info" +``` + +SQLite example: +```yaml +database: + type: sqlite + name: "/data/status-bot.db" +``` + +--- + +### `modules` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `directories` | `list[str]` | `["./modules"]` | Directories to scan for module `.py` files | +| `enabled` | `list[str]` | `[]` | List of module names to enable | +| `settings` | `dict` | `{}` | Per-module settings (each module defines its own schema) | + +The `api_server` module is auto-loaded whenever `api.enable` is `true` — it does not need to be listed in `enabled`. + +```yaml +modules: + directories: + - ./modules + enabled: + - messaging + settings: + messaging: {} +``` + +--- + +### `metrics` + +| Field | Type | Default | Env var | Description | +|-------|------|---------|---------|-------------| +| `enabled` | `bool` | `false` | `METRICS_ENABLED` | Enable Prometheus metrics HTTP server | +| `host` | `str` | `"0.0.0.0"` | `METRICS_HOST` | Prometheus HTTP server bind address | +| `port` | `int` | `8000` | `METRICS_PORT` | Prometheus HTTP server port | + +```yaml +metrics: + enabled: true + host: "0.0.0.0" + port: 8000 +``` diff --git a/docs/usage/messaging.md b/docs/usage/messaging.md new file mode 100644 index 0000000..45b1adf --- /dev/null +++ b/docs/usage/messaging.md @@ -0,0 +1,197 @@ +# Messaging API + +The bot exposes a REST endpoint for sending messages, managing contacts, and querying chats. + +## Interactive docs + +| Tool | URL | +|------|-----| +| Swagger UI | `http://localhost:8081/docs` | +| ReDoc | `http://localhost:8081/redoc` | +| OpenAPI JSON | `http://localhost:8081/openapi.json` | + +## Authentication + +If `api.api_key` is configured in `config.yaml`, all requests (except `/health`, `/docs`, `/redoc`, and `/openapi.json`) must include the `X-API-Key` header: + +```bash +curl -H "X-API-Key: your-secret-key" http://localhost:8081/api/v1/chats +``` + +Requests without a valid key return `401 Unauthorized`: + +```json +{"detail": "Invalid or missing API key"} +``` + +--- + +## Endpoints + +### `GET /health` + +Health check. + +```bash +curl http://localhost:8081/health +``` + +Response `200`: +```json +{"status": "healthy"} +``` + +--- + +### `POST /api/v1/contacts` + +Add a contact (send friend request). + +```bash +curl -X POST http://localhost:8081/api/v1/contacts \ + -H "Content-Type: application/json" \ + -d '{"public_key": "0x...", "display_name": "Alice"}' +``` + +Request body: + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `public_key` | `string` | Yes | Contact's public key | +| `display_name` | `string` | No | Display name (required if not already a contact) | + +Response `201`: +```json +{"status": "ok"} +``` + +Error `400` — missing or invalid fields. + +--- + +### `DELETE /api/v1/contacts/{public_key}` + +Remove a contact. + +```bash +curl -X DELETE http://localhost:8081/api/v1/contacts/0x... +``` + +Response `200`: +```json +{"status": "ok"} +``` + +Error `404` — contact not found or already removed. + +--- + +### `GET /api/v1/chats` + +List all available chats (contacts, community channels, and group chats). + +```bash +curl http://localhost:8081/api/v1/chats +``` + +Response `200`: +```json +[ + {"type": "contact", "id": "0x...", "name": "Alice"}, + {"type": "channel", "id": "0x...", "name": "Community #general"}, + {"type": "group_chat", "id": "0x...", "name": "Group Chat Name"} +] +``` + +Each chat has: +| Field | Type | Description | +|-------|------|-------------| +| `type` | `string` | `contact`, `channel`, or `group_chat` | +| `id` | `string` | Chat ID (used in message endpoints) | +| `name` | `string` | Human-readable name | + +--- + +### `GET /api/v1/chats/{chat_id}/messages` + +Get messages from a chat. Messages are returned newest-first. + +```bash +curl "http://localhost:8081/api/v1/chats/0x.../messages" +``` + +Optional query parameters: + +| Parameter | Type | Description | +|-----------|------|-------------| +| `start_timestamp` | ISO 8601 | Only messages after this timestamp | +| `end_timestamp` | ISO 8601 | Only messages before this timestamp | + +```bash +curl "http://localhost:8081/api/v1/chats/0x.../messages?start_timestamp=2025-01-01T00:00:00&end_timestamp=2025-06-01T00:00:00" +``` + +Response `200` — array of message objects. + +--- + +### `POST /api/v1/chats/{chat_id}/messages` + +Send a text message to a chat. + +```bash +curl -X POST http://localhost:8081/api/v1/chats/0x.../messages \ + -H "Content-Type: application/json" \ + -d '{"text": "Hello from the bot!"}' +``` + +Request body: + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `text` | `string` | Yes | Message content | + +Response `201`: +```json +{"status": "ok"} +``` + +--- + +### `POST /api/v1/webhook/messages/{chat_id}` + +Send a text message to a chat. Designed for external services to trigger messages without extra client logic. + +```bash +curl -X POST http://localhost:8081/api/v1/webhook/messages/0x... \ + -H "Content-Type: application/json" \ + -d '{"content": "Notification from external service"}' +``` + +Request body: + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `content` | `string` | Yes | Message content | + +Response `200`: +```json +{"status": "ok"} +``` + +--- + +## Error handling + +All errors return a JSON body with a `detail` field: + +```json +{"detail": "Contact not found or already removed"} +``` + +| Status | Meaning | +|--------|---------| +| `400` | Bad request (invalid input, missing fields, wrong format) | +| `404` | Resource not found (contact, chat) | +| `422` | Validation error (malformed request body) | +| `500` | Internal server error | diff --git a/docs/usage/metrics.md b/docs/usage/metrics.md new file mode 100644 index 0000000..5904fbb --- /dev/null +++ b/docs/usage/metrics.md @@ -0,0 +1,66 @@ +# Metrics + +The bot exposes Prometheus metrics for health monitoring and observability. + +--- + +## Endpoint + +``` +http://:/metrics +``` + +Default: `http://0.0.0.0:8000/metrics` + +Configured via the `metrics` section in `config.yaml` (see [Configuration](./configuration.md#metrics)). + +--- + +## Available metrics + +### `status_bot_health` + +| Type | Labels | Description | +|------|--------|-------------| +| Gauge | — | `1` if the bot is running, `0` if stopped | + +### `status_bot_version` + +| Type | Labels | Description | +|------|--------|-------------| +| Gauge | `version` | Constant `1` with the bot version as a label value | + +### `status_bot_module_loaded` + +| Type | Labels | Description | +|------|--------|-------------| +| Gauge | `module` | `1` for each loaded module | + +Example: +``` +status_bot_module_loaded{module="messaging"} 1 +status_bot_module_loaded{module="api_server"} 1 +``` + +### `status_bot_module_errors_total` + +| Type | Labels | Description | +|------|--------|-------------| +| Counter | `module` | Total number of errors encountered by a module | + +### `status_bot_module_restarts_total` + +| Type | Labels | Description | +|------|--------|-------------| +| Counter | `module` | Total number of times a module has been restarted | + +--- + +## Example Prometheus scrape config + +```yaml +scrape_configs: + - job_name: "status-bot" + static_configs: + - targets: ["localhost:8000"] +``` diff --git a/docs/usage/monitoring.md b/docs/usage/monitoring.md new file mode 100644 index 0000000..6398f98 --- /dev/null +++ b/docs/usage/monitoring.md @@ -0,0 +1,58 @@ +# Community Monitoring + +The Status Bot can be used to monitore activity in community + +## Fetch Data + + **No personal data is collected from users.** + + +| Field | Hashed | Description | +|:----------------------|:---------|:------------------------------------------------------------| +| **id** | **Yes** | The message's ID | +| **whisper_timestamp** | No | The whisper timestamp of the message | +| **from** | **Yes** | The public key of the user | +| **message_type** | No | The message type | +| **seen** | No | True if the message has been seen otherwise False | +| **chat_id** | No | The chat ID is a combination of community ID and channel ID | +| **community_id** | No | The ID of the community | +| **response_to** | **Yes** | Ithe public key of the user who the response is for | +| **timestamp** | No | The timestamp of the message | +| **deleted** | No | True if the message was deleted otherwise False | + +Status Bot account information can be found in [`config.yaml`](./config.yaml). + +## How it works + +```mermaid +graph LR + subgraph Communities[Status App] + subgraph Status[Status Community] + StatusMessages[Messages] + StatusInfo[Information] + end + subgraph Logos[Logos Community] + LogosMessages[Messages] + LogosInfo[Information] + end + + end + + subgraph Bot[Docker Container] + RawDataLocal[(Raw Data)] + Script[monitor.py] + end + + subgraph IFT[IFT Infrastructure] + RawDataIFT[(Raw Data)] + ProcessedDataIFT[(Processed Data)] + + end + + Communities <--> |class Account| Script + Script --> |SHA256| RawDataLocal + RawDataLocal --> |Airbyte| RawDataIFT + RawDataIFT --> |dbt| ProcessedDataIFT +``` + + diff --git a/main.py b/main.py new file mode 100644 index 0000000..e0299de --- /dev/null +++ b/main.py @@ -0,0 +1,176 @@ +import os +import sys +import signal +import argparse + +from fastapi import FastAPI + +from bot import Account, Logger, Config, Database +from bot.metrics import start_prometheus +from bot.modules.manager import ModuleManager + + +def create_bot(config: Config, project_root: str) -> Account: + account = Account(**config.backend.model_dump()) + available_accounts = [a["display_name"] for a in account.available_accounts] + + display_name = config.bot.display_name + password = config.bot.password + + if display_name in available_accounts: + account.logger.info(f"Logging in with display name: {display_name}") + account.login( + display_name=display_name, + password=password, + infura_token=config.bot.infura_token, + coingecko_api_key=config.bot.coingecko_api_key + ) + elif config.bot.init_account: + mnemonic = config.bot.mnemonic_phrase + if not mnemonic: + raise ValueError( + "init_account is true but no mnemonic_phrase provided" + ) + account.logger.info(f"Creating/restoring account: {display_name}") + account.login( + display_name=display_name, + password=password, + mnemonic=mnemonic, + infura_token=config.bot.infura_token, + coingecko_api_key=config.bot.coingecko_api_key + ) + else: + raise ValueError( + f"Account '{display_name}' not found and init_account is false. " + f"Available accounts: {[a['display_name'] for a in available_accounts]}" + ) + + if account.info["compressed_key"] != config.bot.compressed_key: + raise Exception( + "Target compressed key and logged in compressed key are different." + ) + + profile_path = os.path.join(project_root, "assets", "profile.jpg") + account.profile_picture = profile_path + account.logger.info( + f"Account Information: {account.info['display_name']}\n" + f"\tCompressed Key: {account.info['compressed_key']}\n" + f"\tPublic Key: {account.info['public_key']}\n" + f"\tURL: {account.info['url']}" + ) + return account + + +def init_database(config: Config) -> Database: + return Database( + db_type=config.database.type, + host=config.database.host, + port=config.database.port, + user=config.database.user, + password=config.database.password, + name=config.database.name, + schema=config.database.schema, + ) + + +def main(): + parser = argparse.ArgumentParser( + description="Status Bot - Modular monitoring framework" + ) + parser.add_argument( + "--config", + default="./config.yaml", + help="Path to configuration file (default: ./config.yaml)", + ) + args = parser.parse_args() + + config_path = args.config + + logger = Logger() + + try: + logger.info("Loading the configuration") + Config._yaml_file = config_path + config = Config() + except Exception as e: + logger.error(f"Failed to load config: {e}") + sys.exit(1) + + logger.info("Status Bot starting...") + + project_root = os.path.dirname(os.path.abspath(__file__)) + + try: + logger.info(f"{config}") + account = create_bot(config, project_root) + except Exception as e: + logger.error(f"Failed to create bot account: {e}") + sys.exit(1) + + db = None + has_database = all([ + config.database.host, + config.database.user, + config.database.password, + config.database.name, + ]) + + if has_database: + try: + db = init_database(config) + db.init_tables() + logger.info(f"Database connection established ({config.database.type})") + except Exception as e: + logger.warning(f"Failed to connect to database: {e}") + logger.warning("Continuing without database connection") + else: + logger.info("No database configuration found, running without database") + + fastapi_app = FastAPI(title="Status Bot API") + shared_state = {"config": config, "project_root": project_root, "fastapi_app": fastapi_app} + + if config.api.enable and "api_server" not in config.modules.enabled: + config.modules.enabled.append("api_server") + + manager = ModuleManager(config.modules, account, db, logger, shared_state=shared_state) + manager.discover_modules() + manager.load_modules() + + start_prometheus(config.metrics, manager, logger) + + stop_event = manager._stop_event + + def handle_sigterm(signum, frame): + logger.info("Received SIGTERM, shutting down...") + stop_event.set() + + signal.signal(signal.SIGTERM, handle_sigterm) + signal.signal(signal.SIGINT, handle_sigterm) + + if manager.module_names: + logger.info( + f"Starting {len(manager.module_names)} module(s): " + f"{manager.module_names}" + ) + manager.start_all() + + try: + while manager.has_alive_modules(): + stop_event.wait(1) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, shutting down...") + else: + logger.info("No modules enabled. Bot running without modules.") + logger.info("Press Ctrl+C to stop.") + try: + while True: + stop_event.wait(1) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, shutting down...") + + manager.stop_all() + logger.info("Status Bot stopped") + + +if __name__ == "__main__": + main() diff --git a/monitor.py b/monitor.py deleted file mode 100644 index 3cc105d..0000000 --- a/monitor.py +++ /dev/null @@ -1,325 +0,0 @@ -import datetime, os, pickle, yaml, time -import pandas as pd -from typing import Any -from pathlib import Path -from dotenv import load_dotenv -from hashlib import sha256 -# Manual file imports -from bot import Account, Logger -from postgres import Postgres - -def to_sha256_hash(value: str) -> str: - """ - Hash personal information before it is put in the database. - - Parameters: - - `value` - personal information - - Output: - - sha256 hashed value - """ - return sha256(value.encode()).hexdigest() - -def to_midnight(timestamp: datetime.datetime) -> datetime.datetime: - """ - Convert the given timestamp to midnight - - Parameters: - - `timestamp` - current timestap - - Output: - - `timestamp` at midnight - """ - return timestamp.replace(minute=0, second=0, hour=0, microsecond=0) - -def load_config(file_path: str) -> dict: - """ - Load the config file and the `.env` variables - - Parameter: - - `file_path` - the file path of the config yaml file. The `.env` variable must be in the same folder - - Output: - - The config variables and secret from `.env` - """ - with open(file_path, "r") as f: - config: dict = yaml.safe_load(f) - - env_file_path = os.path.join(os.path.dirname(file_path), ".env") - load_dotenv(env_file_path) - - config["env_vars"] = { - key: value - for key, value in os.environ.items() - if key.startswith(("POSTGRES_", "STATUS_")) - } - - return config - -def extract_community_channels(account: Account, community: dict, latest_dates: dict[str, pd.Timestamp]) -> pd.DataFrame: - """ - Extract the community channel messages. - - Parameters: - - `account` - logged in Status Bot account - - `community` - the current community from `account` - - `start_timestamp` - start timestamp for message fetching - - `end_timestamp` - end timestamp for message fetching - - Output: - - DataFrame with all of the community messages for the given start and end timestamps - """ - # Column name -> True if data should be hashed - bridge_key = "bridge_message" - columns = { - "id": True, - "whisper_timestamp": False, - "from": True, - "seen": False, - "chat_id": False, - "community_id": False, - "message_type": False, - "response_to": True, - "timestamp": False, - "deleted": False, - "extracted_timestamp": False, - } - - final = [] - for channel in community["channels"]: - - now = datetime.datetime.now() - start_timestamp = latest_dates.get(channel["chat_id"]) - if start_timestamp: - start_timestamp += datetime.timedelta(seconds=1) - else: - # Node will only return known / fetched messages for this channel. - # Without enabling community archives feature the node can only fetch last 30 days (from store nodes). - start_timestamp = to_midnight(now - datetime.timedelta(days=30)) - - account.logger.info(f"Starting message extraction for # {channel['name']} [{start_timestamp} - {now}]") - messages = account.get_messages(channel["chat_id"], start_timestamp, now) - messages = pd.DataFrame(messages) - if len(messages) == 0: - account.logger.info(f"No messages found") - continue - - account.logger.info(f"Extracted {len(messages)} message(s)") - messages = messages.assign( - community_id = community["id"], - extracted_timestamp = now - ) - final.append(messages) - - extracted_data = pd.concat(final, ignore_index=True) if final else pd.DataFrame() - if len(extracted_data) == 0: - return extracted_data - - existing_columns = extracted_data.columns.to_list() - for column, should_hash in columns.items(): - if column not in existing_columns: - loc = len(extracted_data.columns.to_list()) - extracted_data.insert(loc, column, None) - continue - - if should_hash: - extracted_data[column] = extracted_data[column].astype(str).apply(to_sha256_hash) - - if bridge_key in extracted_data.columns: - extracted_data["source"] = extracted_data[bridge_key].apply(lambda value: value["bridgeName"] if not pd.isna(value) else "status") - else: - extracted_data["source"] = "status" - - extracted_data = extracted_data[list(columns.keys()) + ["source"]].assign( - deleted = extracted_data["deleted"].fillna(False), - seen = extracted_data["seen"].fillna(False) - ) - account.logger.info(f"Sensitive data has been hashed") - - return extracted_data - -def save_file(file_path: str, data: Any): - """ - Save data to a pickle file. Creates directories if they don't exist. - - Parameters: - - `file_path` - Full pikle path - - `data` - Python object to be saved - """ - folder = os.path.dirname(file_path) - if len(folder) > 0: - os.makedirs(folder, exist_ok=True) - - if isinstance(data, pd.DataFrame): - data.to_csv(file_path, index=False) - return - - with open(file_path, "wb") as f: - pickle.dump(data, f) - -def create_bot(config: dict) -> Account: - """ - Initialized a logged in bot account that will monitor the communities. - - Parameters: - - `config` - the `load_config` configuration - - Output: - - Logged in Bot account - """ - params = config.get("bot", {}).get("params", {}) - account = Account(**params) - available_accounts = [acc["display_name"] for acc in account.available_accounts] - - prefix = "STATUS_" - params = { - key.replace(prefix, "").lower(): value - for key, value in config["env_vars"].items() - if key.startswith(prefix) - } - if params["display_name"] in available_accounts: - params.pop("mnemonic") - - account.login(**params) - if account.info["compressed_key"] != config["bot"]["compressed_key"]: - raise Exception("Target compressed key and logged in compressed key are different...") - else: - account.logger.info("[SUCCESS] Logged in with correct account") - - balance = account["GBP"] - query = (balance["symbol"] == "SNT") & (balance["fiat_value"] > 0) & (balance["chain_id"] == 1) - if query.sum() != 1: - raise Exception("There were issues with Infura Token and Coingecko initialization...") - else: - account.logger.info("[SUCCESS] Wallet balance is available") - - account.profile_picture = os.path.join(os.path.dirname(__file__), "assets", "profile.jpg") - account.logger.info(f"Account Information:\nCompressed Key: {account.info['compressed_key']}\nPublic Key: {account.info['public_key']}\nURL: {account.info['url']}") - return account - -def download(account: Account, folder: str, config: dict): - """ - Download Status App messages / info from communities and store them in pickle files. - - Parameters: - - `folder` - the folder where the files will be created. Sub folders are automatically created - - `config` - the `load_config` configuration - """ - file_path = os.path.join(os.path.dirname(__file__), config["files"]["current_state"]) - latest_dates: dict[str, pd.Timestamp] = pd.read_pickle(file_path) if os.path.exists(file_path) else {} - - get_file_name = lambda: str(to_midnight(datetime.datetime.now()).timestamp()).replace(".", "") - communities = account.communities - if not communities: - account.logger.warning("No communities found...") - - for community in communities: - - if not community["is_member"]: - continue - - community_folder_name = community["name"].replace(" ", "-") - messages_folder = os.path.join(folder, "messages", community_folder_name) - community_info_folder = os.path.join(folder, "community", community_folder_name) - - account.logger.info(f"Extracting data for {community['name']}") - community["extracted_timestamp"] = datetime.datetime.now() - - file_path = os.path.join(community_info_folder, get_file_name() + ".pkl") - if not os.path.exists(file_path): - save_file(file_path, community) - account.logger.info(f"Created {file_path}") - - file_path = os.path.join(messages_folder, get_file_name() + ".csv") - if not os.path.exists(file_path): - messages = extract_community_channels(account, community, latest_dates) - if len(messages) > 0: - save_file(file_path, messages) - account.logger.info(f"Created {file_path}") - -def store(folder: str, config: dict, logger: Logger): - """ - Upload Status App `download` file to Postgres. - NOTE: The Postgres schema must already exist - - Parameters: - - `folder` - the folder where the files will be created. Sub folders are automatically created - - `config` - the `load_config` configuration - """ - path = Path(folder) - table_name_mapping: dict[str, str] = config["postgres"]["tables"] - table_schema = config["postgres"]["schema"] - - upload: dict[str, list] = {} - - file_path = os.path.join(os.path.dirname(__file__), config["files"]["current_state"]) - latest_dates: dict[str, pd.Timestamp] = pd.read_pickle(file_path) if os.path.exists(file_path) else {} - - completed = [] - - files = list(path.rglob("*.pkl")) + list(path.rglob("*.csv")) - logger.info(f"There are {len(files)} file(s) to upload") - for file_path in files: - - table_name = table_name_mapping.get(file_path.parent.parent.name) - if not table_name: - continue - - file_name = str(file_path.name) - data = pd.read_pickle(file_path) if file_name.endswith(".pkl") else pd.read_csv(file_path) - if isinstance(data, dict): - data = pd.DataFrame([data]) - - for column in data.columns: - if "timestamp" not in column: - continue - data[column] = pd.to_datetime(data[column]) - - if table_name not in upload: - upload[table_name] = [] - - if "timestamp" in data.columns: - latest_dates.update(data.groupby("chat_id")["timestamp"].max().to_dict()) - - upload[table_name].append(data) - completed.append(str(file_path)) - - save_file(config["files"]["current_state"], latest_dates) - logger.info(f"Updated {config['files']['current_state']}") - - prefix = "POSTGRES_" - params = { - key.replace(prefix, "").lower(): value - for key, value in config["env_vars"].items() - if key.startswith(prefix) - } - connector = Postgres(**params) - for table_name, data in upload.items(): - if len(data) == 0: - continue - - df = pd.concat(data, ignore_index=True).assign(batch_timestamp = datetime.datetime.now()) - json_columns = [ - column - for column in df.columns - if len(df[column].dropna()) > 0 and isinstance(df[column].dropna().reset_index(drop=True).iloc[0], (dict, list)) - ] - connector.insert(df, table_name, table_schema, json_columns) - logger.info(f"Uploaded {len(df)} record(s) to {table_schema}.{table_name}") - - for file_path in completed: - os.remove(file_path) - logger.info(f"Deleted {file_path}") - -if __name__ == "__main__": - folder = os.path.dirname(__file__) - config = load_config(os.path.join(folder, "config.yaml")) - upload_folder = os.path.join(os.path.dirname(__file__), "uploads") - logger = Logger() - account = create_bot(config) - - while True: - download(account, upload_folder, config) - store(upload_folder, config, logger) - logger.info(f"Sleeping for {config['sleep']} minute(s)") - time.sleep(config["sleep"] * 60) diff --git a/postgres.py b/postgres.py deleted file mode 100644 index 6a4512e..0000000 --- a/postgres.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -Minimum code to upload data taken from: -https://github.com/status-im/ift-data-py/blob/master/ift_data/clients/postgres.py -""" - -import psycopg2 -import pandas as pd -from typing import Optional, Union -from sqlalchemy import create_engine -from sqlalchemy.dialects.postgresql import JSONB - -class Postgres: - - def __init__(self, username: str, password: str, port: Union[int, str], database: str, host: str): - - if isinstance(port, str): - port = int(port) - - self.__params = { - "host": host, - "user": username, - "password": password, - "port": port, - "database": database - } - - self.__url = f"postgresql://{username}:{password}@{host}:{port}/{database}" - self.__conn: psycopg2.extensions.connection = psycopg2.connect(**self.__params) - self.__cursor: psycopg2.extensions.cursor = self.__conn.cursor() - - def insert(self, data: pd.DataFrame, table_name: str, schema: str, json_columns: Optional[list] = None): - """ - Insert the DataFrame in the specified schema > table. - If the schema / table name does not exist, it will be created. - - Parameters: - - `data` - the data to be inserted in Postgres - - `table_name` - the name of the table - - `schema` - the name of the schema - - `json_columns` - when creating the table, `dict` columns will be turned into JSON objects in Postgres - """ - self.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}") - engine = create_engine(self.__url) - - data.columns = [column.lower() for column in data.columns] - - params = { - "name": table_name, - "con": engine, - "schema": schema, - "if_exists": "append", - "index": False - } - if json_columns: - params["dtype"] = { - json_column: JSONB - for json_column in json_columns - } - - # Add new columns as they come - existing_columns = self.get_columns(schema, table_name) - - if existing_columns: - for column in data.columns: - if column in existing_columns: - continue - # NOTE: New values will have to be transformed - self.execute(f"ALTER TABLE {schema}.{table_name} ADD COLUMN {column} TEXT") - - data.to_sql(**params) - - def execute(self, query: str): - """ - Execute queries such as INSERT, UPDATE, DELETE etc. - - Parameters: - - `query` - the PostgreSQL query - """ - self.__execute(query) - self.__conn.commit() - - def to_pandas(self, query: str, batch_size: int = 50_000, uppercase: bool = True) -> pd.DataFrame: - """ - Create a DataFrame from the given query - - Parameters: - - `query` - the PostgreSQL query - - `batch_size` - how many rows will be fetched at once - - `uppercase` - if `True` then the columns will be uppercase. If `False` the columns will be lowercase - Output: - - DataFrame for the executed query - """ - self.__execute(query) - columns = [column.name.upper() if uppercase else column.name.lower() for column in self.__cursor.description] - chunks = [] - - while True: - rows = self.__cursor.fetchmany(batch_size) - if not rows: - break - chunks.append(pd.DataFrame(rows, columns=columns)) - - return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame(columns=columns) - - def close(self): - self.__cursor.close() - self.__conn.close() - - def __del__(self): - self.close() - - def __execute(self, query: str): - - failed = False - is_closed = bool(self.__conn.closed) - - if is_closed: - self.__conn: psycopg2.extensions.connection = psycopg2.connect(**self.__params) - self.__cursor: psycopg2.extensions.cursor = self.__conn.cursor() - - try: - self.__cursor.execute(query) - except psycopg2.errors.InFailedSqlTransaction: - self.__conn.rollback() - failed = True - - if failed: - self.__cursor.execute(query) - - - def get_columns(self, schema: str, table_name: str) -> list[str]: - """ - Get the column names in the correct order for the given table. - - Parameters: - - `table_name` - the name of the table - - `schema` - the name of the schema - - Output: - - the table's columns in the correct order - """ - query = f""" - SELECT column_name - FROM information_schema.columns - WHERE table_name = '{table_name}' - AND table_schema = '{schema}' - ORDER BY ordinal_position ASC - """ - return self.to_pandas(query)["COLUMN_NAME"].to_list() diff --git a/requirements.txt b/requirements.txt index 3307083..e5d2708 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,8 @@ pyyaml python-dotenv psycopg2-binary sqlalchemy +prometheus-client +pydantic>=2.0 +pydantic-settings>=2.0 +fastapi +uvicorn \ No newline at end of file