//
// MAWebSocketClient.swift
// Mobile Music Assistant
//
// Created by Sven Hanold on 26.03.26.
//
|
|
|
|
import Foundation
|
|
import OSLog
|
|
|
|
/// File-scoped logger for this file (subsystem "com.musicassistant.mobile", category "WebSocket").
private let logger = Logger(subsystem: "com.musicassistant.mobile", category: "WebSocket")
|
|
|
|
/// WebSocket client for Music Assistant server communication
|
|
@Observable
|
|
final class MAWebSocketClient {
|
|
/// Lifecycle phases of the WebSocket connection.
enum ConnectionState: Equatable {
    case disconnected
    case connecting
    case connected
    /// A reconnect has been scheduled; `attempt` is the 1-based retry number.
    case reconnecting(attempt: Int)

    /// Short status text for the current phase.
    var description: String {
        switch self {
        case .disconnected:
            "Disconnected"
        case .connecting:
            "Connecting..."
        case .connected:
            "Connected"
        case let .reconnecting(attempt):
            "Reconnecting (attempt \(attempt))..."
        }
    }
}
|
|
|
|
/// Errors surfaced by the client to its callers.
enum ClientError: LocalizedError {
    case notConnected
    case invalidURL
    case timeout
    /// The server reported a failure; the payload is the message to show.
    case serverError(String)
    /// Encoding or decoding a message failed; the payload is the underlying error.
    case decodingError(Error)

