Add Weatherflow Cloud wind support via websocket by jeeftor · Pull Request #125611 · home-assistant/core · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add Weatherflow Cloud wind support via websocket #125611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
109 changes: 103 additions & 6 deletions homeassistant/components/weatherflow_cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,127 @@

from __future__ import annotations

import asyncio
from dataclasses import dataclass

from weatherflow4py.api import WeatherFlowRestAPI
from weatherflow4py.models.ws.obs import WebsocketObservation
from weatherflow4py.models.ws.types import EventType
from weatherflow4py.models.ws.websocket_request import (
ListenStartMessage,
RapidWindListenStartMessage,
)
from weatherflow4py.models.ws.websocket_response import EventDataRapidWind, RapidWindWS
from weatherflow4py.ws import WeatherFlowWebsocketAPI

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.const import CONF_API_TOKEN, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession

from .const import DOMAIN
from .coordinator import WeatherFlowCloudDataUpdateCoordinator
from .const import DOMAIN, LOGGER
from .coordinator import (
WeatherFlowCloudDataCallbackCoordinator,
WeatherFlowCloudUpdateCoordinatorREST,
)

PLATFORMS: list[Platform] = [Platform.SENSOR, Platform.WEATHER]


@dataclass
class WeatherFlowCoordinators:
    """Data Class for Entry Data."""

    # Bundle of the coordinators created for one config entry; this is the
    # object stored under hass.data[DOMAIN][entry.entry_id].

    # Coordinator backed by the REST API.
    rest: WeatherFlowCloudUpdateCoordinatorREST
    # Websocket coordinator parameterized with the rapid-wind data, listen
    # request and response types.
    wind: WeatherFlowCloudDataCallbackCoordinator[
        EventDataRapidWind, RapidWindListenStartMessage, RapidWindWS
    ]
    # Websocket coordinator parameterized with the observation data and
    # listen request types.
    observation: WeatherFlowCloudDataCallbackCoordinator[
        WebsocketObservation, ListenStartMessage, WebsocketObservation
    ]


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up WeatherFlowCloud from a config entry.

    Creates one REST coordinator and two websocket coordinators (observation
    and rapid wind) that share a single websocket API client, stores them as
    a ``WeatherFlowCoordinators`` instance in ``hass.data`` and forwards the
    entry to the platforms listed in ``PLATFORMS``.

    Returns:
        ``True`` once the coordinators are set up and the platforms have been
        forwarded.
    """
    LOGGER.debug("Initializing WeatherFlowCloudDataUpdateCoordinatorREST coordinator")

    rest_api = WeatherFlowRestAPI(
        api_token=entry.data[CONF_API_TOKEN], session=async_get_clientsession(hass)
    )

    stations = await rest_api.async_get_stations()

    # Define Rest Coordinator
    rest_data_coordinator = WeatherFlowCloudUpdateCoordinatorREST(
        hass=hass, config_entry=entry, rest_api=rest_api, stations=stations
    )

    # Initialize the stations
    await rest_data_coordinator.async_config_entry_first_refresh()

    # Construct Websocket Coordinators
    LOGGER.debug(
        "Initializing WeatherFlowCloudDataUpdateCoordinatorWebsocketWind coordinator"
    )
    websocket_device_ids = rest_data_coordinator.device_ids

    # Build API once; both websocket coordinators share this client.
    websocket_api = WeatherFlowWebsocketAPI(
        access_token=entry.data[CONF_API_TOKEN], device_ids=websocket_device_ids
    )

    # Type arguments are given in the coordinator's declared order
    # [T (stored data), M (listen request), C (received message)] and match
    # the annotations on WeatherFlowCoordinators.
    websocket_observation_coordinator = WeatherFlowCloudDataCallbackCoordinator[
        WebsocketObservation, ListenStartMessage, WebsocketObservation
    ](
        hass=hass,
        config_entry=entry,
        rest_api=rest_api,
        websocket_api=websocket_api,
        stations=stations,
        listen_request_type=ListenStartMessage,
        event_type=EventType.OBSERVATION,
    )

    websocket_wind_coordinator = WeatherFlowCloudDataCallbackCoordinator[
        EventDataRapidWind, RapidWindListenStartMessage, RapidWindWS
    ](
        hass=hass,
        config_entry=entry,
        rest_api=rest_api,
        websocket_api=websocket_api,
        stations=stations,
        listen_request_type=RapidWindListenStartMessage,
        event_type=EventType.RAPID_WIND,
    )

    # Run setup method.
    await asyncio.gather(
        websocket_wind_coordinator.async_setup(),
        websocket_observation_coordinator.async_setup(),
    )

    # Websocket disconnect handler
    async def _async_disconnect_websocket() -> None:
        await websocket_api.stop_all_listeners()
        await websocket_api.close()

    # Register the websocket shutdown handler as soon as the connection is
    # up, so it is already in place while the platforms are being set up.
    entry.async_on_unload(_async_disconnect_websocket)

    hass.data.setdefault(DOMAIN, {})[entry.entry_id] = WeatherFlowCoordinators(
        rest_data_coordinator,
        websocket_wind_coordinator,
        websocket_observation_coordinator,
    )
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    return True


async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""

if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)

Expand Down
14 changes: 9 additions & 5 deletions homeassistant/components/weatherflow_cloud/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ async def async_step_reauth_confirm(
errors = await _validate_api_token(api_token)
if not errors:
# Update the existing entry and abort
return self.async_update_reload_and_abort(
self._get_reauth_entry(),
data={CONF_API_TOKEN: api_token},
reload_even_if_entry_is_unchanged=False,
)
if existing_entry := self.hass.config_entries.async_get_entry(
self.context["entry_id"]
):
return self.async_update_reload_and_abort(
existing_entry,
data={CONF_API_TOKEN: api_token},
reason="reauth_successful",
reload_even_if_entry_is_unchanged=False,
)
Comment on lines +52 to +60
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure I saw it in another integration. Is it wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also appears the file is 100% covered by tests, so I guess it's at least being tested. It's hard to remember things at my advanced age, because I did this so long ago. :)


return self.async_show_form(
step_id="reauth_confirm",
Expand Down
5 changes: 4 additions & 1 deletion homeassistant/components/weatherflow_cloud/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
DOMAIN = "weatherflow_cloud"
LOGGER = logging.getLogger(__package__)

ATTR_ATTRIBUTION = "Weather data delivered by WeatherFlow/Tempest REST Api"
ATTR_ATTRIBUTION = "Weather data delivered by WeatherFlow/Tempest API"
MANUFACTURER = "WeatherFlow"

STATE_MAP = {
Expand All @@ -29,3 +29,6 @@
"thunderstorm": "lightning",
"windy": "windy",
}

WEBSOCKET_API = "Websocket API"
REST_API = "REST API"
173 changes: 158 additions & 15 deletions homeassistant/components/weatherflow_cloud/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,189 @@
"""Data coordinator for WeatherFlow Cloud Data."""
"""Data coordinators."""

from abc import ABC, abstractmethod
from datetime import timedelta

from aiohttp import ClientResponseError
from weatherflow4py.api import WeatherFlowRestAPI
from weatherflow4py.models.rest.stations import StationsResponseREST
from weatherflow4py.models.rest.unified import WeatherFlowDataREST
from weatherflow4py.models.ws.obs import WebsocketObservation
from weatherflow4py.models.ws.types import EventType
from weatherflow4py.models.ws.websocket_request import (
ListenStartMessage,
RapidWindListenStartMessage,
)
from weatherflow4py.models.ws.websocket_response import (
EventDataRapidWind,
ObservationTempestWS,
RapidWindWS,
)
from weatherflow4py.ws import WeatherFlowWebsocketAPI

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util.ssl import client_context

from .const import DOMAIN, LOGGER


class WeatherFlowCloudDataUpdateCoordinator(
DataUpdateCoordinator[dict[int, WeatherFlowDataREST]]
):
"""Class to manage fetching REST Based WeatherFlow Forecast data."""
class BaseWeatherFlowCoordinator[T](DataUpdateCoordinator[dict[int, T]], ABC):
"""Base class for WeatherFlow coordinators."""

config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
rest_api: WeatherFlowRestAPI,
stations: StationsResponseREST,
update_interval: timedelta | None = None,
always_update: bool = False,
) -> None:
"""Initialize Coordinator."""
self._token = rest_api.api_token
self._rest_api = rest_api
self.stations = stations
self.device_to_station_map = stations.device_station_map

self.device_ids = list(stations.device_station_map.keys())

def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize global WeatherFlow forecast data updater."""
self.weather_api = WeatherFlowRestAPI(
api_token=config_entry.data[CONF_API_TOKEN]
)
super().__init__(
hass,
LOGGER,
config_entry=config_entry,
name=DOMAIN,
always_update=always_update,
update_interval=update_interval,
)

