// // MAWebSocketClient.swift // Mobile Music Assistant // // Created by Sven Hanold on 26.03.26. // import Foundation import OSLog private let logger = Logger(subsystem: "com.musicassistant.mobile", category: "WebSocket") /// WebSocket client for Music Assistant server communication @Observable final class MAWebSocketClient { enum ConnectionState: Equatable { case disconnected case connecting case connected case reconnecting(attempt: Int) var description: String { switch self { case .disconnected: return "Disconnected" case .connecting: return "Connecting..." case .connected: return "Connected" case .reconnecting(let attempt): return "Reconnecting (attempt \(attempt))..." } } } enum ClientError: LocalizedError { case notConnected case invalidURL case timeout case serverError(String) case decodingError(Error) var errorDescription: String? { switch self { case .notConnected: return "Not connected to server" case .invalidURL: return "Invalid server URL" case .timeout: return "Request timeout" case .serverError(let message): return "Server error: \(message)" case .decodingError(let error): return "Failed to decode response: \(error.localizedDescription)" } } } // MARK: - Properties private(set) var connectionState: ConnectionState = .disconnected private var webSocketTask: URLSessionWebSocketTask? private let session: URLSession // Request-Response matching private var pendingRequests: [String: CheckedContinuation] = [:] private let requestQueue = DispatchQueue(label: "com.musicassistant.requests") // Event stream private var eventContinuation: AsyncStream.Continuation? private(set) var eventStream: AsyncStream // Reconnection private var reconnectTask: Task? private var shouldReconnect = false private let maxReconnectDelay: TimeInterval = 30.0 private let initialReconnectDelay: TimeInterval = 3.0 // Configuration private var serverURL: URL? private var authToken: String? // MARK: - Initialization init() { let configuration = URLSessionConfiguration.default configuration.timeoutIntervalForRequest = 30 configuration.timeoutIntervalForResource = 300 self.session = URLSession(configuration: configuration) // Initialize event stream var continuation: AsyncStream.Continuation? self.eventStream = AsyncStream { cont in continuation = cont } self.eventContinuation = continuation } deinit { disconnect() } // MARK: - Connection Management /// Connect to Music Assistant server func connect(serverURL: URL, authToken: String?) async throws { guard connectionState == .disconnected else { logger.info("Already connected or connecting") return } self.serverURL = serverURL self.authToken = authToken self.shouldReconnect = true try await performConnect() } private func performConnect() async throws { guard let serverURL else { throw ClientError.invalidURL } connectionState = .connecting logger.info("Connecting to \(serverURL.absoluteString)") // Build WebSocket URL (ws:// or wss://) var components = URLComponents(url: serverURL, resolvingAgainstBaseURL: false)! components.scheme = components.scheme == "https" ? "wss" : "ws" components.path = "/ws" guard let wsURL = components.url else { throw ClientError.invalidURL } logger.debug("WebSocket URL: \(wsURL.absoluteString)") let task = session.webSocketTask(with: URLRequest(url: wsURL)) self.webSocketTask = task task.resume() do { // MA sends a server-info message immediately on connect; receive and discard it _ = try await task.receive() logger.debug("Received server info") // Send auth command and wait for confirmation if let authToken { try await performAuth(task: task, token: authToken) logger.info("Authenticated successfully") } // Now safe to start the regular message loop startReceiving() connectionState = .connected logger.info("Connected successfully") } catch { task.cancel(with: .goingAway, reason: nil) webSocketTask = nil connectionState = .disconnected throw error } } private func performAuth(task: URLSessionWebSocketTask, token: String) async throws { let messageId = UUID().uuidString let cmd = MACommand( messageId: messageId, command: "auth", args: ["token": AnyCodable(token)] ) let data = try JSONEncoder().encode(cmd) guard let json = String(data: data, encoding: .utf8) else { throw ClientError.decodingError(NSError(domain: "Encoding", code: -1)) } try await task.send(.string(json)) // Receive the auth response let result = try await task.receive() guard case .string(let responseText) = result, let responseData = responseText.data(using: .utf8) else { throw ClientError.serverError("Invalid auth response format") } if let response = try? JSONDecoder().decode(MAResponse.self, from: responseData), let errorCode = response.errorCode { throw ClientError.serverError(response.details ?? "Authentication failed (code \(errorCode))") } // Non-error response means auth succeeded } /// Disconnect from server func disconnect() { logger.info("Disconnecting") shouldReconnect = false reconnectTask?.cancel() reconnectTask = nil // Nil out BEFORE cancel so any in-flight receive callbacks see nil and exit early let task = webSocketTask webSocketTask = nil connectionState = .disconnected task?.cancel(with: .goingAway, reason: nil) // Cancel all pending requests requestQueue.sync { for (_, continuation) in pendingRequests { continuation.resume(throwing: ClientError.notConnected) } pendingRequests.removeAll() } eventContinuation?.finish() } // MARK: - Message Receiving private func startReceiving() { guard let task = webSocketTask else { return } task.receive { [weak self] result in guard let self else { return } switch result { case .success(let message): self.handleMessage(message) // Only continue if we are still connected to this same task if self.webSocketTask === task { self.startReceiving() } case .failure(let error): // URLError.cancelled is expected during a clean disconnect — not a real error let nsError = error as NSError guard nsError.code != URLError.cancelled.rawValue else { return } logger.error("WebSocket receive error: \(error.localizedDescription)") self.handleDisconnection() } } } private func handleMessage(_ message: URLSessionWebSocketTask.Message) { guard case .string(let text) = message else { logger.warning("Received non-text message") return } guard let data = text.data(using: .utf8) else { logger.error("Failed to convert message to data") return } // Try to decode as response (has message_id) if let response = try? JSONDecoder().decode(MAResponse.self, from: data), let messageId = response.messageId { handleResponse(messageId: messageId, response: response) return } // Try to decode as event if let event = try? JSONDecoder().decode(MAEvent.self, from: data) { handleEvent(event) return } logger.warning("Received unknown message format: \(text)") } private func handleResponse(messageId: String, response: MAResponse) { requestQueue.sync { guard let continuation = pendingRequests.removeValue(forKey: messageId) else { logger.warning("Received response for unknown message ID: \(messageId)") return } // Check for error if let errorCode = response.errorCode { let errorMsg = response.details ?? "Error code: \(errorCode)" logger.error("Server error \(errorCode) for message \(messageId): \(errorMsg)") continuation.resume(throwing: ClientError.serverError(errorMsg)) } else { continuation.resume(returning: response) } } } private func handleEvent(_ event: MAEvent) { logger.debug("Received event: \(event.event)") eventContinuation?.yield(event) } private func handleDisconnection() { // Idempotency guard — can be called from receive callback and disconnect() simultaneously guard connectionState != .disconnected else { return } connectionState = .disconnected let task = webSocketTask webSocketTask = nil task?.cancel(with: .goingAway, reason: nil) // Cancel pending requests requestQueue.sync { for (_, continuation) in pendingRequests { continuation.resume(throwing: ClientError.notConnected) } pendingRequests.removeAll() } // Attempt reconnection if needed if shouldReconnect { scheduleReconnect(attempt: 1) } } // MARK: - Reconnection private func scheduleReconnect(attempt: Int) { connectionState = .reconnecting(attempt: attempt) // Exponential backoff: 3s, 10s, 30s, 30s, ... let delay = min( initialReconnectDelay * pow(2.0, Double(attempt - 1)), maxReconnectDelay ) logger.info("Scheduling reconnect attempt \(attempt) in \(delay)s") reconnectTask = Task { try? await Task.sleep(for: .seconds(delay)) guard !Task.isCancelled, shouldReconnect else { return } do { try await performConnect() } catch { logger.error("Reconnect attempt \(attempt) failed: \(error.localizedDescription)") scheduleReconnect(attempt: attempt + 1) } } } // MARK: - Sending Commands /// Send a command and wait for response func sendCommand( _ command: String, args: [String: Any]? = nil ) async throws -> MAResponse { guard webSocketTask != nil, connectionState == .connected else { throw ClientError.notConnected } let messageId = UUID().uuidString // Convert args to AnyCodable let encodableArgs = args?.mapValues { AnyCodable($0) } let cmd = MACommand( messageId: messageId, command: command, args: encodableArgs ) let data = try JSONEncoder().encode(cmd) guard let json = String(data: data, encoding: .utf8) else { throw ClientError.decodingError(NSError(domain: "Encoding", code: -1)) } logger.debug("Sending command: \(command) (ID: \(messageId)) payload: \(json)") // Send message and wait for response return try await withCheckedThrowingContinuation { continuation in requestQueue.sync { pendingRequests[messageId] = continuation } webSocketTask?.send(.string(json)) { [weak self] error in if let error { self?.requestQueue.sync { _ = self?.pendingRequests.removeValue(forKey: messageId) } continuation.resume(throwing: error) } } // Timeout after 30 seconds Task { try? await Task.sleep(for: .seconds(30)) self.requestQueue.sync { if let cont = self.pendingRequests.removeValue(forKey: messageId) { cont.resume(throwing: ClientError.timeout) } } } } } /// Convenience method to send command and decode result func sendCommand( _ command: String, args: [String: Any]? = nil, resultType: T.Type ) async throws -> T { let response = try await sendCommand(command, args: args) guard let result = response.result else { throw ClientError.serverError("No result in response") } do { // Debug: Log the raw result before decoding if let jsonData = try? JSONEncoder().encode(result), let jsonString = String(data: jsonData, encoding: .utf8) { logger.debug("📦 Response result for '\(command)': \(jsonString)") } return try result.decode(as: T.self) } catch { logger.error("❌ Failed to decode result for '\(command)': \(error.localizedDescription)") // Log more details about the decoding error if let decodingError = error as? DecodingError { switch decodingError { case .dataCorrupted(let context): logger.error("Data corrupted: \(context.debugDescription)") case .keyNotFound(let key, let context): logger.error("Key '\(key.stringValue)' not found: \(context.debugDescription)") case .typeMismatch(let type, let context): logger.error("Type mismatch for \(type): \(context.debugDescription)") case .valueNotFound(let type, let context): logger.error("Value not found for \(type): \(context.debugDescription)") @unknown default: logger.error("Unknown decoding error") } } throw ClientError.decodingError(error) } } }