//
//  MAWebSocketClient.swift
//  Mobile Music Assistant
//
//  Created by Sven Hanold on 26.03.26.
//

import Foundation
import OSLog

private let logger = Logger(subsystem: "com.musicassistant.mobile", category: "WebSocket")

/// WebSocket client for Music Assistant server communication.
///
/// Responsibilities visible in this file:
/// - opens a `URLSessionWebSocketTask` against `<server>/ws`,
/// - matches responses to outstanding commands by message ID,
/// - forwards everything that decodes as an `MAEvent` to `eventStream`,
/// - reconnects with capped exponential backoff after a receive failure.
///
/// NOTE(review): state such as `connectionState` and `webSocketTask` is mutated both from
/// callers and from URLSession completion handlers; only `pendingRequests` is serialized
/// (through `requestQueue`). Confirm the intended threading model.
@Observable
final class MAWebSocketClient {

    /// Lifecycle state of the underlying socket.
    enum ConnectionState: Equatable {
        case disconnected
        case connecting
        case connected
        case reconnecting(attempt: Int)

        /// Human-readable label for the state.
        var description: String {
            switch self {
            case .disconnected:
                return "Disconnected"
            case .connecting:
                return "Connecting..."
            case .connected:
                return "Connected"
            case .reconnecting(let attempt):
                return "Reconnecting (attempt \(attempt))..."
            }
        }
    }

    /// Errors thrown by the client.
    enum ClientError: LocalizedError {
        case notConnected
        case invalidURL
        case timeout
        case serverError(String)
        case decodingError(Error)

        var errorDescription: String? {
            switch self {
            case .notConnected:
                return "Not connected to server"
            case .invalidURL:
                return "Invalid server URL"
            case .timeout:
                return "Request timeout"
            case .serverError(let message):
                return "Server error: \(message)"
            case .decodingError(let error):
                return "Failed to decode response: \(error.localizedDescription)"
            }
        }
    }

    // MARK: - Properties

    private(set) var connectionState: ConnectionState = .disconnected

    private var webSocketTask: URLSessionWebSocketTask?
    private let session: URLSession

    // Request-response matching. All access to `pendingRequests` goes through `requestQueue`.
    private var pendingRequests: [String: CheckedContinuation<MAResponse, Error>] = [:]
    private let requestQueue = DispatchQueue(label: "com.musicassistant.requests")
    private let requestTimeout: TimeInterval = 30.0

    // Event stream
    private var eventContinuation: AsyncStream<MAEvent>.Continuation?
    private(set) var eventStream: AsyncStream<MAEvent>

    // Reconnection
    private var reconnectTask: Task<Void, Never>?
    private var shouldReconnect = false
    /// Consecutive reconnect attempts since the last message was received.
    private var reconnectAttempt = 0
    private let maxReconnectDelay: TimeInterval = 30.0
    private let initialReconnectDelay: TimeInterval = 3.0

    // Configuration
    private var serverURL: URL?
    private var authToken: String?

    // MARK: - Initialization

    init() {
        let configuration = URLSessionConfiguration.default
        configuration.timeoutIntervalForRequest = 30
        // NOTE(review): confirm that a 300 s resource timeout does not cut long-lived sockets.
        configuration.timeoutIntervalForResource = 300
        self.session = URLSession(configuration: configuration)

        let (stream, continuation) = AsyncStream<MAEvent>.makeStream()
        self.eventStream = stream
        self.eventContinuation = continuation
    }

    deinit {
        disconnect()
    }

    // MARK: - Connection Management

    /// Connect to Music Assistant server.
    ///
    /// Does nothing unless the client is currently `.disconnected`.
    ///
    /// - Parameters:
    ///   - serverURL: Base URL of the server; the scheme is mapped to `ws`/`wss` and the
    ///     path is replaced with `/ws`.
    ///   - authToken: Optional token sent as a `Bearer` `Authorization` header.
    /// - Throws: `ClientError.invalidURL` when no WebSocket URL can be built.
    func connect(serverURL: URL, authToken: String?) async throws {
        print("🔵 MAWebSocketClient.connect: Checking state")

        guard connectionState == .disconnected else {
            logger.info("Already connected or connecting")
            print("⚠️ MAWebSocketClient.connect: Already connected/connecting, state = \(connectionState)")
            return
        }

        print("🔵 MAWebSocketClient.connect: Starting connection")
        print("🔵 MAWebSocketClient.connect: Server URL = \(serverURL.absoluteString)")
        print("🔵 MAWebSocketClient.connect: Has auth token = \(authToken != nil)")

        self.serverURL = serverURL
        self.authToken = authToken
        self.shouldReconnect = true
        self.reconnectAttempt = 0

        do {
            try await performConnect()
        } catch {
            // Nothing was opened, so there is nothing to reconnect to.
            shouldReconnect = false
            throw error
        }
    }