    /// Human-readable description of the error.
    var errorDescription: String? {
        switch self {
        case .notConnected:
            "Not connected to server"
        case .invalidURL:
            "Invalid server URL"
        case .timeout:
            "Request timeout"
        case let .serverError(message):
            "Server error: \(message)"
        case let .decodingError(underlying):
            "Failed to decode response: \(underlying.localizedDescription)"
        }
    }
}
|
|
|
|
// MARK: - Properties

/// Current connection phase. Readable by clients, written only by this class.
private(set) var connectionState: ConnectionState = .disconnected

/// `true` only while `connectionState` is `.connected`.
var isConnected: Bool { connectionState == .connected }

// Internal bookkeeping below is excluded from observation tracking: it is private,
// so no observer can read it, and tracking would only add registrar work on every
// access from the socket callbacks.

/// The socket task of the current connection, or `nil` when none is open.
@ObservationIgnored private var webSocketTask: URLSessionWebSocketTask?

/// Session used to create WebSocket tasks.
private let session: URLSession

// Request-Response matching

/// Continuations of in-flight commands, keyed by message ID.
/// Every access goes through `requestQueue`.
@ObservationIgnored private var pendingRequests: [String: CheckedContinuation<MAResponse, Error>] = [:]

/// Serial queue that guards `pendingRequests`.
private let requestQueue = DispatchQueue(label: "com.musicassistant.requests")

// Event stream

/// Feeds `eventStream`.
@ObservationIgnored private var eventContinuation: AsyncStream<MAEvent>.Continuation?

/// Stream of events received from the server.
private(set) var eventStream: AsyncStream<MAEvent>

// Reconnection

/// The pending or running reconnect attempt, if any.
@ObservationIgnored private var reconnectTask: Task<Void, Never>?

/// Whether a lost connection should be re-established automatically.
@ObservationIgnored private var shouldReconnect = false

/// Upper bound for the reconnect delay, in seconds.
private let maxReconnectDelay: TimeInterval = 30.0

/// Delay before the first reconnect attempt, in seconds.
private let initialReconnectDelay: TimeInterval = 3.0

// Configuration

/// Server address given to `connect(serverURL:authToken:)`.
@ObservationIgnored private var serverURL: URL?

/// Auth token given to `connect(serverURL:authToken:)`, if any.
@ObservationIgnored private var authToken: String?
|
|
|
|
// MARK: - Initialization
|
|
|
|
/// Creates a disconnected client with its own `URLSession` and an open event stream.
init() {
    let sessionConfig = URLSessionConfiguration.default
    sessionConfig.timeoutIntervalForRequest = 30
    sessionConfig.timeoutIntervalForResource = 300
    session = URLSession(configuration: sessionConfig)

    // Create the event stream together with the continuation that feeds it.
    let (stream, continuation) = AsyncStream<MAEvent>.makeStream()
    eventStream = stream
    eventContinuation = continuation
}

/// Tears the connection down when the client is released.
deinit {
    disconnect()
}
|
|
|
|
// MARK: - Connection Management
|
|
|
|
/// Opens a connection to a Music Assistant server.
///
/// Does nothing (apart from logging) unless the client is currently `.disconnected`.
///
/// - Parameters:
///   - serverURL: Address of the server; stored for later reconnect attempts.
///   - authToken: Token sent with the `auth` command, or `nil` to skip authentication.
/// - Throws: Any error raised by `performConnect()`.
func connect(serverURL: URL, authToken: String?) async throws {
    if connectionState != .disconnected {
        logger.info("Already connected or connecting")
        return
    }

    // Remember endpoint and credentials, and enable automatic reconnects.
    self.serverURL = serverURL
    self.authToken = authToken
    shouldReconnect = true

    try await performConnect()
}
|
|
|
|
/// Opens the socket, completes the handshake, and starts the receive loop.
///
/// Uses the stored `serverURL` and `authToken`. If the handshake fails, the socket
/// task is cancelled, `connectionState` returns to `.disconnected`, and the error
/// is rethrown.
///
/// - Throws: `ClientError.invalidURL` when no WebSocket URL can be derived from
///   `serverURL`, or any error raised while receiving the greeting or authenticating.
private func performConnect() async throws {
    // Resolve the endpoint BEFORE touching `connectionState`, so an invalid URL
    // cannot leave the client stranded in `.connecting`.
    guard let serverURL,
          var components = URLComponents(url: serverURL, resolvingAgainstBaseURL: false) else {
        throw ClientError.invalidURL
    }

    // Build WebSocket URL: secure origins (https / wss, any letter case) stay
    // secure, everything else uses plain ws.
    let originScheme = components.scheme?.lowercased() ?? ""
    let isSecure = originScheme == "https" || originScheme == "wss"
    components.scheme = isSecure ? "wss" : "ws"
    components.path = "/ws"

    guard let wsURL = components.url else {
        throw ClientError.invalidURL
    }

    connectionState = .connecting
    logger.info("Connecting to \(serverURL.absoluteString)")
    logger.debug("WebSocket URL: \(wsURL.absoluteString)")

    let task = session.webSocketTask(with: URLRequest(url: wsURL))
    webSocketTask = task
    task.resume()

    do {
        // MA sends a server-info message immediately on connect; receive and discard it
        _ = try await task.receive()
        logger.debug("Received server info")

        // Send auth command and wait for confirmation
        if let authToken {
            try await performAuth(task: task, token: authToken)
            logger.info("Authenticated successfully")
        }

        // Mark the connection live before the receive loop starts, so a failure
        // reported by the loop cannot be overwritten by a late `.connected`.
        connectionState = .connected
        startReceiving()
        logger.info("Connected successfully")
    } catch {
        task.cancel(with: .goingAway, reason: nil)
        webSocketTask = nil
        connectionState = .disconnected
        throw error
    }
}
|
|
|
|
/// Sends the `auth` command on `task` and waits for the next frame as its reply.
///
/// - Parameters:
///   - task: The socket task to authenticate on.
///   - token: Token placed in the command's `token` argument.
/// - Throws: Encoding or transport errors; `ClientError.decodingError` when the
///   encoded command is not valid UTF-8; `ClientError.serverError` when the reply
///   is not text or carries an error code.
private func performAuth(task: URLSessionWebSocketTask, token: String) async throws {
    let authCommand = MACommand(
        messageId: UUID().uuidString,
        command: "auth",
        args: ["token": AnyCodable(token)]
    )

    let encoded = try JSONEncoder().encode(authCommand)
    guard let payload = String(data: encoded, encoding: .utf8) else {
        throw ClientError.decodingError(NSError(domain: "Encoding", code: -1))
    }

    try await task.send(.string(payload))

    // The next frame is treated as the reply to the auth command.
    let reply = try await task.receive()
    guard case .string(let replyText) = reply,
          let replyData = replyText.data(using: .utf8) else {
        throw ClientError.serverError("Invalid auth response format")
    }

    // A reply that does not decode, or decodes without an error code, counts as success.
    guard let decoded = try? JSONDecoder().decode(MAResponse.self, from: replyData),
          let errorCode = decoded.errorCode else {
        return
    }
    throw ClientError.serverError(decoded.details ?? "Authentication failed (code \(errorCode))")
}
|
|
|
|
/// Disconnects from the server and disables automatic reconnects.
///
/// Cancels any scheduled reconnect, closes the socket, fails every in-flight
/// command with `ClientError.notConnected`, and ends the current `eventStream`.
/// A fresh `eventStream` is installed afterwards so the client can be connected again.
func disconnect() {
    logger.info("Disconnecting")
    shouldReconnect = false
    reconnectTask?.cancel()
    reconnectTask = nil

    // Nil out BEFORE cancel so any in-flight receive callbacks see nil and exit early
    let task = webSocketTask
    webSocketTask = nil
    connectionState = .disconnected
    task?.cancel(with: .goingAway, reason: nil)

    // Detach all pending requests under the queue, then fail them.
    let abandoned: [CheckedContinuation<MAResponse, Error>] = requestQueue.sync {
        let waiting = Array(pendingRequests.values)
        pendingRequests.removeAll()
        return waiting
    }
    for continuation in abandoned {
        continuation.resume(throwing: ClientError.notConnected)
    }

    // End the stream for current consumers, then replace it: a finished AsyncStream
    // never delivers again, so without a new one every event received after a later
    // `connect()` would be dropped.
    eventContinuation?.finish()
    let (stream, continuation) = AsyncStream<MAEvent>.makeStream()
    eventStream = stream
    eventContinuation = continuation
}
|
|
|
|
// MARK: - Message Receiving
|
|
|
|
/// Arms one receive on the current socket task and re-arms itself after each message.
///
/// Does nothing when there is no current task. A receive failure on the current
/// task triggers `handleDisconnection()`; failures from a task that is no longer
/// current are ignored.
private func startReceiving() {
    guard let task = webSocketTask else { return }

    task.receive { [weak self] result in
        guard let self else { return }

        switch result {
        case .success(let message):
            self.handleMessage(message)
            // Only continue if we are still connected to this same task
            if self.webSocketTask === task {
                self.startReceiving()
            }

        case .failure(let error):
            // A task that was already torn down or replaced must not disturb the
            // current connection. After a cancel the reported error is not always
            // "cancelled", so the task identity is checked as well as the code.
            guard self.webSocketTask === task else { return }

            // A cancelled URL load is expected during a clean disconnect, not a real error
            let nsError = error as NSError
            if nsError.domain == NSURLErrorDomain, nsError.code == NSURLErrorCancelled {
                return
            }

            logger.error("WebSocket receive error: \(error.localizedDescription)")
            self.handleDisconnection()
        }
    }
}
|
|
|
|
/// Routes one incoming frame to the response handler or the event handler.
///
/// Non-text frames and frames that decode as neither type are logged and dropped.
private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
    let text: String
    switch message {
    case .string(let value):
        text = value
    default:
        logger.warning("Received non-text message")
        return
    }

