Initial Commit
This commit is contained in:
@@ -0,0 +1,380 @@
|
||||
//
|
||||
// MAWebSocketClient.swift
|
||||
// Mobile Music Assistant
|
||||
//
|
||||
// Created by Sven Hanold on 26.03.26.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import OSLog
|
||||
|
||||
private let logger = Logger(subsystem: "com.musicassistant.mobile", category: "WebSocket")
|
||||
|
||||
/// WebSocket client for Music Assistant server communication
|
||||
@Observable
|
||||
final class MAWebSocketClient {
|
||||
enum ConnectionState: Equatable {
|
||||
case disconnected
|
||||
case connecting
|
||||
case connected
|
||||
case reconnecting(attempt: Int)
|
||||
|
||||
var description: String {
|
||||
switch self {
|
||||
case .disconnected: return "Disconnected"
|
||||
case .connecting: return "Connecting..."
|
||||
case .connected: return "Connected"
|
||||
case .reconnecting(let attempt): return "Reconnecting (attempt \(attempt))..."
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ClientError: LocalizedError {
|
||||
case notConnected
|
||||
case invalidURL
|
||||
case timeout
|
||||
case serverError(String)
|
||||
case decodingError(Error)
|
||||
|
||||
var errorDescription: String? {
|
||||
switch self {
|
||||
case .notConnected:
|
||||
return "Not connected to server"
|
||||
case .invalidURL:
|
||||
return "Invalid server URL"
|
||||
case .timeout:
|
||||
return "Request timeout"
|
||||
case .serverError(let message):
|
||||
return "Server error: \(message)"
|
||||
case .decodingError(let error):
|
||||
return "Failed to decode response: \(error.localizedDescription)"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Properties
|
||||
|
||||
private(set) var connectionState: ConnectionState = .disconnected
|
||||
private var webSocketTask: URLSessionWebSocketTask?
|
||||
private let session: URLSession
|
||||
|
||||
// Request-Response matching
|
||||
private var pendingRequests: [String: CheckedContinuation<MAResponse, Error>] = [:]
|
||||
private let requestQueue = DispatchQueue(label: "com.musicassistant.requests")
|
||||
|
||||
// Event stream
|
||||
private var eventContinuation: AsyncStream<MAEvent>.Continuation?
|
||||
private(set) var eventStream: AsyncStream<MAEvent>
|
||||
|
||||
// Reconnection
|
||||
private var reconnectTask: Task<Void, Never>?
|
||||
private var shouldReconnect = false
|
||||
private let maxReconnectDelay: TimeInterval = 30.0
|
||||
private let initialReconnectDelay: TimeInterval = 3.0
|
||||
|
||||
// Configuration
|
||||
private var serverURL: URL?
|
||||
private var authToken: String?
|
||||
|
||||
// MARK: - Initialization
|
||||
|
||||
init() {
|
||||
let configuration = URLSessionConfiguration.default
|
||||
configuration.timeoutIntervalForRequest = 30
|
||||
configuration.timeoutIntervalForResource = 300
|
||||
self.session = URLSession(configuration: configuration)
|
||||
|
||||
// Initialize event stream
|
||||
var continuation: AsyncStream<MAEvent>.Continuation?
|
||||
self.eventStream = AsyncStream { cont in
|
||||
continuation = cont
|
||||
}
|
||||
self.eventContinuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
disconnect()
|
||||
}
|
||||
|
||||
// MARK: - Connection Management
|
||||
|
||||
/// Connect to Music Assistant server
|
||||
func connect(serverURL: URL, authToken: String?) async throws {
|
||||
print("🔵 MAWebSocketClient.connect: Checking state")
|
||||
guard connectionState == .disconnected else {
|
||||
logger.info("Already connected or connecting")
|
||||
print("⚠️ MAWebSocketClient.connect: Already connected/connecting, state = \(connectionState)")
|
||||
return
|
||||
}
|
||||
|
||||
print("🔵 MAWebSocketClient.connect: Starting connection")
|
||||
print("🔵 MAWebSocketClient.connect: Server URL = \(serverURL.absoluteString)")
|
||||
print("🔵 MAWebSocketClient.connect: Has auth token = \(authToken != nil)")
|
||||
|
||||
self.serverURL = serverURL
|
||||
self.authToken = authToken
|
||||
self.shouldReconnect = true
|
||||
|
||||
try await performConnect()
|
||||
}
|
||||
|
||||
private func performConnect() async throws {
|
||||
guard let serverURL else {
|
||||
print("❌ MAWebSocketClient.performConnect: No server URL")
|
||||
throw ClientError.invalidURL
|
||||
}
|
||||
|
||||
connectionState = .connecting
|
||||
logger.info("Connecting to \(serverURL.absoluteString)")
|
||||
print("🔵 MAWebSocketClient.performConnect: Building WebSocket URL")
|
||||
|
||||
// Build WebSocket URL (ws:// or wss://)
|
||||
var components = URLComponents(url: serverURL, resolvingAgainstBaseURL: false)!
|
||||
let originalScheme = components.scheme
|
||||
components.scheme = components.scheme == "https" ? "wss" : "ws"
|
||||
components.path = "/ws"
|
||||
|
||||
guard let wsURL = components.url else {
|
||||
print("❌ MAWebSocketClient.performConnect: Failed to build WebSocket URL")
|
||||
throw ClientError.invalidURL
|
||||
}
|
||||
|
||||
print("🔵 MAWebSocketClient.performConnect: Original scheme = \(originalScheme ?? "nil")")
|
||||
print("🔵 MAWebSocketClient.performConnect: WebSocket URL = \(wsURL.absoluteString)")
|
||||
|
||||
var request = URLRequest(url: wsURL)
|
||||
|
||||
// Add auth token if available
|
||||
if let authToken {
|
||||
request.setValue("Bearer \(authToken)", forHTTPHeaderField: "Authorization")
|
||||
print("✅ MAWebSocketClient.performConnect: Authorization header added")
|
||||
} else {
|
||||
print("⚠️ MAWebSocketClient.performConnect: No auth token provided")
|
||||
}
|
||||
|
||||
let task = session.webSocketTask(with: request)
|
||||
self.webSocketTask = task
|
||||
|
||||
print("🔵 MAWebSocketClient.performConnect: Starting WebSocket task")
|
||||
task.resume()
|
||||
|
||||
// Start listening for messages
|
||||
startReceiving()
|
||||
|
||||
connectionState = .connected
|
||||
logger.info("Connected successfully")
|
||||
print("✅ MAWebSocketClient.performConnect: Connection successful")
|
||||
}
|
||||
|
||||
/// Disconnect from server
|
||||
func disconnect() {
|
||||
logger.info("Disconnecting")
|
||||
shouldReconnect = false
|
||||
reconnectTask?.cancel()
|
||||
reconnectTask = nil
|
||||
|
||||
webSocketTask?.cancel(with: .goingAway, reason: nil)
|
||||
webSocketTask = nil
|
||||
|
||||
// Cancel all pending requests
|
||||
requestQueue.sync {
|
||||
for (messageId, continuation) in pendingRequests {
|
||||
continuation.resume(throwing: ClientError.notConnected)
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
}
|
||||
|
||||
connectionState = .disconnected
|
||||
eventContinuation?.finish()
|
||||
}
|
||||
|
||||
// MARK: - Message Receiving
|
||||
|
||||
private func startReceiving() {
|
||||
guard let task = webSocketTask else { return }
|
||||
|
||||
task.receive { [weak self] result in
|
||||
guard let self else { return }
|
||||
|
||||
switch result {
|
||||
case .success(let message):
|
||||
self.handleMessage(message)
|
||||
// Continue listening
|
||||
self.startReceiving()
|
||||
|
||||
case .failure(let error):
|
||||
logger.error("WebSocket receive error: \(error.localizedDescription)")
|
||||
self.handleDisconnection()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
|
||||
guard case .string(let text) = message else {
|
||||
logger.warning("Received non-text message")
|
||||
return
|
||||
}
|
||||
|
||||
guard let data = text.data(using: .utf8) else {
|
||||
logger.error("Failed to convert message to data")
|
||||
return
|
||||
}
|
||||
|
||||
// Try to decode as response (has message_id)
|
||||
if let response = try? JSONDecoder().decode(MAResponse.self, from: data),
|
||||
let messageId = response.messageId {
|
||||
handleResponse(messageId: messageId, response: response)
|
||||
return
|
||||
}
|
||||
|
||||
// Try to decode as event
|
||||
if let event = try? JSONDecoder().decode(MAEvent.self, from: data) {
|
||||
handleEvent(event)
|
||||
return
|
||||
}
|
||||
|
||||
logger.warning("Received unknown message format: \(text)")
|
||||
}
|
||||
|
||||
private func handleResponse(messageId: String, response: MAResponse) {
|
||||
requestQueue.sync {
|
||||
guard let continuation = pendingRequests.removeValue(forKey: messageId) else {
|
||||
logger.warning("Received response for unknown message ID: \(messageId)")
|
||||
return
|
||||
}
|
||||
|
||||
// Check for error
|
||||
if let errorCode = response.errorCode {
|
||||
let errorMsg = response.errorMessage ?? errorCode
|
||||
continuation.resume(throwing: ClientError.serverError(errorMsg))
|
||||
} else {
|
||||
continuation.resume(returning: response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handleEvent(_ event: MAEvent) {
|
||||
logger.debug("Received event: \(event.event)")
|
||||
eventContinuation?.yield(event)
|
||||
}
|
||||
|
||||
private func handleDisconnection() {
|
||||
connectionState = .disconnected
|
||||
webSocketTask = nil
|
||||
|
||||
// Cancel pending requests
|
||||
requestQueue.sync {
|
||||
for (_, continuation) in pendingRequests {
|
||||
continuation.resume(throwing: ClientError.notConnected)
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
}
|
||||
|
||||
// Attempt reconnection if needed
|
||||
if shouldReconnect {
|
||||
scheduleReconnect(attempt: 1)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Reconnection
|
||||
|
||||
private func scheduleReconnect(attempt: Int) {
|
||||
connectionState = .reconnecting(attempt: attempt)
|
||||
|
||||
// Exponential backoff: 3s, 10s, 30s, 30s, ...
|
||||
let delay = min(
|
||||
initialReconnectDelay * pow(2.0, Double(attempt - 1)),
|
||||
maxReconnectDelay
|
||||
)
|
||||
|
||||
logger.info("Scheduling reconnect attempt \(attempt) in \(delay)s")
|
||||
|
||||
reconnectTask = Task {
|
||||
try? await Task.sleep(for: .seconds(delay))
|
||||
|
||||
guard !Task.isCancelled, shouldReconnect else { return }
|
||||
|
||||
do {
|
||||
try await performConnect()
|
||||
} catch {
|
||||
logger.error("Reconnect attempt \(attempt) failed: \(error.localizedDescription)")
|
||||
scheduleReconnect(attempt: attempt + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Sending Commands
|
||||
|
||||
/// Send a command and wait for response
|
||||
func sendCommand(
|
||||
_ command: String,
|
||||
args: [String: Any]? = nil
|
||||
) async throws -> MAResponse {
|
||||
guard webSocketTask != nil, connectionState == .connected else {
|
||||
throw ClientError.notConnected
|
||||
}
|
||||
|
||||
let messageId = UUID().uuidString
|
||||
|
||||
// Convert args to AnyCodable
|
||||
let encodableArgs = args?.mapValues { AnyCodable($0) }
|
||||
|
||||
let cmd = MACommand(
|
||||
messageId: messageId,
|
||||
command: command,
|
||||
args: encodableArgs
|
||||
)
|
||||
|
||||
let data = try JSONEncoder().encode(cmd)
|
||||
guard let json = String(data: data, encoding: .utf8) else {
|
||||
throw ClientError.decodingError(NSError(domain: "Encoding", code: -1))
|
||||
}
|
||||
|
||||
logger.debug("Sending command: \(command) (ID: \(messageId))")
|
||||
|
||||
// Send message and wait for response
|
||||
return try await withCheckedThrowingContinuation { continuation in
|
||||
requestQueue.sync {
|
||||
pendingRequests[messageId] = continuation
|
||||
}
|
||||
|
||||
webSocketTask?.send(.string(json)) { [weak self] error in
|
||||
if let error {
|
||||
self?.requestQueue.sync {
|
||||
_ = self?.pendingRequests.removeValue(forKey: messageId)
|
||||
}
|
||||
continuation.resume(throwing: error)
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout after 30 seconds
|
||||
Task {
|
||||
try? await Task.sleep(for: .seconds(30))
|
||||
self.requestQueue.sync {
|
||||
if let cont = self.pendingRequests.removeValue(forKey: messageId) {
|
||||
cont.resume(throwing: ClientError.timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience method to send command and decode result
|
||||
func sendCommand<T: Decodable>(
|
||||
_ command: String,
|
||||
args: [String: Any]? = nil,
|
||||
resultType: T.Type
|
||||
) async throws -> T {
|
||||
let response = try await sendCommand(command, args: args)
|
||||
|
||||
guard let result = response.result else {
|
||||
throw ClientError.serverError("No result in response")
|
||||
}
|
||||
|
||||
do {
|
||||
return try result.decode(as: T.self)
|
||||
} catch {
|
||||
throw ClientError.decodingError(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user