    /// Builds the WebSocket request from the stored configuration, starts the task and
    /// begins receiving.
    ///
    /// The state is set to `.connected` right after the task is resumed; a failed handshake
    /// only surfaces later through the receive callback.
    private func performConnect() async throws {
        guard let serverURL else {
            print("❌ MAWebSocketClient.performConnect: No server URL")
            throw ClientError.invalidURL
        }

        print("🔵 MAWebSocketClient.performConnect: Building WebSocket URL")

        // Build the WebSocket URL before touching `connectionState`, so that a bad URL does
        // not leave the client stuck in `.connecting`.
        guard var components = URLComponents(url: serverURL, resolvingAgainstBaseURL: false) else {
            print("❌ MAWebSocketClient.performConnect: Failed to build WebSocket URL")
            throw ClientError.invalidURL
        }

        let originalScheme = components.scheme
        // Secure inputs (https, or an already-secure wss) stay secure; everything else is ws.
        let secureSchemes: Set<String> = ["https", "wss"]
        let isSecure = originalScheme.map { secureSchemes.contains($0.lowercased()) } ?? false
        components.scheme = isSecure ? "wss" : "ws"
        components.path = "/ws"

        guard let wsURL = components.url else {
            print("❌ MAWebSocketClient.performConnect: Failed to build WebSocket URL")
            throw ClientError.invalidURL
        }

        connectionState = .connecting
        logger.info("Connecting to \(serverURL.absoluteString)")

        print("🔵 MAWebSocketClient.performConnect: Original scheme = \(originalScheme ?? "nil")")
        print("🔵 MAWebSocketClient.performConnect: WebSocket URL = \(wsURL.absoluteString)")

        var request = URLRequest(url: wsURL)

        // Add auth token if available
        if let authToken {
            request.setValue("Bearer \(authToken)", forHTTPHeaderField: "Authorization")
            print("✅ MAWebSocketClient.performConnect: Authorization header added")
        } else {
            print("⚠️ MAWebSocketClient.performConnect: No auth token provided")
        }

        let task = session.webSocketTask(with: request)
        self.webSocketTask = task

        print("🔵 MAWebSocketClient.performConnect: Starting WebSocket task")
        task.resume()

        // Start listening for messages
        startReceiving()

        connectionState = .connected
        logger.info("Connected successfully")
        print("✅ MAWebSocketClient.performConnect: Connection successful")
    }

    /// Disconnect from server.
    ///
    /// Stops reconnection, cancels the socket, fails every pending request with
    /// `ClientError.notConnected` and finishes the event stream.
    ///
    /// NOTE(review): the stream is finished and never recreated, so a later `connect`
    /// presumably no longer delivers events through `eventStream` — confirm this is intended.
    func disconnect() {
        logger.info("Disconnecting")

        shouldReconnect = false
        reconnectAttempt = 0
        reconnectTask?.cancel()
        reconnectTask = nil

        webSocketTask?.cancel(with: .goingAway, reason: nil)
        webSocketTask = nil

        failAllPendingRequests(with: ClientError.notConnected)

        connectionState = .disconnected
        eventContinuation?.finish()
    }

    /// Removes every pending request and resumes each one exactly once with `error`.
    private func failAllPendingRequests(with error: Error) {
        let abandoned = requestQueue.sync { () -> [CheckedContinuation<MAResponse, Error>] in
            defer { pendingRequests.removeAll() }
            return Array(pendingRequests.values)
        }
        for continuation in abandoned {
            continuation.resume(throwing: error)
        }
    }

    /// Removes the pending continuation for `messageId`, if it is still registered.
    ///
    /// Whoever gets a non-nil result owns the single allowed resume of that continuation.
    private func takePendingRequest(for messageId: String) -> CheckedContinuation<MAResponse, Error>? {
        requestQueue.sync { pendingRequests.removeValue(forKey: messageId) }
    }

    // MARK: - Message Receiving

    /// Arms one receive on the current task and re-arms itself after each message.
    private func startReceiving() {
        guard let task = webSocketTask else { return }

        task.receive { [weak self, weak task] result in
            // Ignore callbacks from a task that has been cancelled or replaced, so a stale
            // failure cannot tear down a newer connection.
            guard let self, let task, self.webSocketTask === task else { return }

            switch result {
            case .success(let message):
                // A received message proves the link works; restart the backoff sequence.
                self.reconnectAttempt = 0
                self.handleMessage(message)
                // Continue listening
                self.startReceiving()

            case .failure(let error):
                logger.error("WebSocket receive error: \(error.localizedDescription)")
                self.handleDisconnection()
            }
        }
    }

    /// Routes a text frame to the response or event handler; other frames are logged.
    private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
        guard case .string(let text) = message else {
            logger.warning("Received non-text message")
            return
        }

        let data = Data(text.utf8)
        let decoder = JSONDecoder()

        // Try to decode as response (has message_id)
        if let response = try? decoder.decode(MAResponse.self, from: data),
           let messageId = response.messageId {
            handleResponse(messageId: messageId, response: response)
            return
        }

        // Try to decode as event
        if let event = try? decoder.decode(MAEvent.self, from: data) {
            handleEvent(event)
            return
        }

