SocketEngine.swift 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707
  1. //
  2. // SocketEngine.swift
  3. // Socket.IO-Client-Swift
  4. //
  5. // Created by Erik Little on 3/3/15.
  6. //
  7. // Permission is hereby granted, free of charge, to any person obtaining a copy
  8. // of this software and associated documentation files (the "Software"), to deal
  9. // in the Software without restriction, including without limitation the rights
  10. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11. // copies of the Software, and to permit persons to whom the Software is
  12. // furnished to do so, subject to the following conditions:
  13. //
  14. // The above copyright notice and this permission notice shall be included in
  15. // all copies or substantial portions of the Software.
  16. //
  17. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23. // THE SOFTWARE.
  24. import Dispatch
  25. import Foundation
  26. import Starscream
  27. /// The class that handles the engine.io protocol and transports.
  28. /// See `SocketEnginePollable` and `SocketEngineWebsocket` for transport specific methods.
  29. open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, SocketEngineWebsocket, ConfigSettable {
  30. // MARK: Properties
  31. private static let logType = "SocketEngine"
  32. /// The queue that all engine actions take place on.
  33. public let engineQueue = DispatchQueue(label: "com.socketio.engineHandleQueue")
  /// The parameters that are appended to the query string of the engine's urls when connecting.
  ///
  /// Assigning a new value rebuilds both `urlPolling` and `urlWebSocket`.
  public var connectParams: [String: Any]? {
    didSet {
      let (pollingURL, webSocketURL) = createURLs()

      urlPolling = pollingURL
      urlWebSocket = webSocketURL
    }
  }
  40. /// A dictionary of extra http headers that will be set during connection.
  41. public var extraHeaders: [String: String]?
  42. /// A queue of engine.io messages waiting for POSTing
  43. ///
  44. /// **You should not touch this directly**
  45. public var postWait = [Post]()
  46. /// `true` if there is an outstanding poll. Trying to poll before the first is done will cause socket.io to
  47. /// disconnect us.
  48. ///
  49. /// **Do not touch this directly**
  50. public var waitingForPoll = false
  51. /// `true` if there is an outstanding post. Trying to post before the first is done will cause socket.io to
  52. /// disconnect us.
  53. ///
  54. /// **Do not touch this directly**
  55. public var waitingForPost = false
  56. /// `true` if this engine is closed.
  57. public private(set) var closed = false
  58. /// If `true` the engine will attempt to use WebSocket compression.
  59. public private(set) var compress = false
  60. /// `true` if this engine is connected. Connected means that the initial poll connect has succeeded.
  61. public private(set) var connected = false
  62. /// An array of HTTPCookies that are sent during the connection.
  63. public private(set) var cookies: [HTTPCookie]?
  64. /// When `true`, the engine is in the process of switching to WebSockets.
  65. ///
  66. /// **Do not touch this directly**
  67. public private(set) var fastUpgrade = false
  68. /// When `true`, the engine will only use HTTP long-polling as a transport.
  69. public private(set) var forcePolling = false
  70. /// When `true`, the engine will only use WebSockets as a transport.
  71. public private(set) var forceWebsockets = false
  72. /// `true` If engine's session has been invalidated.
  73. public private(set) var invalidated = false
  74. /// If `true`, the engine is currently in HTTP long-polling mode.
  75. public private(set) var polling = true
  76. /// If `true`, the engine is currently seeing whether it can upgrade to WebSockets.
  77. public private(set) var probing = false
  78. /// The URLSession that will be used for polling.
  79. public private(set) var session: URLSession?
  80. /// The session id for this engine.
  81. public private(set) var sid = ""
  82. /// The path to engine.io.
  83. public private(set) var socketPath = "/engine.io/"
  84. /// The url for polling.
  85. public private(set) var urlPolling = URL(string: "http://localhost/")!
  86. /// The url for WebSockets.
  87. public private(set) var urlWebSocket = URL(string: "http://localhost/")!
  88. /// If `true`, then the engine is currently in WebSockets mode.
  89. @available(*, deprecated, message: "No longer needed, if we're not polling, then we must be doing websockets")
  90. public private(set) var websocket = false
  91. /// When `true`, the WebSocket `stream` will be configured with the enableSOCKSProxy `true`.
  92. public private(set) var enableSOCKSProxy = false
  93. /// The WebSocket for this engine.
  94. public private(set) var ws: WebSocket?
  95. /// The client for this engine.
  96. public weak var client: SocketEngineClient?
  97. private weak var sessionDelegate: URLSessionDelegate?
  98. private let url: URL
  99. private var pingInterval: Int?
  /// The ping timeout taken from the server's open packet (see `handleOpen(openData:)`).
  ///
  /// Setting it recomputes `pongsMissedMax`, the number of pings that may go unanswered before
  /// `sendPing()` closes the engine.
  private var pingTimeout = 0 {
    didSet {
      // A zero interval would make the division trap, so any non-positive or missing
      // interval falls back to the same 25000 default used when none has been received.
      let interval = pingInterval.flatMap { $0 > 0 ? $0 : nil } ?? 25000

      pongsMissedMax = pingTimeout / interval
    }
  }
  105. private var pongsMissed = 0
  106. private var pongsMissedMax = 0
  107. private var probeWait = ProbeWaitQueue()
  108. private var secure = false
  109. private var security: SocketIO.SSLSecurity?
  110. private var selfSigned = false
  111. // MARK: Initializers
  /// Creates a new engine from a typed configuration.
  ///
  /// - parameter client: The client for this engine. It is held weakly.
  /// - parameter url: The url for this engine.
  /// - parameter config: The configuration options that are applied through `setConfigs(_:)`.
  public init(client: SocketEngineClient, url: URL, config: SocketIOClientConfiguration) {
    self.url = url
    self.client = client

    super.init()

    setConfigs(config)

    // Act as our own URLSession delegate unless the configuration supplied one.
    if sessionDelegate == nil {
      sessionDelegate = self
    }

    let (pollingURL, webSocketURL) = createURLs()

    urlPolling = pollingURL
    urlWebSocket = webSocketURL
  }
  /// Creates a new engine from a dictionary of options.
  ///
  /// - parameter client: The client for this engine.
  /// - parameter url: The url for this engine.
  /// - parameter options: The options for this engine. `nil` results in an empty configuration.
  public required convenience init(client: SocketEngineClient, url: URL, options: [String: Any]?) {
    let config: SocketIOClientConfiguration = options?.toSocketConfiguration() ?? []

    self.init(client: client, url: url, config: config)
  }
  /// :nodoc:
  deinit {
    DefaultSocketLogger.Logger.log("Engine is being released", type: SocketEngine.logType)

    // Flag the engine as closed first, then tear down polling.
    closed = true

    stopPolling()
  }
  139. // MARK: Methods
  /// Treats `msg` as a possible JSON error payload from the server.
  ///
  /// If it parses and carries a string under `"message"`, that string is passed to `didError(reason:)`.
  /// If it parses without one, nothing happens. If it does not parse at all, the client is told
  /// about an unknown server error.
  ///
  /// - parameter msg: The raw packet whose type could not be recognised.
  private func checkAndHandleEngineError(_ msg: String) {
    guard let payload = try? msg.toDictionary() else {
      client?.engineDidError(reason: "Got unknown error from server \(msg)")
      return
    }

    /*
     Error codes listed by the original author:
     0: Unknown transport
     1: Unknown sid
     2: Bad handshake request
     3: Bad request
     */
    if let serverMessage = payload["message"] as? String {
      didError(reason: serverMessage)
    }
  }
  /// Decodes a base64 encoded binary packet and hands the decoded bytes to the client.
  ///
  /// - parameter message: The packet, including its two character prefix (the caller checks for `"b4"`).
  private func handleBase64(message: String) {
    // Everything after the two character prefix is the base64 payload.
    let payloadStart = message.index(message.startIndex, offsetBy: 2)
    let encoded = String(message[payloadStart...])

    guard let decoded = Data(base64Encoded: encoded, options: .ignoreUnknownCharacters) else { return }

    client?.parseEngineBinaryData(decoded)
  }
  /// Puts the engine into its closed state, tears down both transports, and notifies the client.
  ///
  /// - parameter reason: The reason that is forwarded to `engineDidClose(reason:)`.
  private func closeOutEngine(reason: String) {
    // State flags first ...
    connected = false
    closed = true
    invalidated = true
    sid = ""

    // ... then the transports, and finally the client callback.
    ws?.disconnect()
    stopPolling()

    client?.engineDidClose(reason: reason)
  }
  /// Starts the connection to the server. The work is dispatched asynchronously onto `engineQueue`.
  open func connect() {
    engineQueue.async(execute: {
      self._connect()
    })
  }
  /// Performs the actual connect: resets the engine, then either opens a WebSocket directly
  /// (when `forceWebsockets` is set) or starts the handshake with a long-poll request.
  private func _connect() {
    if connected {
      DefaultSocketLogger.Logger.error("Engine tried opening while connected. Assuming this was a reconnect",
                                       type: SocketEngine.logType)
      _disconnect(reason: "reconnect")
    }

    DefaultSocketLogger.Logger.log("Starting engine. Server: \(url)", type: SocketEngine.logType)
    DefaultSocketLogger.Logger.log("Handshaking", type: SocketEngine.logType)

    resetEngine()

    guard !forceWebsockets else {
      // WebSocket-only mode skips the polling handshake entirely.
      polling = false
      createWebSocketAndConnect()
      return
    }

    var handshakeRequest = URLRequest(url: urlPolling,
                                      cachePolicy: .reloadIgnoringLocalCacheData,
                                      timeoutInterval: 60.0)

    addHeaders(to: &handshakeRequest)
    doLongPoll(for: handshakeRequest)
  }
  /// Builds the polling and WebSocket urls from `url`, `socketPath`, `secure` and `connectParams`.
  ///
  /// - returns: A `(polling, websocket)` pair. When there is no client, both are the
  ///            `http://localhost/` placeholder.
  private func createURLs() -> (URL, URL) {
    guard client != nil else {
      return (URL(string: "http://localhost/")!, URL(string: "http://localhost/")!)
    }

    var pollingComponents = URLComponents(string: url.absoluteString)!
    var webSocketComponents = URLComponents(string: url.absoluteString)!

    webSocketComponents.path = socketPath
    pollingComponents.path = socketPath

    pollingComponents.scheme = secure ? "https" : "http"
    webSocketComponents.scheme = secure ? "wss" : "ws"

    // Every connect param becomes an "&key=value" fragment, with key and value url encoded.
    let extraQuery = (connectParams ?? [:]).map({ param -> String in
      let keyEsc = param.key.urlEncode()!
      let valueEsc = "\(param.value)".urlEncode()!

      return "&\(keyEsc)=\(valueEsc)"
    }).joined()

    webSocketComponents.percentEncodedQuery = "transport=websocket" + extraQuery
    pollingComponents.percentEncodedQuery = "transport=polling&b64=1" + extraQuery

    return (pollingComponents.url!, webSocketComponents.url!)
  }
  /// Creates a new `WebSocket` for the current session, wires its callbacks to this engine,
  /// and starts connecting it.
  private func createWebSocketAndConnect() {
    var request = URLRequest(url: urlWebSocketWithSid)
    let storedCookies = session?.configuration.httpCookieStorage?.cookies(for: urlPollingWithSid)

    addHeaders(to: &request, includingCookies: storedCookies)

    let stream = FoundationStream()
    stream.enableSOCKSProxy = enableSOCKSProxy

    let socket = WebSocket(request: request, stream: stream)
    ws = socket

    socket.callbackQueue = engineQueue
    socket.enableCompression = compress
    socket.disableSSLCertValidation = selfSigned
    socket.security = security?.security

    // Every callback holds the engine weakly and does nothing once the engine is gone.
    socket.onConnect = { [weak self] in
      self?.websocketDidConnect()
    }

    socket.onDisconnect = { [weak self] error in
      self?.websocketDidDisconnect(error: error)
    }

    socket.onData = { [weak self] data in
      self?.parseEngineData(data)
    }

    socket.onText = { [weak self] message in
      self?.parseEngineMessage(message)
    }

    socket.onHttpResponseHeaders = { [weak self] headers in
      self?.client?.engineDidWebsocketUpgrade(headers: headers)
    }

    socket.connect()
  }
  /// Called when an error happens during execution. Logs the error, reports it to the client,
  /// and then disconnects with the same reason.
  ///
  /// - parameter reason: A description of what went wrong.
  open func didError(reason: String) {
    DefaultSocketLogger.Logger.error("\(reason)", type: SocketEngine.logType)

    client?.engineDidError(reason: reason)

    disconnect(reason: reason)
  }
  /// Disconnects from the server. The work is dispatched asynchronously onto `engineQueue`.
  ///
  /// - parameter reason: The reason for the disconnection. This is communicated up to the client.
  open func disconnect(reason: String) {
    engineQueue.async(execute: {
      self._disconnect(reason: reason)
    })
  }
  /// Performs the actual disconnect.
  ///
  /// An engine that is not connected, or is already closed, is simply closed out. Otherwise a close
  /// packet is sent over whichever transport is active before the engine is closed out.
  ///
  /// - parameter reason: The reason that is passed along when closing out.
  private func _disconnect(reason: String) {
    if !connected || closed {
      closeOutEngine(reason: reason)
      return
    }

    DefaultSocketLogger.Logger.log("Engine is being closed.", type: SocketEngine.logType)

    guard polling else {
      sendWebSocketMessage("", withType: .close, withData: [], completion: nil)
      closeOutEngine(reason: reason)
      return
    }

    disconnectPolling(reason: reason)
  }
  // While polling, the close packet has to go out as soon as possible, so it is queued and
  // POSTed right away instead of waiting for the normal flush.
  // This touches postWait, so it is expected to run on the engine's queue.
  private func disconnectPolling(reason: String) {
    let closePacket = String(SocketEnginePacketType.close.rawValue)

    postWait.append((closePacket, {}))

    // The response to this POST is ignored.
    doRequest(for: createRequestForPostWithPostWait()) { _, _, _ in }

    closeOutEngine(reason: reason)
  }
  /// Called to switch from HTTP long-polling to WebSockets. After calling this method the engine
  /// will be in WebSocket mode.
  ///
  /// **You shouldn't call this directly**
  open func doFastUpgrade() {
    if waitingForPoll {
      DefaultSocketLogger.Logger.error("Outstanding poll when switched to WebSockets," +
        "we'll probably disconnect soon. You should report this.", type: SocketEngine.logType)
    }

    DefaultSocketLogger.Logger.log("Switching to WebSockets", type: SocketEngine.logType)

    sendWebSocketMessage("", withType: .upgrade, withData: [], completion: nil)

    probing = false
    fastUpgrade = false
    polling = false

    flushProbeWait()

    // postWait is flushed to the socket here rather than in flushProbeWait(), because
    // flushProbeWait() also runs when the WebSocket connection failed, and packets must not be
    // sent through a WebSocket that never came up.
    guard !postWait.isEmpty else { return }

    flushWaitingForPostToWebSocket()
  }
  /// Re-submits every write that was held back while probing through `write`, then empties the queue.
  private func flushProbeWait() {
    DefaultSocketLogger.Logger.log("Flushing probe wait", type: SocketEngine.logType)

    probeWait.forEach { held in
      write(held.msg, withType: held.type, withData: held.data, completion: held.completion)
    }

    probeWait.removeAll(keepingCapacity: false)
  }
  /// Sends every packet that was waiting to be POSTed through the WebSocket instead. Packets pile
  /// up because the engine does not POST while it is attempting to upgrade to WebSockets.
  ///
  /// Nothing is sent, and nothing is dropped, if there is no WebSocket.
  ///
  /// **You shouldn't call this directly**
  open func flushWaitingForPostToWebSocket() {
    guard let socket = ws else { return }

    postWait.forEach { pending in
      socket.write(string: pending.msg, completion: pending.completion)
    }

    postWait.removeAll(keepingCapacity: false)
  }
  /// Tells the client that the engine closed.
  ///
  /// - parameter reason: The reason handed to the client.
  private func handleClose(_ reason: String) {
    client?.engineDidClose(reason: reason)
  }
  /// Forwards the payload of a message packet to the client for parsing.
  ///
  /// - parameter message: The packet contents with the type character already removed by the caller.
  private func handleMessage(_ message: String) {
    client?.parseEngineMessage(message)
  }
  /// Responds to a noop packet by starting another poll.
  private func handleNOOP() {
    doPoll()
  }
  /// Handles the open packet: stores the session id and ping settings, starts a WebSocket upgrade
  /// when allowed, kicks off pinging and polling, and tells the client the engine opened.
  ///
  /// - parameter openData: The JSON body of the open packet.
  private func handleOpen(openData: String) {
    guard let handshake = try? openData.toDictionary() else {
      didError(reason: "Error parsing open packet")
      return
    }

    guard let sessionID = handshake["sid"] as? String else {
      didError(reason: "Open packet contained no sid")
      return
    }

    sid = sessionID
    connected = true
    pongsMissed = 0

    // The server can only be upgraded if it lists "websocket" among its upgrades.
    let canUpgrade = (handshake["upgrades"] as? [String])?.contains("websocket") ?? false

    if let interval = handshake["pingInterval"] as? Int, let timeout = handshake["pingTimeout"] as? Int {
      // The interval has to be stored first: pingTimeout's didSet reads it.
      pingInterval = interval
      pingTimeout = timeout
    }

    let transportIsForced = forcePolling || forceWebsockets

    if !transportIsForced && canUpgrade {
      createWebSocketAndConnect()
    }

    sendPing()

    if !forceWebsockets {
      doPoll()
    }

    client?.engineDidOpen(reason: "Connect")
  }
  /// Handles a pong packet: clears the missed-pong counter, starts the transport upgrade if this
  /// is the reply to the WebSocket probe, and notifies the client.
  ///
  /// - parameter message: The full pong packet, including its type character.
  private func handlePong(with message: String) {
    pongsMissed = 0

    let isProbeReply = message == "3probe"

    if isProbeReply {
      // The probe was answered, so the upgrade can go ahead.
      DefaultSocketLogger.Logger.log("Received probe response, should upgrade to WebSockets",
                                     type: SocketEngine.logType)

      upgradeTransport()
    }

    client?.engineDidReceivePong()
  }
  /// Parses raw binary received from engine.io.
  ///
  /// The first byte is dropped and the remainder is handed to the client. Empty data is ignored.
  ///
  /// - parameter data: The data to parse.
  open func parseEngineData(_ data: Data) {
    DefaultSocketLogger.Logger.log("Got binary data: \(data)", type: SocketEngine.logType)

    // Building the range `1..<endIndex` traps for empty data, so bail out before slicing.
    guard !data.isEmpty else { return }

    // Index relative to startIndex: a `Data` slice does not necessarily start at 0.
    let payloadStart = data.index(after: data.startIndex)

    client?.parseEngineBinaryData(data.subdata(in: payloadStart..<data.endIndex))
  }
  /// Parses a raw engine.io packet and dispatches it to the handler for its packet type.
  ///
  /// Packets starting with `"b4"` are treated as base64 encoded binary. Packets whose first
  /// character is not a known packet type are checked for a server error payload.
  ///
  /// - parameter message: The message to parse.
  open func parseEngineMessage(_ message: String) {
    DefaultSocketLogger.Logger.log("Got message: \(message)", type: SocketEngine.logType)

    let reader = SocketStringReader(message: message)

    guard !message.hasPrefix("b4") else {
      handleBase64(message: message)
      return
    }

    // Anything that is not a number maps to -1, which is not a valid packet type.
    let rawType = Int(reader.currentCharacter) ?? -1

    guard let type = SocketEnginePacketType(rawValue: rawType) else {
      checkAndHandleEngineError(message)
      return
    }

    switch type {
    case .open:
      handleOpen(openData: String(message.dropFirst()))
    case .close:
      handleClose(message)
    case .pong:
      handlePong(with: message)
    case .message:
      handleMessage(String(message.dropFirst()))
    case .noop:
      handleNOOP()
    default:
      DefaultSocketLogger.Logger.log("Got unknown packet type", type: SocketEngine.logType)
    }
  }
  // Returns the engine to its default, not-yet-connected state and creates a fresh URLSession
  // whose delegate callbacks are delivered on the engine queue.
  private func resetEngine() {
    let delegateQueue = OperationQueue()
    delegateQueue.underlyingQueue = engineQueue

    sid = ""

    closed = false
    connected = false
    invalidated = false

    polling = true
    probing = false
    fastUpgrade = false

    waitingForPoll = false
    waitingForPost = false

    session = Foundation.URLSession(configuration: .default,
                                    delegate: sessionDelegate,
                                    delegateQueue: delegateQueue)
  }
  /// Sends a ping and schedules the next one `pingInterval` milliseconds later on the engine queue.
  ///
  /// Does nothing while disconnected or before a ping interval is known. Closes the engine with
  /// "Ping timeout" once more pongs have been missed than `pongsMissedMax` allows.
  private func sendPing() {
    guard connected, let pingInterval = pingInterval else { return }

    // Server is not responding
    guard pongsMissed <= pongsMissedMax else {
      closeOutEngine(reason: "Ping timeout")
      return
    }

    pongsMissed += 1
    write("", withType: .ping, withData: [], completion: nil)

    let nextPing = DispatchTime.now() + .milliseconds(pingInterval)

    engineQueue.asyncAfter(deadline: nextPing) { [weak self, id = self.sid] in
      // A changed sid means this timer belongs to an old connection, which must not be pinged.
      guard let this = self, this.sid == id else { return }

      this.sendPing()
    }

    client?.engineDidSendPing()
  }
  /// Called when the engine should set/update its configs from a given configuration.
  ///
  /// Options this engine does not handle are skipped. Once every option has been applied, the
  /// polling and WebSocket urls are rebuilt so that a changed path or `secure` setting takes
  /// effect, just like a changed `connectParams` does.
  ///
  /// - parameter config: The `SocketIOClientConfiguration` that should be used to set/update configs.
  open func setConfigs(_ config: SocketIOClientConfiguration) {
    for option in config {
      switch option {
      case let .connectParams(params):
        connectParams = params
      case let .cookies(cookies):
        self.cookies = cookies
      case let .extraHeaders(headers):
        extraHeaders = headers
      case let .sessionDelegate(delegate):
        sessionDelegate = delegate
      case let .forcePolling(force):
        forcePolling = force
      case let .forceWebsockets(force):
        forceWebsockets = force
      case let .path(path):
        // The path is always stored with a trailing slash.
        socketPath = path

        if !socketPath.hasSuffix("/") {
          socketPath += "/"
        }
      case let .secure(secure):
        self.secure = secure
      case let .selfSigned(selfSigned):
        self.selfSigned = selfSigned
      case let .security(security):
        self.security = security
      case .compress:
        self.compress = true
      case .enableSOCKSProxy:
        self.enableSOCKSProxy = true
      default:
        continue
      }
    }

    // socketPath and secure feed into both urls but, unlike connectParams, have no observer that
    // rebuilds them; without this an update after init would leave the urls stale.
    (urlPolling, urlWebSocket) = createURLs()
  }
  // Starts the move from long-polling to WebSockets. Does nothing unless the WebSocket is connected.
  private func upgradeTransport() {
    guard let socket = ws, socket.isConnected else { return }

    DefaultSocketLogger.Logger.log("Upgrading transport to WebSockets", type: SocketEngine.logType)

    fastUpgrade = true

    // This noop is the last thing sent over polling; no polling messages should follow it.
    sendPollMessage("", withType: .noop, withData: [], completion: nil)
  }
  /// Writes a message to engine.io, independent of transport. The work happens asynchronously on
  /// `engineQueue`.
  ///
  /// While disconnected, nothing is sent and `completion` is called straight away. While probing,
  /// the write is queued until the probe wait queue is flushed.
  ///
  /// - parameter msg: The message to send.
  /// - parameter type: The type of this message.
  /// - parameter data: Any data that this message has.
  /// - parameter completion: Callback called on transport write completion.
  open func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: (() -> ())? = nil) {
    engineQueue.async {
      if !self.connected {
        completion?()
        return
      }

      if self.probing {
        self.probeWait.append((msg, type, data, completion))
        return
      }

      let hasData = data.count != 0

      if self.polling {
        DefaultSocketLogger.Logger.log("Writing poll: \(msg) has data: \(hasData)",
                                       type: SocketEngine.logType)
        self.sendPollMessage(msg, withType: type, withData: data, completion: completion)
      } else {
        DefaultSocketLogger.Logger.log("Writing ws: \(msg) has data: \(hasData)",
                                       type: SocketEngine.logType)
        self.sendWebSocketMessage(msg, withType: type, withData: data, completion: completion)
      }
    }
  }
  // WebSocket Methods

  /// Called when the WebSocket connects. In WebSocket-only mode the engine is now connected;
  /// otherwise the socket still has to be probed before the transport is upgraded.
  private func websocketDidConnect() {
    if forceWebsockets {
      connected = true
      probing = false
      polling = false
    } else {
      probing = true
      probeWebSocket()
    }
  }
  /// Called when the WebSocket disconnects.
  ///
  /// - If the engine is already closed, the client is told about the close.
  /// - If the engine is still polling, the writes held back for the probe are flushed.
  /// - Otherwise the engine falls back to the disconnected, polling state and reports either the
  ///   error or a plain close to the client.
  ///
  /// - parameter error: The error that caused the disconnect, if there was one.
  private func websocketDidDisconnect(error: Error?) {
    probing = false

    if closed {
      client?.engineDidClose(reason: "Disconnect")
      return
    }

    if polling {
      flushProbeWait()
      return
    }

    connected = false
    polling = true

    guard let failure = error else {
      client?.engineDidClose(reason: "Socket Disconnected")
      return
    }

    if let wsError = failure as? WSError {
      didError(reason: "\(wsError.message). code=\(wsError.code), type=\(wsError.type)")
    } else {
      didError(reason: failure.localizedDescription)
    }
  }
  // Test Properties

  /// Overrides the `connected` flag. Per the section comment this exists for tests.
  func setConnected(_ value: Bool) {
    connected = value
  }
  550. }
  extension SocketEngine {
    // MARK: URLSessionDelegate methods

    /// Delegate called when the session becomes invalid. Logs the event and raises an engine error.
    ///
    /// NOTE(review): this spelling (`URLSession(session:didBecomeInvalidWithError:)` taking an
    /// `NSError?`) looks like the pre-Swift-3 form of the delegate method; confirm whether
    /// Foundation still invokes it before relying on it.
    public func URLSession(session: URLSession, didBecomeInvalidWithError error: NSError?) {
      let reason = "Engine URLSession became invalid"

      DefaultSocketLogger.Logger.error(reason, type: "SocketEngine")

      didError(reason: reason)
    }
  }