Files
Marc 7196e334a2 v1.1.0: per-room services, send_to_user, mautrix bridge support, searchable room picker
- Add per-room convenience actions (matrix_messenger.send_to_<roomname>)
- Add send_to_user action: finds existing portal/DM room or creates one;
  supports mautrix-whatsapp, -signal, -telegram puppet IDs
- Inject service descriptions dynamically so room dropdowns show friendly
  names instead of room IDs (full-state sync + direct state API fallback)
- Switch all room selectors to searchable dropdown mode
- Fix _find_or_create_dm to match bridge portal rooms (3+ members)
- Fix async_get_joined_rooms to use full_state sync
- Bump version to 1.1.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 15:46:28 +02:00

312 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Async Matrix client wrapper with E2EE support via matrix-nio."""
from __future__ import annotations
import logging
import os
from typing import Any, Awaitable, Callable
from nio import (
AsyncClient,
AsyncClientConfig,
JoinedRoomsResponse,
KeysUploadResponse,
LoginResponse,
MegolmEvent,
RoomCreateResponse,
RoomGetStateEventResponse,
RoomMessageText,
RoomSendResponse,
SyncResponse,
UnknownEvent,
WhoamiResponse,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Signature of message/reaction listeners: an async callable taking two
# positional arguments and returning nothing. MatrixClient invokes these
# as ``await cb(room, event)``.
MessageCallback = Callable[[Any, Any], Awaitable[None]]
class MatrixClientError(Exception):
    """Signal a Matrix client failure that the wrapper cannot recover from.

    Raised by MatrixClient, e.g. when a login attempt is rejected or the
    device data for an access token cannot be fetched.
    """
class MatrixClient:
    """Async wrapper around nio.AsyncClient with E2EE and callback support.

    Typical lifecycle: construct, ``await async_setup()``, then either
    ``async_login_password()`` or ``async_restore_login()``, use the core
    operations, and finally ``await async_close()``.
    """

    def __init__(self, homeserver: str, user_id: str, store_path: str) -> None:
        """Remember connection parameters; no I/O happens here.

        Args:
            homeserver: Homeserver URL passed to nio.AsyncClient.
            user_id: Matrix user ID this client acts as.
            store_path: Directory handed to nio as its store path (created
                on demand by ``async_setup``).
        """
        self._homeserver = homeserver
        self._user_id = user_id
        self._store_path = store_path
        self._client: AsyncClient | None = None
        self._message_callbacks: list[MessageCallback] = []
        self._reaction_callbacks: list[MessageCallback] = []
        self._encryption_enabled = False

    async def async_setup(self) -> None:
        """Create the nio client. Must be called before any other method.

        Tries to configure the client with encryption enabled and falls
        back to an unencrypted configuration when building the encrypted
        config raises ImportWarning.
        """
        os.makedirs(self._store_path, exist_ok=True)
        # Settings shared by the encrypted and the fallback configuration.
        common: dict[str, Any] = {
            "max_limit_exceeded": 0,
            "max_timeouts": 0,
            "store_sync_tokens": True,
        }
        try:
            config = AsyncClientConfig(encryption_enabled=True, **common)
            self._encryption_enabled = True
        except ImportWarning:
            _LOGGER.warning(
                "E2EE-Abhängigkeiten (python-olm) nicht installiert "
                "Verschlüsselung deaktiviert. Für E2EE 'matrix-nio[e2e]' installieren."
            )
            # Reset explicitly so a repeated setup never keeps a stale True.
            self._encryption_enabled = False
            config = AsyncClientConfig(encryption_enabled=False, **common)
        client = AsyncClient(
            homeserver=self._homeserver,
            user=self._user_id,
            store_path=self._store_path,
            config=config,
        )
        client.add_event_callback(self._on_message, RoomMessageText)
        client.add_event_callback(self._on_unknown_event, UnknownEvent)
        if self._encryption_enabled:
            client.add_event_callback(self._on_megolm, MegolmEvent)
        self._client = client

    def _require_client(self) -> AsyncClient:
        """Return the nio client or raise if ``async_setup`` has not run.

        Raises:
            MatrixClientError: The client has not been created yet (or was
                already closed).
        """
        if self._client is None:
            raise MatrixClientError(
                "Matrix client is not set up; call async_setup() first"
            )
        return self._client

    # ------------------------------------------------------------------
    # Login helpers
    # ------------------------------------------------------------------
    async def async_login_password(self, password: str, device_name: str) -> tuple[str, str]:
        """Login with username/password. Returns (access_token, device_id).

        Raises:
            MatrixClientError: The client is not set up, or the server did
                not answer with a LoginResponse.
        """
        client = self._require_client()
        resp = await client.login(password, device_name=device_name)
        if not isinstance(resp, LoginResponse):
            raise MatrixClientError(f"Login fehlgeschlagen: {resp}")
        await self._upload_keys_if_needed()
        return resp.access_token, resp.device_id

    async def async_restore_login(self, access_token: str, device_id: str) -> str:
        """Restore an existing session. Returns the device_id (fetched if empty).

        When ``device_id`` is empty it is looked up via whoami(); if that
        also fails, an empty string is returned and the session is only
        partially restored (access token and user ID set on the client).

        Raises:
            MatrixClientError: The client is not set up.
        """
        client = self._require_client()
        client.access_token = access_token
        client.user_id = self._user_id
        if not device_id:
            resp = await client.whoami()
            if isinstance(resp, WhoamiResponse):
                device_id = resp.device_id or ""
            else:
                _LOGGER.warning("whoami() failed: %s", resp)
        if device_id:
            client.restore_login(
                user_id=self._user_id,
                device_id=device_id,
                access_token=access_token,
            )
            if self._encryption_enabled:
                client.load_store()
                await self._upload_keys_if_needed()
        return device_id

    async def async_whoami_device_id(self, access_token: str) -> tuple[str, str]:
        """Fetch (user_id, device_id) for a given access token (used during config flow).

        Falls back to the configured user ID when the response carries
        none, and to an empty string when it carries no device ID.

        Raises:
            MatrixClientError: The client is not set up, or whoami() did
                not return a WhoamiResponse.
        """
        client = self._require_client()
        client.access_token = access_token
        resp = await client.whoami()
        if isinstance(resp, WhoamiResponse):
            return resp.user_id or self._user_id, resp.device_id or ""
        raise MatrixClientError(f"Konnte Gerätedaten nicht abrufen: {resp}")

    # ------------------------------------------------------------------
    # Core operations
    # ------------------------------------------------------------------
    async def async_get_joined_rooms(self) -> dict[str, str]:
        """Return {room_id: display_name} for all joined rooms.

        Returns an empty dict (and logs an error) when the joined-rooms
        request does not yield a JoinedRoomsResponse.
        """
        client = self._require_client()
        rooms_resp = await client.joined_rooms()
        if not isinstance(rooms_resp, JoinedRoomsResponse):
            _LOGGER.error("Konnte Raumliste nicht laden: %s", rooms_resp)
            return {}
        return await self.async_get_room_names(list(rooms_resp.rooms))

    @staticmethod
    def _state_value(content: Any, field: str) -> str:
        """Return the stripped string stored under ``field`` in ``content``.

        Yields "" when ``content`` is not a dict or the value is missing,
        blank, or not a string, so a null or non-string value is never
        turned into a bogus room name such as "None".
        """
        if not isinstance(content, dict):
            return ""
        value = content.get(field)
        return value.strip() if isinstance(value, str) else ""

    async def _room_name_from_state(self, client: Any, room_id: str) -> str:
        """Resolve one room's name via the state API; return room_id if unresolved."""
        for event_type, field in (
            ("m.room.name", "name"),
            ("m.room.canonical_alias", "alias"),
        ):
            try:
                resp = await client.room_get_state_event(room_id, event_type)
                # Use duck-typing so we work across different nio versions
                content = getattr(resp, "content", None)
                _LOGGER.debug(
                    "state_event(%s, %s) → %s content=%s",
                    room_id, event_type, type(resp).__name__, content,
                )
                value = self._state_value(content, field)
                if value:
                    return value
            except Exception as exc:
                # Deliberately best-effort: a failing lookup only means we
                # try the next event type / the sync fallback.
                _LOGGER.debug("state_event(%s, %s) exception: %s", room_id, event_type, exc)
        return room_id

    async def async_get_room_names(self, room_ids: list[str]) -> dict[str, str]:
        """Fetch room display names.

        Strategy:
        1. Direct state API (m.room.name / m.room.canonical_alias): works
           without syncing and is unaffected by incremental sync tokens.
        2. full_state sync fallback: populates client.rooms with current
           state so display_name can be calculated from room name / member list.

        Rooms that stay unresolved map to their own room ID.
        """
        client = self._require_client()
        result: dict[str, str] = {}
        # --- Strategy 1: direct state API ---
        for room_id in room_ids:
            result[room_id] = await self._room_name_from_state(client, room_id)
        # --- Strategy 2: full_state sync for rooms still unresolved ---
        unresolved = [rid for rid in room_ids if result.get(rid) == rid]
        if not unresolved:
            return result
        _LOGGER.debug(
            "%d room(s) unresolved after state API trying full_state sync", len(unresolved)
        )
        try:
            sync_resp = await client.sync(timeout=12000, full_state=True)
            if isinstance(sync_resp, SyncResponse):
                for room_id in unresolved:
                    room = client.rooms.get(room_id)
                    if not room:
                        continue
                    name = getattr(room, "display_name", None) or getattr(room, "name", None)
                    _LOGGER.debug(" full_state display_name(%s) → %r", room_id, name)
                    if name and name != room_id:
                        result[room_id] = name
        except Exception as exc:
            _LOGGER.warning("full_state sync fehlgeschlagen: %s", exc)
        return result

    def get_room_display_names(self) -> dict[str, str]:
        """Return {room_id: display_name} from currently synced room state.

        Falls back from ``display_name`` to ``name`` to the room ID itself.
        Returns an empty dict when no client exists.
        """
        if not self._client:
            return {}
        return {
            room_id: (
                getattr(room, "display_name", None)
                or getattr(room, "name", None)
                or room_id
            )
            for room_id, room in self._client.rooms.items()
        }

    async def async_send_message(self, room_id: str, message: str) -> bool:
        """Send a plain-text message. Returns True on success.

        A response other than RoomSendResponse is logged and reported as
        False.
        """
        client = self._require_client()
        resp = await client.room_send(
            room_id=room_id,
            message_type="m.room.message",
            content={"msgtype": "m.text", "body": message},
        )
        if isinstance(resp, RoomSendResponse):
            return True
        _LOGGER.error("send_message fehlgeschlagen (%s): %s", room_id, resp)
        return False

    async def async_sync_once(self, timeout_ms: int = 5000, full_state: bool = False) -> None:
        """Perform one Matrix /sync call.

        full_state=True forces the server to return complete room state (incl.
        m.room.name) regardless of any stored sync token. A failed sync is
        only logged at debug level.
        """
        client = self._require_client()
        # Pass None rather than False so the flag is simply omitted.
        resp = await client.sync(timeout=timeout_ms, full_state=full_state or None)
        if not isinstance(resp, SyncResponse):
            _LOGGER.debug("Sync fehlgeschlagen: %s", resp)

    async def async_send_to_user(self, user_id: str, message: str) -> bool:
        """Send a direct message to a Matrix user. Finds or creates a DM room.

        Returns False when no room could be found or created, otherwise
        the result of ``async_send_message``.
        """
        room_id = await self._find_or_create_dm(user_id)
        if not room_id:
            return False
        return await self.async_send_message(room_id, message)

    @staticmethod
    def _joined_member_ids(room: Any) -> set[str]:
        """Return the user IDs in ``room.users`` that count as joined.

        Duck-typed across member representations: when a member exposes a
        ``membership`` attribute, only "join" counts. Otherwise the member
        counts as joined unless it carries a truthy ``invited`` flag
        (NOTE(review): this is believed to match nio's MatrixUser, which
        tracks invitations via ``invited`` - confirm against the installed
        nio version).
        """
        joined: set[str] = set()
        for uid, member in room.users.items():
            membership = getattr(member, "membership", None)
            if membership is not None:
                if membership == "join":
                    joined.add(uid)
            elif not getattr(member, "invited", False):
                joined.add(uid)
        return joined

    async def _find_or_create_dm(self, user_id: str) -> str | None:
        """Return an existing room_id for user_id, or create one.

        Bridge portal rooms (mautrix-whatsapp, -signal, -telegram) have 3+
        members (user + puppet + bridge bot), so we search all joined rooms
        for one that contains the target user and prefer the one with the
        fewest members (most likely a direct/portal room).

        Returns None when no room exists and creating one fails.
        """
        await self.async_sync_once(timeout_ms=5000)
        client = self._require_client()
        my_id = client.user_id
        candidates: list[tuple[int, str]] = []
        for room_id, room in client.rooms.items():
            joined = self._joined_member_ids(room)
            if user_id in joined and my_id in joined:
                candidates.append((len(joined), room_id))
        if candidates:
            # fewest members first → most likely the direct/portal room
            return min(candidates)[1]
        # No existing room found: try to create a DM (works for native Matrix
        # users; bridge puppets usually require the bridge bot to initiate).
        resp = await client.room_create(is_direct=True, invite=[user_id])
        if isinstance(resp, RoomCreateResponse):
            return resp.room_id
        _LOGGER.error("Konnte Raum für %s nicht erstellen: %s", user_id, resp)
        return None

    async def async_close(self) -> None:
        """Close the HTTP session and drop the client; no-op if none exists."""
        if self._client:
            await self._client.close()
            self._client = None

    # ------------------------------------------------------------------
    # Callback registration
    # ------------------------------------------------------------------
    def add_message_callback(self, cb: MessageCallback) -> None:
        """Register ``cb`` to be awaited for every RoomMessageText event."""
        self._message_callbacks.append(cb)

    def add_reaction_callback(self, cb: MessageCallback) -> None:
        """Register ``cb`` to be awaited for every m.reaction event."""
        self._reaction_callbacks.append(cb)

    # ------------------------------------------------------------------
    # Internal nio callbacks
    # ------------------------------------------------------------------
    async def _on_message(self, room: Any, event: RoomMessageText) -> None:
        """Fan a text message out to all message callbacks.

        A failing callback is logged and does not stop the remaining ones.
        """
        for cb in self._message_callbacks:
            try:
                await cb(room, event)
            except Exception:
                _LOGGER.exception("Fehler im Nachrichten-Callback")

    async def _on_unknown_event(self, room: Any, event: UnknownEvent) -> None:
        """Forward m.reaction events to the reaction callbacks; ignore others."""
        if event.type != "m.reaction":
            return
        for cb in self._reaction_callbacks:
            try:
                await cb(room, event)
            except Exception:
                _LOGGER.exception("Fehler im Reaktions-Callback")

    async def _on_megolm(self, room: Any, event: MegolmEvent) -> None:
        """Log MegolmEvents delivered to us (i.e. events left undecrypted)."""
        _LOGGER.debug("Undecryptable MegolmEvent in %s (fehlende Session-Keys?)", room.room_id)

    async def _upload_keys_if_needed(self) -> None:
        """Upload E2EE keys and query device keys when the client asks for it.

        Does nothing when encryption is disabled. Both calls are guarded by
        the client's ``should_*`` flags so neither request is issued when
        nothing is pending (NOTE(review): nio is believed to raise
        LocalProtocolError on an unneeded keys_query() - confirm).
        """
        if not self._encryption_enabled:
            return
        client = self._require_client()
        if client.should_upload_keys:
            resp = await client.keys_upload()
            if not isinstance(resp, KeysUploadResponse):
                _LOGGER.warning("E2EE Key-Upload fehlgeschlagen: %s", resp)
        if client.should_query_keys:
            await client.keys_query()