        logger.warning("Received unknown message format: \(text)")
    }

    /// Resumes the request waiting on `messageId` with the response or a server error.
    private func handleResponse(messageId: String, response: MAResponse) {
        guard let continuation = takePendingRequest(for: messageId) else {
            logger.warning("Received response for unknown message ID: \(messageId)")
            return
        }

        // Check for error
        if let errorCode = response.errorCode {
            let errorMsg = response.errorMessage ?? errorCode
            continuation.resume(throwing: ClientError.serverError(errorMsg))
        } else {
            continuation.resume(returning: response)
        }
    }

    /// Forwards an event to `eventStream`.
    private func handleEvent(_ event: MAEvent) {
        logger.debug("Received event: \(event.event)")
        eventContinuation?.yield(event)
    }

    /// Handles a receive failure: fails pending requests and schedules a reconnect if wanted.
    private func handleDisconnection() {
        connectionState = .disconnected
        webSocketTask = nil

        // Cancel pending requests
        failAllPendingRequests(with: ClientError.notConnected)

        // Attempt reconnection if needed. The counter persists across drops so the delay
        // actually grows while the server stays unreachable.
        if shouldReconnect {
            reconnectAttempt += 1
            scheduleReconnect(attempt: reconnectAttempt)
        }
    }

    // MARK: - Reconnection

    /// Schedules `performConnect()` after the backoff delay for `attempt` (1-based).
    private func scheduleReconnect(attempt: Int) {
        connectionState = .reconnecting(attempt: attempt)

        // Exponential backoff capped at maxReconnectDelay: 3s, 6s, 12s, 24s, 30s, 30s, ...
        let delay = min(
            initialReconnectDelay * pow(2.0, Double(attempt - 1)),
            maxReconnectDelay
        )

        logger.info("Scheduling reconnect attempt \(attempt) in \(delay)s")

        // Weak capture so a sleeping reconnect does not keep the client alive.
        reconnectTask = Task { [weak self] in
            try? await Task.sleep(for: .seconds(delay))

            guard let self, !Task.isCancelled, self.shouldReconnect else { return }

            do {
                try await self.performConnect()
            } catch {
                logger.error("Reconnect attempt \(attempt) failed: \(error.localizedDescription)")
                self.reconnectAttempt = attempt + 1
                self.scheduleReconnect(attempt: attempt + 1)
            }
        }
    }

    // MARK: - Sending Commands

    /// Send a command and wait for response.
    ///
    /// - Parameters:
    ///   - command: Command name placed in the outgoing `MACommand`.
    ///   - args: Optional arguments, each wrapped in `AnyCodable` before encoding.
    /// - Returns: The response whose message ID matches this request.
    /// - Throws: `ClientError.notConnected` when there is no connected socket or the
    ///   connection drops while waiting, `ClientError.timeout` after `requestTimeout`
    ///   seconds, `ClientError.serverError` when the response carries an error code, the
    ///   transport error when sending fails, or an encoding error.
    func sendCommand(
        _ command: String,
        args: [String: Any]? = nil
    ) async throws -> MAResponse {
        // Bind the task here so the send below cannot silently become a no-op if
        // `webSocketTask` is cleared in between.
        guard let task = webSocketTask, connectionState == .connected else {
            throw ClientError.notConnected
        }

        let messageId = UUID().uuidString

        // Convert args to AnyCodable
        let encodableArgs = args?.mapValues { AnyCodable($0) }

        let cmd = MACommand(
            messageId: messageId,
            command: command,
            args: encodableArgs
        )

        let data = try JSONEncoder().encode(cmd)
        guard let json = String(data: data, encoding: .utf8) else {
            throw ClientError.decodingError(NSError(domain: "Encoding", code: -1))
        }

        logger.debug("Sending command: \(command) (ID: \(messageId))")

        let timeout = requestTimeout

        // Send message and wait for response. Every path below resumes only the
        // continuation it removed from `pendingRequests`, so it is resumed exactly once.
        return try await withCheckedThrowingContinuation { continuation in
            requestQueue.sync {
                pendingRequests[messageId] = continuation
            }

            task.send(.string(json)) { [weak self] error in
                guard let error, let self else { return }
                self.takePendingRequest(for: messageId)?.resume(throwing: error)
            }

            // Timeout
            Task { [weak self] in
                try? await Task.sleep(for: .seconds(timeout))
                self?.takePendingRequest(for: messageId)?.resume(throwing: ClientError.timeout)
            }
        }
    }

    /// Convenience method to send command and decode result.
    ///
    /// - Parameters:
    ///   - command: Command name.
    ///   - args: Optional arguments.
    ///   - resultType: Type to decode the response's `result` into.
    /// - Returns: The decoded result.
    /// - Throws: Everything `sendCommand(_:args:)` throws, `ClientError.serverError` when the
    ///   response has no result, or `ClientError.decodingError` when decoding fails.
    func sendCommand<T: Decodable>(
        _ command: String,
        args: [String: Any]? = nil,
        resultType: T.Type
    ) async throws -> T {
        let response = try await sendCommand(command, args: args)

        guard let result = response.result else {
            throw ClientError.serverError("No result in response")
        }

        do {
            return try result.decode(as: T.self)
        } catch {
            throw ClientError.decodingError(error)
        }
    }
}