@abstractmethod
def get_station_name(self, station_id: int):
"""Define a default implementation - that should always be overridden."""


class WeatherFlowCloudUpdateCoordinatorREST(
    BaseWeatherFlowCoordinator[WeatherFlowDataREST]
):
    """Class to manage fetching REST Based WeatherFlow Forecast data."""

    def __init__(
        self,
        hass: HomeAssistant,
        config_entry: ConfigEntry,
        rest_api: WeatherFlowRestAPI,
        stations: StationsResponseREST,
    ) -> None:
        """Initialize global WeatherFlow forecast data updater.

        The coordinator polls every 60 seconds and always notifies listeners.
        """
        super().__init__(
            hass,
            config_entry,
            rest_api,
            stations,
            update_interval=timedelta(seconds=60),
            always_update=True,
        )

    async def _async_update_data(self) -> dict[int, WeatherFlowDataREST]:
        """Update rest data.

        Raises:
            ConfigEntryAuthFailed: The API answered with HTTP status 401.
            UpdateFailed: Any other ``ClientResponseError``.
        """
        try:
            # The client lives in ``_rest_api`` (set by the base class); this
            # class never defines a ``weather_api`` attribute.
            async with self._rest_api:
                return await self._rest_api.get_all_data()
        except ClientResponseError as err:
            if err.status == 401:
                raise ConfigEntryAuthFailed(err) from err
            raise UpdateFailed(f"Update failed: {err}") from err

    def get_station(self, station_id: int) -> WeatherFlowDataREST:
        """Return station for id."""
        return self.data[station_id]

    def get_station_name(self, station_id: int) -> str:
        """Return station name for id."""
        return self.data[station_id].station.name


