Home Assistant (maximeallanic)
Controls Home Assistant devices, automations, dashboards, HACS, cameras, and Fully Kiosk tablets with 70+ tools via REST and WebSocket APIs.
Score: 0/100
Tools: 0
Findings: 12
Stars: 0
Downloads: n/a
Last Scanned: Mar 22, 2026
Score Breakdown (5 categories)
Code: 85
Dependencies: 55
Config: 0
Description: 100
Behavior: 100
OWASP MCP Top 10 Coverage
MCP10-supply-chain: Fail
MCP02-tool-poisoning: Pass
MCP07-insecure-config: Fail
MCP08-dependency-vuln: Fail
MCP01-prompt-injection: Pass
MCP03-command-injection: Pass
MCP04-data-exfiltration: Fail
MCP09-logging-monitoring: Fail
MCP05-privilege-escalation: Fail
MCP06-excessive-permissions: Fail
Findings: 12
3 critical
6 high
2 medium
1 low
0 informational
critical · Q13 · MCP Bridge Package Supply Chain Attack · MCP10-supply-chain · AML.T0054
Pattern "(?:mcp|fastmcp|langchain-mcp|llama-index-mcp)(?:>=|~=|==)?(?!\d)" matched in source_code: "MCP" (at position 70)
MCP bridge packages (mcp-remote, mcp-proxy, @modelcontextprotocol/sdk, fastmcp) are high-value supply chain targets — CVE-2025-6514 (CVSS 9.6) in mcp-remote affected 437,000+ installs. Always pin exact versions (no ^ or ~ ranges). Use lockfiles (package-lock.json, pnpm-lock.yaml, uv.lock). Never run `npx mcp-remote` without version pinning. Verify package integrity with `npm audit` or `pip-audit` before deployment. Reference: CVE-2025-6514, OWASP ASI04.
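The pinning advice above can be checked mechanically. The following is a minimal sketch, assuming dependencies are listed in requirements.txt style; the helper name `unpinned_requirements` and the package versions in the usage note are illustrative and not taken from the scanned project.

```python
import re

# Accepts "name==1.2.3" (optionally with extras); anything else counts as unpinned.
_PINNED = re.compile(r"^[A-Za-z0-9._-]+(\[[^\]]+\])?==[^=*\s]+$")


def unpinned_requirements(lines):
    """Return requirement specs that are not pinned to one exact version."""
    flagged = []
    for raw in lines:
        line = raw.split("#", 1)[0].strip()   # drop comments and whitespace
        if not line or line.startswith("-"):  # skip blanks and pip options
            continue
        spec = line.split(";", 1)[0].strip()  # ignore environment markers
        if not _PINNED.match(spec):
            flagged.append(spec)
    return flagged
```

For example, `unpinned_requirements(["mcp>=1.0", "aiohttp==3.9.5"])` flags only `mcp>=1.0`. A check like this complements a lockfile and an audit tool; it does not replace them.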
critical · L7 · Transitive MCP Server Delegation · MCP06-excessive-permissions · AML.T0054
Pattern "(?:connect|initialize).*(?:mcp|modelcontextprotocol).*(?:server|endpoint|url)" matched in source_code: "Initialize MCP server" (at position 40285)
MCP servers MUST NOT create client connections to other MCP servers without explicit user disclosure. If delegation is required, declare all downstream servers in the server's capabilities and tool descriptions. Never forward user credentials to sub-servers. Implement a trust boundary between the approved server and any delegated servers. Log all transitive delegations for audit.
critical · K8 · Cross-Boundary Credential Sharing · MCP05-privilege-escalation · AML.T0054
Pattern "(forward|pass|send|relay|proxy|propagate)[_\s-]?(token|credential|api[_\s-]?key|secret|password|auth)" matched in source_code: "Send auth" (at position 2889)
Never forward, share, or embed credentials across trust boundaries. Use OAuth token exchange (RFC 8693) to create scoped, delegated tokens instead of passing original credentials. Never include credentials in tool responses. Required by ISO 27001 A.5.17 and OWASP ASI03.
high · D1 · Known CVEs in Dependencies · MCP08-dependency-vuln
Dependency "pillow@10.0.0" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
high · C3 · Server-Side Request Forgery (SSRF) · MCP04-data-exfiltration · AML.T0057
Pattern "\.(?:get|post|put|delete|patch|request)\s*\([^)]*(?:url|uri|target|endpoint|host)[^)]*(?:req|input|param|args|f['"]|\+)" matched in source_code: ".request(
method, url, headers=self.headers, **kwargs" (at position 10166)
Validate ALL user-supplied URLs before making HTTP requests:
1. Parse the URL and check the hostname against an explicit allowlist of permitted domains.
2. Block requests to RFC 1918 private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16.
3. Block loopback (127.0.0.0/8), link-local (169.254.0.0/16), and IPv6 equivalents.
4. Block file:// and other non-http(s) protocols explicitly.
5. Disable automatic redirect following, or re-validate each redirect destination.
6. In cloud environments: block requests to IMDS endpoints (169.254.169.254,
metadata.google.internal) at both the application AND network layer.
Example (Node.js): Use the `ssrf-req-filter` package or implement URL validation
against an allowlist before calling fetch/axios/got.
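Since the scanned source is Python, the validation steps above could be sketched roughly as follows, using only the standard library. The function name `is_url_allowed` and its `allowed_hosts` and `resolve` parameters are hypothetical; `resolve` is injectable only so the check can be exercised without real DNS lookups.

```python
import ipaddress
import socket
from urllib.parse import urlsplit


def is_url_allowed(url, allowed_hosts, resolve=socket.getaddrinfo):
    """Return True only for http(s) URLs whose host is allowlisted and public."""
    try:
        parts = urlsplit(url)
        port = parts.port  # raises ValueError on a malformed port
    except ValueError:
        return False
    if parts.scheme not in ("http", "https"):  # blocks file:// and others
        return False
    host = parts.hostname
    if host is None or host.lower() not in allowed_hosts:
        return False
    try:
        infos = resolve(host, port or (443 if parts.scheme == "https" else 80))
    except OSError:  # includes DNS resolution failures
        return False
    for info in infos:
        # Strip any IPv6 scope id before parsing the resolved address.
        addr = ipaddress.ip_address(info[4][0].split("%")[0])
        if (addr.is_private or addr.is_loopback or addr.is_link_local
                or addr.is_multicast or addr.is_reserved or addr.is_unspecified):
            return False
    return bool(infos)
```

This sketch resolves the hostname separately from the eventual request, so it does not by itself protect against DNS rebinding or redirects; with aiohttp, passing `allow_redirects=False` covers the redirect step. A Home Assistant instance on a home network usually has a private address, so a real deployment would likely need to exempt that one configured address explicitly rather than block all private ranges.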
high · D1 · Known CVEs in Dependencies · MCP08-dependency-vuln
Dependency "aiohttp@3.8.0" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
high · D1 · Known CVEs in Dependencies · MCP08-dependency-vuln
Dependency "mcp@1.0.0" has known CVEs:
Update dependencies to versions that patch known CVEs. Run 'npm audit fix' or 'pip-audit' to identify and resolve vulnerable dependencies.
high · N2 · JSON-RPC Notification Flooding · MCP07-insecure-config · AML.T0054
Pattern "(?:sendNotification|send_notification|notify|emit)\s*\((?!.*(?:queue|throttle|rateLimit|backpressure|bounded))" matched in source_code: "send_notification(" (at position 29384)
Implement bounded notification queues (recommended max size: 100 per subscription). Apply backpressure — when the queue is full, drop oldest notifications or pause the producer. Enforce per-client notification rate limits. JSON-RPC notifications have no response, so there is no natural flow control — servers MUST implement it explicitly.
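A bounded, rate-limited queue of the kind described above might look like the sketch below. The class name `BoundedNotifier`, the default of 20 notifications per second, and the injectable `clock` are assumptions made for illustration; only the queue size of 100 comes from the recommendation.

```python
import time
from collections import deque


class BoundedNotifier:
    """Bounded notification queue with drop-oldest and a per-second send cap."""

    def __init__(self, max_size=100, max_per_second=20, clock=time.monotonic):
        self._queue = deque(maxlen=max_size)  # a full deque evicts its oldest item
        self._max_per_second = max_per_second
        self._clock = clock
        self._window_start = clock()
        self._sent_in_window = 0
        self.dropped = 0  # count of evicted notifications, useful for monitoring

    def enqueue(self, notification):
        if len(self._queue) == self._queue.maxlen:
            self.dropped += 1  # the append below will evict the oldest entry
        self._queue.append(notification)

    def drain(self):
        """Return the notifications that may be sent now under the rate limit."""
        now = self._clock()
        if now - self._window_start >= 1.0:  # start a new one-second window
            self._window_start = now
            self._sent_in_window = 0
        batch = []
        while self._queue and self._sent_in_window < self._max_per_second:
            batch.append(self._queue.popleft())
            self._sent_in_window += 1
        return batch
```

The producer calls `enqueue` freely, and the sender loop calls `drain` and forwards whatever it returns. One instance per client or subscription gives the per-client limit the recommendation asks for.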
high · O8 · Timing-Based Covert Channel · MCP04-data-exfiltration · AML.T0057
Pattern "(?:delay|sleep|timeout|wait)\s*[:=]\s*(?:[^;]*(?:charCodeAt|charAt|bit|byte|\&\s*1|>>|<<|\[i\]))" matched in source_code: "timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._ws_close()
        if self.session:
            await self.session.close()

    # --- WebSocket methods ---

    async def _ws_ensure_connected(self):
        """Ensure WebSocket is connected and authenticated"""
        if self._ws is not None and not self._ws.closed:
            return
        if not self.session:
            raise RuntimeError("Client not initialized. Use async context manager.")
        logger.info(f"Connecting WebSocket to {self._ws_url}")
        self._ws = await self.session.ws_connect(self._ws_url)
        self._ws_authenticated = False
        self._ws_id = 0
        # Wait for auth_required message
        msg = await self._ws.receive_json()
        if msg.get("type") != "auth_required":
            raise RuntimeError(f"Expected auth_required, got: {msg.get('type')}")
        # Send auth
        await self._ws.send_json({
            "type": "auth",
            "access_token": self.token,
        })
        # Wait for auth response
        msg = await self._ws.receive_json()
        if msg.get("type") != "auth_ok":
            raise RuntimeError(f"WebSocket auth failed: {msg}")
        self._ws_authenticated = True
        logger.info("WebSocket authenticated successfully")

    async def _ws_send_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """Send a WebSocket command and wait for result"""
        await self._ws_ensure_connected()
        self._ws_id += 1
        command["id"] = self._ws_id
        await self._ws.send_json(command)
        # Read messages until we get the response for our id
        while True:
            msg = await self._ws.receive_json()
            if msg.get("id") == self._ws_id:
                if not msg.get("success", False):
                    error = msg.get("error", {})
                    raise RuntimeError(
                        f"WebSocket command failed: {error.get('code', 'unknown')} - {error.get('message', str(error))}"
                    )
                return msg.get("result")

    async def _ws_close(self):
        """Close WebSocket connection"""
        if self._ws is not None and not self._ws.closed:
            await self._ws.close()
        self._ws = None
        self._ws_authenticated = False

    async def ws_command(self, type: str, data: Dict[str, Any] = None) -> Any:
        """Send a generic WebSocket command."""
        command = {"type": type}
        if data:
            command.update(data)
        return await self._ws_send_command(command)

    async def get_dashboards(self) -> Any:
        return await self.ws_command("lovelace/dashboards/list")

    async def get_dashboard_config(self, url_path: Optional[str] = None) -> Any:
        cmd = {"type": "lovelace/config"}
        if url_path:
            cmd["url_path"] = url_path
        return await self._ws_send_command(cmd)

    async def save_dashboard_config(self, config: Dict, url_path: Optional[str] = None, force: bool = False) -> Any:
        cmd = {"type": "lovelace/config/save", "config": config}
        if url_path:
            cmd["url_path"] = url_path
        if force:
            cmd["force"] = True
        return await self._ws_send_command(cmd)

    async def create_dashboard(self, title: str, url_path: str, icon: Optional[str] = None,
                               show_in_sidebar: bool = True, require_admin: bool = False) -> Any:
        cmd = {"type": "lovelace/dashboards/create", "title": title,
               "url_path": url_path, "mode": "storage",
               "show_in_sidebar": show_in_sidebar, "require_admin": require_admin}
        if icon:
            cmd["icon"] = icon
        return await self._ws_send_command(cmd)

    async def delete_dashboard(self, dashboard_id: str) -> Any:
        return await self._ws_send_command({"type": "lovelace/dashboards/delete", "dashboard_id": dashboard_id})

    async def get_lovelace_resources(self) -> Any:
        return await self.ws_command("lovelace/resources")

    # Built-in HA Lovelace cards (static list, current as of HA 2024+)
    BUILTIN_CARDS = [
        "alarm-panel", "area", "button", "calendar", "camera", "conditional",
        "energy-carbon-consumed-gauge", "energy-date-selection",
        "energy-devices-detail-graph", "energy-devices-graph",
        "energy-distribution", "energy-gas-gauge", "energy-grid-neutrality-gauge",
        "energy-sources-table", "energy-solar-consumed-gauge",
        "energy-solar-graph", "energy-summary", "energy-usage-graph",
        "energy-water-gauge",
        "entities", "entity", "entity-filter", "gauge", "glance", "grid",
        "history-graph", "horizontal-stack", "humidifier", "iframe", "light",
        "logbook", "map", "markdown", "media-control", "picture",
        "picture-elements", "picture-entity", "picture-glance", "plant-status",
        "sensor", "shopping-list", "statistic", "statistics-graph", "thermostat",
        "tile", "todo-list", "vertical-stack", "weather-forecast", "webpage",
    ]

    async def list_lovelace_cards(self) -> Dict[str, Any]:
        """List all available Lovelace cards: built-in, registered resources, and HACS plugins."""
        result: Dict[str, Any] = {
            "builtin_cards": [f"card-{c}" for c in self.BUILTIN_CARDS],
            "registered_resources": [],
            "hacs_plugins": [],
        }
        # Registered Lovelace resources
        try:
            resources = await self.get_lovelace_resources()
            result["registered_resources"] = resources or []
        except Exception as e:
            result["registered_resources_error"] = str(e)
        # HACS installed plugins
        try:
            repos = await self.hacs_list_repositories(categories=["plugin"])
            result["hacs_plugins"] = [
                {
                    "name": r.get("name"),
                    "full_name": r.get("full_name"),
                    "installed": r.get("installed", False),
                    "version_installed": r.get("installed_version") or r.get("version_installed"),
                    "version_available": r.get("available_version") or r.get("last_version"),
                    "description": r.get("description"),
                }
                for r in (repos if isinstance(repos, list) else [])
                if r.get("installed")
            ]
        except Exception as e:
            result["hacs_plugins_error"] = str(e)
        return result

    # --- HACS methods ---

    async def hacs_status(self) -> Any:
        return await self.ws_command("hacs/info")

    async def hacs_list_repositories(self, categories: Optional[List[str]] = None) -> Any:
        data: Dict[str, Any] = {}
        if categories:
            data["categories"] = categories
        return await self.ws_command("hacs/repositories/list", data or None)

    async def hacs_repository_info(self, repository_id: str) -> Any:
        return await self.ws_command("hacs/repository/info", {"repository_id": repository_id})

    async def hacs_download(self, repository_id: str, version: Optional[str] = None) -> Any:
        data: Dict[str, Any] = {"repository": repository_id}
        if version:
            data["version"] = version
        return await self.ws_command("hacs/repository/download", data)

    async def hacs_remove(self, repository_id: str) -> Any:
        return await self.ws_command("hacs/repository/remove", {"repository": repository_id})

    async def hacs_refresh(self, repository_id: str) -> Any:
        return await self.ws_command("hacs/repository/refresh", {"repository": repository_id})

    async def hacs_releases(self, repository_id: str) -> Any:
        return await self.ws_command("hacs/repository/releases", {"repository_id": repository_id})

    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make HTTP request to Home Assistant API"""
        if not self.session:
            raise RuntimeError("Client not initialized. Use async context manager.")
        url = urljoin(self.base_url, endpoint)
        try:
            async with self.session.request(
                method, url, headers=self.headers, **kwargs
            ) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            logger.error(f"HTTP request failed: {e}")
            raise
        except Exception as e:
            logger.error(f"Request failed: {e}")
            raise

    async def get_states(self) -> List[Dict[str, Any]]:
        """Get all entity states"""
        return await self._request("GET", "/api/states")

    async def get_state(self, entity_id: str) -> Dict[str, Any]:
        """Get state of specific entity"""
        return await self._request("GET", f"/api/states/{entity_id}")

    async def call_service(self, domain: str, service: str, service_data: Optional[Dict] = None) -> List[Dict[str, Any]]:
        """Call a Home Assistant service"""
        endpoint = f"/api/services/{domain}/{service}"
        data = service_data or {}
        return await self._request("POST", endpoint, json=data)

    async def get_config(self) -> Dict[str, Any]:
        """Get Home Assistant configuration"""
        return await self._request("GET", "/api/config")

    async def get_services(self) -> List[Dict[str, Any]]:
        """Get all available services"""
        return await self._request("GET", "/api/services")

    async def get_events(self) -> List[Dict[str, Any]]:
        """Get event types"""
        return await self._request("GET", "/api/events")

    async def fire_event(self, event_type: str, event_data: Optional[Dict] = None) -> Dict[str, Any]:
        """Fire an event"""
        endpoint = f"/api/events/{event_type}"
        data = event_data or {}
        return await self._request("POST", endpoint, json=data)

    async def get_history(self, start_time: Optional[str] = None, end_time: Optional[str] = None,
                          filter_entity_id: Optional[str] = None, minimal_response: bool = False,
                          no_attributes: bool = False, significant_changes_only: bool = False) -> List[Dict[str, Any]]:
        """Get historical data"""
        if start_time:
            endpoint = f"/api/history/period/{start_time}"
        else:
            endpoint = "/api/history/period"
        params = {}
        if end_time:
            params["end_time"] = end_time
        if filter_entity_id:
            params["filter_entity_id"] = filter_entity_id
        if minimal_response:
            params["minimal_response"] = "true"
        if no_attributes:
            params["no_attributes"] = "true"
        if significant_changes_only:
            params["significant_changes_only"] = "true"
        return await self._request("GET", endpoint, params=params)

    async def get_logbook(self, start_time: Optional[str] = None, end_time: Optional[str] = None,
                          entity: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get logbook entries"""
        if start_time:
            endpoint = f"/api/logbook/{start_time}"
        else:
            endpoint = "/api/logbook"
        params = {}
        if end_time:
            params["end_time"] = end_time
        if entity:
            params["entity"] = entity
        return await self._request("GET", endpoint, params=params)

    async def get_error_log(self) -> str:
        """Get error log"""
        response = await self._request("GET", "/api/error_log")
        return response if isinstance(response, str) else str(response)

    async def get_calendars(self) -> List[Dict[str, Any]]:
        """Get list of calendar entities"""
        return await self._request("GET", "/api/calendars")

    async def get_calendar_events(self, calendar_id: str, start: str, end: str) -> List[Dict[str, Any]]:
        """Get calendar events for a specific calendar"""
        endpoint = f"/api/calendars/{calendar_id}"
        params = {"start": start, "end": end}
        return await self._request("GET", endpoint, params=params)

    async def get_camera_proxy(self, camera_entity_id: str) -> byte" (at position 1920)
Remove all code that calculates sleep/delay durations from application data, secrets, or any variable-length content. Tool response times should be constant or determined only by legitimate processing time. If rate limiting is needed, use fixed intervals not derived from data values. Monitor for anomalous response time patterns that could indicate timing-based exfiltration.
medium · K20 · Insufficient Audit Context in Logging · MCP09-logging-monitoring · AML.T0054
Pattern "logger\.(info|warn|error)\s*\(.*(?:tool|request|handle|invoke)(?!.*(?:requestId|correlationId|traceId|spanId|agent[_\s-]?id|user[_\s-]?id))" matched in source_code: "logger.error(f"HTTP request" (at position 10405)
Use structured logging that includes all five ISO 27001 A.8.15 fields: (1) WHO — agent/user identity, (2) WHAT — tool name and operation, (3) WHEN — ISO 8601 timestamp, (4) WHERE — server ID and correlation ID, (5) OUTCOME — success/failure and result summary. Replace console.log with structured loggers (pino, winston). Add correlation IDs for request tracing across multi-agent chains.
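For a Python server, the five fields listed above could be emitted with the standard `logging` and `json` modules, as in this minimal sketch. The helper names `audit_record` and `log_audit`, the field keys, and the default `server_id` value are assumptions for illustration, not part of the scanned project or of any standard.

```python
import json
import logging
import uuid
from datetime import datetime, timezone


def audit_record(agent_id, tool, outcome, server_id="ha-mcp",
                 correlation_id=None, summary=""):
    """Build one audit entry carrying the WHO/WHAT/WHEN/WHERE/OUTCOME fields."""
    return {
        "who": agent_id,                                 # agent or user identity
        "what": tool,                                    # tool name and operation
        "when": datetime.now(timezone.utc).isoformat(),  # ISO 8601 timestamp in UTC
        "where": {
            "server_id": server_id,
            # Reuse the caller's correlation ID, or mint one for a new request.
            "correlation_id": correlation_id or str(uuid.uuid4()),
        },
        "outcome": {"status": outcome, "summary": summary},
    }


def log_audit(logger, **fields):
    """Serialize the audit entry as one JSON line and return it."""
    record = audit_record(**fields)
    logger.info(json.dumps(record, sort_keys=True))
    return record
```

A call such as `log_audit(logging.getLogger("audit"), agent_id="agent-1", tool="call_service", outcome="success")` writes one JSON line per tool invocation. Passing the same `correlation_id` through every hop is what makes tracing across a multi-agent chain possible.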
medium · K17 · Missing Timeout or Circuit Breaker · MCP07-insecure-config · AML.T0054
Pattern "(?:fetch|axios|got|request|urllib|httpx|http\.get|http\.post)\s*\((?!.*(?:timeout|signal|AbortSignal|deadline|cancel))" matched in source_code: "request(" (at position 9814)
Add timeouts to ALL external calls: HTTP requests (30s), database queries (10s), subprocess execution (60s), and MCP tool calls (30s). Implement circuit breakers that open after N consecutive failures (e.g., opossum, cockatiel). Use AbortSignal for cancellable operations. Required by EU AI Act Art. 15 and OWASP ASI08.
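The libraries named above (opossum, cockatiel) are Node.js packages. For an asyncio-based Python server, a per-call timeout plus a small circuit breaker could be sketched as below. The class names, the default of 3 failures, and the 30-second reset are illustrative assumptions; `asyncio.wait_for` is the only library call relied on.

```python
import asyncio
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, max_failures=3, reset_after=30.0, clock=time.monotonic):
        self._max_failures = max_failures
        self._reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    async def call(self, func, *args, timeout=30.0, **kwargs):
        """Await func(*args, **kwargs) with a timeout, tracking failures."""
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self._reset_after:
                raise CircuitOpenError("circuit open; call rejected")
            self._opened_at = None  # half-open: let one trial call through
        try:
            result = await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
        except Exception:
            self._failures += 1
            if self._failures >= self._max_failures:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        self._failures = 0  # any success closes the circuit fully
        return result
```

A client would wrap each outbound request, for example `await breaker.call(client.get_states, timeout=30.0)`, where `breaker` and `client` are hypothetical instances. After the reset interval, a single failed trial call re-opens the circuit because the failure count is only cleared on success.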
low · F4 · MCP Spec Non-Compliance · MCP07-insecure-config
Server fails MCP spec compliance checks: required:server_name; required:server_version; required:protocol_version; recommended:tool_descriptions; recommended:parameter_descriptions
Follow the MCP specification for server metadata. Include server name, version, and protocol version. Provide descriptions for all tools and parameters.
Security Category Deep Dive
Prompt Injection
Prompt & context manipulation attacks
Maturity: 69
Rules: 14
Sub-categories: 5
Gaps: 1
Implementation: 64%
Tests: 56
Stories: 1
100% · 3 rules · Injection via tool descriptions and parameter fields
GAP-001 · Prompt Injection Coverage Gap · Missing detection coverage for emerging prompt injection attack variants not addressed by current rules
100% · 4 rules · Hidden instructions via external content and tool responses
100% · 2 rules · Context window saturation and prior-approval exploitation
100% · 3 rules · Payload hiding via invisible chars, base64, schema fields
100% · 2 rules · Injection via prompt templates and runtime tool output