    guard let payload = text.data(using: .utf8) else {
        logger.error("Failed to convert message to data")
        return
    }

    let decoder = JSONDecoder()

    if let response = try? decoder.decode(MAResponse.self, from: payload),
       let responseId = response.messageId {
        // A response carries a message ID that matches it to a request.
        handleResponse(messageId: responseId, response: response)
    } else if let event = try? decoder.decode(MAEvent.self, from: payload) {
        handleEvent(event)
    } else {
        logger.warning("Received unknown message format: \(text)")
    }
}
|
|
|
|
/// Completes the pending request registered under `messageId`.
///
/// The request is resumed with `response`, or with `ClientError.serverError` when
/// the response carries an error code. Responses with no matching request are
/// logged and ignored.
private func handleResponse(messageId: String, response: MAResponse) {
    requestQueue.sync {
        if let waiter = pendingRequests.removeValue(forKey: messageId) {
            guard let errorCode = response.errorCode else {
                waiter.resume(returning: response)
                return
            }

            // The server reported a failure for this request.
            let errorMsg = response.details ?? "Error code: \(errorCode)"
            logger.error("Server error \(errorCode) for message \(messageId): \(errorMsg)")
            waiter.resume(throwing: ClientError.serverError(errorMsg))
        } else {
            logger.warning("Received response for unknown message ID: \(messageId)")
        }
    }
}
|
|
|
|
/// Logs a server event and forwards it to the event stream.
private func handleEvent(_ event: MAEvent) {
    let eventName = event.event
    logger.debug("Received event: \(eventName)")
    eventContinuation?.yield(event)
}
|
|
|
|
/// Reacts to a lost connection: closes the socket, fails in-flight commands with
/// `ClientError.notConnected`, and schedules the first reconnect attempt when
/// reconnecting is enabled.
private func handleDisconnection() {
    // Nothing to do when the client is already marked disconnected.
    if connectionState == .disconnected { return }
    connectionState = .disconnected

    let staleTask = webSocketTask
    webSocketTask = nil
    staleTask?.cancel(with: .goingAway, reason: nil)

    // Fail every pending request.
    requestQueue.sync {
        pendingRequests.values.forEach { $0.resume(throwing: ClientError.notConnected) }
        pendingRequests.removeAll()
    }

    // Try to come back unless reconnecting has been switched off.
    guard shouldReconnect else { return }
    scheduleReconnect(attempt: 1)
}
|
|
|
|
// MARK: - Reconnection
|
|
|
|
/// Schedules reconnect attempt number `attempt` after a capped exponential delay.
///
/// The delay is `initialReconnectDelay * 2^(attempt - 1)`, capped at
/// `maxReconnectDelay` (3s, 6s, 12s, 24s, 30s, 30s, ... with the current constants).
/// A failed attempt schedules the next one for as long as reconnecting stays enabled.
///
/// - Parameter attempt: 1-based number of the attempt being scheduled.
private func scheduleReconnect(attempt: Int) {
    connectionState = .reconnecting(attempt: attempt)

    let delay = min(
        initialReconnectDelay * pow(2.0, Double(attempt - 1)),
        maxReconnectDelay
    )

    logger.info("Scheduling reconnect attempt \(attempt) in \(delay)s")

    // Replace any attempt that is still waiting so only one retry is in flight.
    reconnectTask?.cancel()

    // Weak capture: a retry loop against an unreachable server must not keep the
    // client alive forever.
    reconnectTask = Task { [weak self] in
        try? await Task.sleep(for: .seconds(delay))

        guard !Task.isCancelled, let self, self.shouldReconnect else { return }

        do {
            try await self.performConnect()
        } catch {
            logger.error("Reconnect attempt \(attempt) failed: \(error.localizedDescription)")

            // `disconnect()` may have run while the attempt was in progress; do not
            // reschedule then, or the state would be left at `.reconnecting` with
            // nothing pending.
            guard self.shouldReconnect else { return }
            self.scheduleReconnect(attempt: attempt + 1)
        }
    }
}
|
|
|
|
// MARK: - Sending Commands
|
|
|
|
/// Sends a command and waits for the matching response.
///
/// - Parameters:
///   - command: Name of the command to run on the server.
///   - args: Optional arguments; each value is wrapped in `AnyCodable` for encoding.
/// - Returns: The response whose message ID matches the request.
/// - Throws: `ClientError.notConnected` when no connection is open or it is lost
///   while waiting; `ClientError.timeout` after 30 seconds without a response;
///   `ClientError.serverError` when the server reports an error; encoding or
///   transport errors otherwise.
func sendCommand(
    _ command: String,
    args: [String: Any]? = nil
) async throws -> MAResponse {
    // Capture the task once, so the send below cannot silently turn into a no-op
    // if the connection is torn down in the meantime.
    guard let task = webSocketTask, connectionState == .connected else {
        throw ClientError.notConnected
    }

    let messageId = UUID().uuidString

    // Convert args to AnyCodable
    let encodableArgs = args?.mapValues { AnyCodable($0) }

    let cmd = MACommand(
        messageId: messageId,
        command: command,
        args: encodableArgs
    )

    let data = try JSONEncoder().encode(cmd)
    guard let json = String(data: data, encoding: .utf8) else {
        throw ClientError.decodingError(NSError(domain: "Encoding", code: -1))
    }

    logger.debug("Sending command: \(command) (ID: \(messageId)) payload: \(json)")

    // Send message and wait for response.
    //
    // The continuation must be resumed exactly once. Whoever removes the entry from
    // `pendingRequests` (response, send failure, timeout, or disconnect) owns the
    // resume; every other path finds the entry gone and does nothing.
    return try await withCheckedThrowingContinuation { continuation in
        requestQueue.sync {
            pendingRequests[messageId] = continuation
        }

        task.send(.string(json)) { [weak self] error in
            guard let error, let self else { return }
            let waiter = self.requestQueue.sync {
                self.pendingRequests.removeValue(forKey: messageId)
            }
            waiter?.resume(throwing: error)
        }

        // Timeout after 30 seconds
        Task { [weak self] in
            try? await Task.sleep(for: .seconds(30))
            guard let self else { return }
            let waiter = self.requestQueue.sync {
                self.pendingRequests.removeValue(forKey: messageId)
            }
            waiter?.resume(throwing: ClientError.timeout)
        }
    }
}
|
|
|
|
/// Sends a command and decodes the `result` of its response as `T`.
///
/// - Parameters:
///   - command: Name of the command to run on the server.
///   - args: Optional command arguments.
///   - resultType: The type to decode the result into.
/// - Returns: The decoded result.
/// - Throws: Any error from the underlying `sendCommand`; `ClientError.serverError`
///   when the response has no result; `ClientError.decodingError` when decoding fails.
func sendCommand<T: Decodable>(
    _ command: String,
    args: [String: Any]? = nil,
    resultType: T.Type
) async throws -> T {
    let response = try await sendCommand(command, args: args)

    guard let result = response.result else {
        throw ClientError.serverError("No result in response")
    }

    // Debug aid: dump the raw result before attempting to decode it.
    if let rawResult = try? JSONEncoder().encode(result),
       let rendered = String(data: rawResult, encoding: .utf8) {
        logger.debug("📦 Response result for '\(command)': \(rendered)")
    }

    /// Logs the specifics of a `DecodingError`; other errors carry no extra detail.
    func logDecodingDetails(of failure: Error) {
        guard let decodingError = failure as? DecodingError else { return }
        switch decodingError {
        case .dataCorrupted(let context):
            logger.error("Data corrupted: \(context.debugDescription)")
        case .keyNotFound(let key, let context):
            logger.error("Key '\(key.stringValue)' not found: \(context.debugDescription)")
        case .typeMismatch(let type, let context):
            logger.error("Type mismatch for \(type): \(context.debugDescription)")
        case .valueNotFound(let type, let context):
            logger.error("Value not found for \(type): \(context.debugDescription)")
        @unknown default:
            logger.error("Unknown decoding error")
        }
    }

    do {
        return try result.decode(as: T.self)
    } catch {
        logger.error("❌ Failed to decode result for '\(command)': \(error.localizedDescription)")
        logDecodingDetails(of: error)
        throw ClientError.decodingError(error)
    }
}
|
|
}
|