class WeatherFlowCloudDataCallbackCoordinator[
T: EventDataRapidWind | WebsocketObservation,
M: RapidWindListenStartMessage | ListenStartMessage,
C: RapidWindWS | ObservationTempestWS,
](BaseWeatherFlowCoordinator[dict[int, T | None]]):
"""A Generic coordinator to handle Websocket connections.

This class takes 3 generics - T, M, and C.
T - The ED8E type of data that will be stored in the coordinator.
M - The type of message that will be sent to the websocket API.
C - The type of message that will be received from the websocket API.
"""

def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
rest_api: WeatherFlowRestAPI,
websocket_api: WeatherFlowWebsocketAPI,
stations: StationsResponseREST,
listen_request_type: type[M],
event_type: EventType,
) -> None:
"""Initialize Coordinator."""

super().__init__(
hass=hass, config_entry=config_entry, rest_api=rest_api, stations=stations
)

self._event_type = event_type
self.websocket_api = websocket_api
self._listen_request_type = listen_request_type

# configure the websocket data structure
self._ws_data: dict[int, dict[int, T | None]] = {
station: dict.fromkeys(devices)
for station, devices in self.stations.station_device_map.items()
}

async def _generic_callback(self, data: C) -> None:
"""Handle incoming websocket data - RapidWindWS data will be parsed from the ob field, whereas ObservationTempestWS will be parsed directly."""
device_id = data.device_id
station_id = self.device_to_station_map[device_id]

# Handle possible message types with isinstance
if isinstance(data, RapidWindWS):
processed_data = data.ob
elif isinstance(data, ObservationTempestWS):
processed_data = data
else:
LOGGER.warning("Unknown message type received: %s", type(data))
return

self._ws_data[station_id][device_id] = processed_data
self.async_set_updated_data(self._ws_data)
return

async def async_setup(self) -> None:
"""Set up the websocket connection."""
assert self.websocket_api is not None

await self.websocket_api.connect(client_context())
# Register callback
self.websocket_api.register_callback(
message_type=self._event_type,
callback=self._generic_callback,
)
# Subscribe to messages
for device_id in self.device_ids:
await self.websocket_api.send_message(
self._listen_request_type(device_id=str(device_id))
)

def get_station(self, station_id: int):
"""Return station for id."""
return self.stations.stations[station_id]

def get_station_name(self, station_id: int) -> str:
"""Return station name for id."""
if name := self.stations.station_map[station_id].name:
return name
return ""
Loading
Loading
0