381 lines
12 KiB
Swift
381 lines
12 KiB
Swift
//
|
|
// MAWebSocketClient.swift
|
|
// Mobile Music Assistant
|
|
//
|
|
// Created by Sven Hanold on 26.03.26.
|
|
//
|
|
|
|
import Foundation
|
|
import OSLog
|
|
|
|
private let logger = Logger(subsystem: "com.musicassistant.mobile", category: "WebSocket")
|
|
|
|
/// WebSocket client for Music Assistant server communication
|
|
@Observable
|
|
final class MAWebSocketClient {
|
|
enum ConnectionState: Equatable {
|
|
case disconnected
|
|
case connecting
|
|
case connected
|
|
case reconnecting(attempt: Int)
|
|
|
|
var description: String {
|
|
switch self {
|
|
case .disconnected: return "Disconnected"
|
|
case .connecting: return "Connecting..."
|
|
case .connected: return "Connected"
|
|
case .reconnecting(let attempt): return "Reconnecting (attempt \(attempt))..."
|
|
}
|
|
}
|
|
}
|
|
|
|
enum ClientError: LocalizedError {
|
|
case notConnected
|
|
case invalidURL
|
|
case timeout
|
|
case serverError(String)
|
|
case decodingError(Error)
|
|
|
|
var errorDescription: String? {
|
|
switch self {
|
|
case .notConnected:
|
|
return "Not connected to server"
|
|
case .invalidURL:
|
|
return "Invalid server URL"
|
|
case .timeout:
|
|
return "Request timeout"
|
|
case .serverError(let message):
|
|
return "Server error: \(message)"
|
|
case .decodingError(let error):
|
|
return "Failed to decode response: \(error.localizedDescription)"
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Properties
|
|
|
|
private(set) var connectionState: ConnectionState = .disconnected
|
|
private var webSocketTask: URLSessionWebSocketTask?
|
|
private let session: URLSession
|
|
|
|
// Request-Response matching
|
|
private var pendingRequests: [String: CheckedContinuation<MAResponse, Error>] = [:]
|
|
private let requestQueue = DispatchQueue(label: "com.musicassistant.requests")
|
|
|
|
// Event stream
|
|
private var eventContinuation: AsyncStream<MAEvent>.Continuation?
|
|
private(set) var eventStream: AsyncStream<MAEvent>
|
|
|
|
// Reconnection
|
|
private var reconnectTask: Task<Void, Never>?
|
|
private var shouldReconnect = false
|
|
private let maxReconnectDelay: TimeInterval = 30.0
|
|
private let initialReconnectDelay: TimeInterval = 3.0
|
|
|
|
// Configuration
|
|
private var serverURL: URL?
|
|
private var authToken: String?
|
|
|
|
// MARK: - Initialization
|
|
|
|
init() {
|
|
let configuration = URLSessionConfiguration.default
|
|
configuration.timeoutIntervalForRequest = 30
|
|
configuration.timeoutIntervalForResource = 300
|
|
self.session = URLSession(configuration: configuration)
|
|
|
|
// Initialize event stream
|
|
var continuation: AsyncStream<MAEvent>.Continuation?
|
|
self.eventStream = AsyncStream { cont in
|
|
continuation = cont
|
|
}
|
|
self.eventContinuation = continuation
|
|
}
|
|
|
|
deinit {
|
|
disconnect()
|
|
}
|
|
|
|
// MARK: - Connection Management
|
|
|
|
/// Connect to Music Assistant server
|
|
func connect(serverURL: URL, authToken: String?) async throws {
|
|
print("🔵 MAWebSocketClient.connect: Checking state")
|
|
guard connectionState == .disconnected else {
|
|
logger.info("Already connected or connecting")
|
|
print("⚠️ MAWebSocketClient.connect: Already connected/connecting, state = \(connectionState)")
|
|
return
|
|
}
|
|
|
|
print("🔵 MAWebSocketClient.connect: Starting connection")
|
|
print("🔵 MAWebSocketClient.connect: Server URL = \(serverURL.absoluteString)")
|
|
print("🔵 MAWebSocketClient.connect: Has auth token = \(authToken != nil)")
|
|
|
|
self.serverURL = serverURL
|
|
self.authToken = authToken
|
|
self.shouldReconnect = true
|
|
|
|
try await performConnect()
|
|
}
|
|
|
|
private func performConnect() async throws {
|
|
guard let serverURL else {
|
|
print("❌ MAWebSocketClient.performConnect: No server URL")
|
|
throw ClientError.invalidURL
|
|
}
|
|
|
|
connectionState = .connecting
|
|
logger.info("Connecting to \(serverURL.absoluteString)")
|
|
print("🔵 MAWebSocketClient.performConnect: Building WebSocket URL")
|
|
|
|
// Build WebSocket URL (ws:// or wss://)
|
|
var components = URLComponents(url: serverURL, resolvingAgainstBaseURL: false)!
|
|
let originalScheme = components.scheme
|
|
components.scheme = components.scheme == "https" ? "wss" : "ws"
|
|
components.path = "/ws"
|
|
|
|
guard let wsURL = components.url else {
|
|
print("❌ MAWebSocketClient.performConnect: Failed to build WebSocket URL")
|
|
throw ClientError.invalidURL
|
|
}
|
|
|
|
print("🔵 MAWebSocketClient.performConnect: Original scheme = \(originalScheme ?? "nil")")
|
|
print("🔵 MAWebSocketClient.performConnect: WebSocket URL = \(wsURL.absoluteString)")
|
|
|
|
var request = URLRequest(url: wsURL)
|
|
|
|
// Add auth token if available
|
|
if let authToken {
|
|
request.setValue("Bearer \(authToken)", forHTTPHeaderField: "Authorization")
|
|
print("✅ MAWebSocketClient.performConnect: Authorization header added")
|
|
} else {
|
|
print("⚠️ MAWebSocketClient.performConnect: No auth token provided")
|
|
}
|
|
|
|
let task = session.webSocketTask(with: request)
|
|
self.webSocketTask = task
|
|
|
|
print("🔵 MAWebSocketClient.performConnect: Starting WebSocket task")
|
|
task.resume()
|
|
|
|
// Start listening for messages
|
|
startReceiving()
|
|
|
|
connectionState = .connected
|
|
logger.info("Connected successfully")
|
|
print("✅ MAWebSocketClient.performConnect: Connection successful")
|
|
}
|
|
|
|
/// Disconnect from server
|
|
func disconnect() {
|
|
logger.info("Disconnecting")
|
|
shouldReconnect = false
|
|
reconnectTask?.cancel()
|
|
reconnectTask = nil
|
|
|
|
webSocketTask?.cancel(with: .goingAway, reason: nil)
|
|
webSocketTask = nil
|
|
|
|
// Cancel all pending requests
|
|
requestQueue.sync {
|
|
for (messageId, continuation) in pendingRequests {
|
|
continuation.resume(throwing: ClientError.notConnected)
|
|
}
|
|
pendingRequests.removeAll()
|
|
}
|
|
|
|
connectionState = .disconnected
|
|
eventContinuation?.finish()
|
|
}
|
|
|
|
// MARK: - Message Receiving
|
|
|
|
private func startReceiving() {
|
|
guard let task = webSocketTask else { return }
|
|
|
|
task.receive { [weak self] result in
|
|
guard let self else { return }
|
|
|
|
switch result {
|
|
case .success(let message):
|
|
self.handleMessage(message)
|
|
// Continue listening
|
|
self.startReceiving()
|
|
|
|
case .failure(let error):
|
|
logger.error("WebSocket receive error: \(error.localizedDescription)")
|
|
self.handleDisconnection()
|
|
}
|
|
}
|
|
}
|
|
|
|
private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
|
|
guard case .string(let text) = message else {
|
|
logger.warning("Received non-text message")
|
|
return
|
|
}
|
|
|
|
guard let data = text.data(using: .utf8) else {
|
|
logger.error("Failed to convert message to data")
|
|
return
|
|
}
|
|
|
|
// Try to decode as response (has message_id)
|
|
if let response = try? JSONDecoder().decode(MAResponse.self, from: data),
|
|
let messageId = response.messageId {
|
|
handleResponse(messageId: messageId, response: response)
|
|
return
|
|
}
|
|
|
|
// Try to decode as event
|
|
if let event = try? JSONDecoder().decode(MAEvent.self, from: data) {
|
|
handleEvent(event)
|
|
return
|
|
}
|
|
|
|
logger.warning("Received unknown message format: \(text)")
|
|
}
|
|
|
|
private func handleResponse(messageId: String, response: MAResponse) {
|
|
requestQueue.sync {
|
|
guard let continuation = pendingRequests.removeValue(forKey: messageId) else {
|
|
logger.warning("Received response for unknown message ID: \(messageId)")
|
|
return
|
|
}
|
|
|
|
// Check for error
|
|
if let errorCode = response.errorCode {
|
|
let errorMsg = response.errorMessage ?? errorCode
|
|
continuation.resume(throwing: ClientError.serverError(errorMsg))
|
|
} else {
|
|
continuation.resume(returning: response)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func handleEvent(_ event: MAEvent) {
|
|
logger.debug("Received event: \(event.event)")
|
|
eventContinuation?.yield(event)
|
|
}
|
|
|
|
private func handleDisconnection() {
|
|
connectionState = .disconnected
|
|
webSocketTask = nil
|
|
|
|
// Cancel pending requests
|
|
requestQueue.sync {
|
|
for (_, continuation) in pendingRequests {
|
|
continuation.resume(throwing: ClientError.notConnected)
|
|
}
|
|
pendingRequests.removeAll()
|
|
}
|
|
|
|
// Attempt reconnection if needed
|
|
if shouldReconnect {
|
|
scheduleReconnect(attempt: 1)
|
|
}
|
|
}
|
|
|
|
// MARK: - Reconnection
|
|
|
|
private func scheduleReconnect(attempt: Int) {
|
|
connectionState = .reconnecting(attempt: attempt)
|
|
|
|
// Exponential backoff: 3s, 10s, 30s, 30s, ...
|
|
let delay = min(
|
|
initialReconnectDelay * pow(2.0, Double(attempt - 1)),
|
|
maxReconnectDelay
|
|
)
|
|
|
|
logger.info("Scheduling reconnect attempt \(attempt) in \(delay)s")
|
|
|
|
reconnectTask = Task {
|
|
try? await Task.sleep(for: .seconds(delay))
|
|
|
|
guard !Task.isCancelled, shouldReconnect else { return }
|
|
|
|
do {
|
|
try await performConnect()
|
|
} catch {
|
|
logger.error("Reconnect attempt \(attempt) failed: \(error.localizedDescription)")
|
|
scheduleReconnect(attempt: attempt + 1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Sending Commands
|
|
|
|
/// Send a command and wait for response
|
|
func sendCommand(
|
|
_ command: String,
|
|
args: [String: Any]? = nil
|
|
) async throws -> MAResponse {
|
|
guard webSocketTask != nil, connectionState == .connected else {
|
|
throw ClientError.notConnected
|
|
}
|
|
|
|
let messageId = UUID().uuidString
|
|
|
|
// Convert args to AnyCodable
|
|
let encodableArgs = args?.mapValues { AnyCodable($0) }
|
|
|
|
let cmd = MACommand(
|
|
messageId: messageId,
|
|
command: command,
|
|
args: encodableArgs
|
|
)
|
|
|
|
let data = try JSONEncoder().encode(cmd)
|
|
guard let json = String(data: data, encoding: .utf8) else {
|
|
throw ClientError.decodingError(NSError(domain: "Encoding", code: -1))
|
|
}
|
|
|
|
logger.debug("Sending command: \(command) (ID: \(messageId))")
|
|
|
|
// Send message and wait for response
|
|
return try await withCheckedThrowingContinuation { continuation in
|
|
requestQueue.sync {
|
|
pendingRequests[messageId] = continuation
|
|
}
|
|
|
|
webSocketTask?.send(.string(json)) { [weak self] error in
|
|
if let error {
|
|
self?.requestQueue.sync {
|
|
_ = self?.pendingRequests.removeValue(forKey: messageId)
|
|
}
|
|
continuation.resume(throwing: error)
|
|
}
|
|
}
|
|
|
|
// Timeout after 30 seconds
|
|
Task {
|
|
try? await Task.sleep(for: .seconds(30))
|
|
self.requestQueue.sync {
|
|
if let cont = self.pendingRequests.removeValue(forKey: messageId) {
|
|
cont.resume(throwing: ClientError.timeout)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Convenience method to send command and decode result
|
|
func sendCommand<T: Decodable>(
|
|
_ command: String,
|
|
args: [String: Any]? = nil,
|
|
resultType: T.Type
|
|
) async throws -> T {
|
|
let response = try await sendCommand(command, args: args)
|
|
|
|
guard let result = response.result else {
|
|
throw ClientError.serverError("No result in response")
|
|
}
|
|
|
|
do {
|
|
return try result.decode(as: T.self)
|
|
} catch {
|
|
throw ClientError.decodingError(error)
|
|
}
|
|
}
|
|
}
|