// WebSocket.swift (54 KB)
  1. //////////////////////////////////////////////////////////////////////////////////////////////////
  2. //
  3. // Websocket.swift
  4. //
  5. // Created by Dalton Cherry on 7/16/14.
  6. // Copyright (c) 2014-2017 Dalton Cherry.
  7. //
  8. // Licensed under the Apache License, Version 2.0 (the "License");
  9. // you may not use this file except in compliance with the License.
  10. // You may obtain a copy of the License at
  11. //
  12. // http://www.apache.org/licenses/LICENSE-2.0
  13. //
  14. // Unless required by applicable law or agreed to in writing, software
  15. // distributed under the License is distributed on an "AS IS" BASIS,
  16. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17. // See the License for the specific language governing permissions and
  18. // limitations under the License.
  19. //
  20. //////////////////////////////////////////////////////////////////////////////////////////////////
  21. import Foundation
  22. import CoreFoundation
  23. import CommonCrypto
  /// String identifier for the "did connect" notification.
  public let WebsocketDidConnectNotification: String = "WebsocketDidConnectNotification"
  /// String identifier for the "did disconnect" notification.
  public let WebsocketDidDisconnectNotification: String = "WebsocketDidDisconnectNotification"
  /// Key name for a disconnection error.
  /// NOTE(review): presumably a userInfo key on the disconnect notification — confirm against the code that posts it.
  public let WebsocketDisconnectionErrorKeyName: String = "WebsocketDisconnectionErrorKeyName"
  /// Standard WebSocket close codes. The raw value is the 16-bit status code.
  public enum CloseCode: UInt16 {
      case normal                = 1000
      case goingAway             = 1001
      case protocolError         = 1002
      case protocolUnhandledType = 1003
      // 1004 is reserved.
      case noStatusReceived      = 1005
      // 1006 is reserved.
      case encoding              = 1007
      case policyViolated        = 1008
      case messageTooBig         = 1009
  }
  /// Categories of failure reported through `WSError`.
  public enum ErrorType: Error {
      /// Output stream error during write.
      case outputStreamWriteError
      case compressionError
      /// Invalid SSL certificate.
      case invalidSSLError
      /// The socket timed out waiting to be ready to write.
      case writeTimeoutError
      /// There was an error parsing the WebSocket frames.
      case protocolError
      /// There was an error during the HTTP upgrade.
      case upgradeError
      /// There was an error during the close (socket probably has been dereferenced).
      case closeError
  }
  /// Error value produced by this file: a category, a human-readable message and a numeric code.
  public struct WSError: Error {
      /// Category of the failure.
      public let type: ErrorType
      /// Human-readable description of what went wrong.
      public let message: String
      /// Numeric code supplied by the creator of the error (e.g. an `OSStatus`, or 0 when there is none).
      public let code: Int
  }
  /// Abstraction over a WebSocket client, set up so that an implementation can be injected for testing.
  public protocol WebSocketClient: AnyObject {
      var delegate: WebSocketDelegate? { get set }
      var pongDelegate: WebSocketPongDelegate? { get set }
      var disableSSLCertValidation: Bool { get set }
      var overrideTrustHostname: Bool { get set }
      var desiredTrustHostname: String? { get set }
      var sslClientCertificate: SSLClientCertificate? { get set }
      #if !os(Linux)
      var security: SSLTrustValidator? { get set }
      var enabledSSLCipherSuites: [SSLCipherSuite]? { get set }
      #endif
      var isConnected: Bool { get }

      func connect()
      func disconnect(forceTimeout: TimeInterval?, closeCode: UInt16)
      func write(string: String, completion: (() -> ())?)
      func write(data: Data, completion: (() -> ())?)
      func write(ping: Data, completion: (() -> ())?)
      func write(pong: Data, completion: (() -> ())?)
  }
  /// Convenience overloads that forward to the protocol requirements with default arguments.
  public extension WebSocketClient {
      /// Sends a text frame without a completion handler.
      func write(string: String) {
          write(string: string, completion: nil)
      }

      /// Sends a binary frame without a completion handler.
      func write(data: Data) {
          write(data: data, completion: nil)
      }

      /// Sends a ping without a completion handler.
      func write(ping: Data) {
          write(ping: ping, completion: nil)
      }

      /// Sends a pong without a completion handler.
      func write(pong: Data) {
          write(pong: pong, completion: nil)
      }

      /// Disconnects with no force timeout and the normal close code.
      func disconnect() {
          disconnect(forceTimeout: nil, closeCode: CloseCode.normal.rawValue)
      }
  }
  /// SSL settings handed to a `WSStream` when it connects.
  public struct SSLSettings {
      /// Whether the connection should use SSL at all.
      public let useSSL: Bool
      /// When `true`, certificate chain validation is turned off.
      public let disableCertValidation: Bool
      /// When `true`, the peer name used for trust evaluation is overridden by `desiredTrustHostname`.
      public var overrideTrustHostname: Bool
      /// Replacement peer name; only consulted when `overrideTrustHostname` is `true`.
      public var desiredTrustHostname: String?
      /// Optional client certificate presented to the server.
      public let sslClientCertificate: SSLClientCertificate?
      #if !os(Linux)
      /// Optional explicit list of cipher suites to enable.
      public let cipherSuites: [SSLCipherSuite]?
      #endif
  }
  /// Receives events from a `WSStream`.
  public protocol WSStreamDelegate: AnyObject {
      /// Called when the stream has bytes ready to be read.
      func newBytesInStream()
      /// Called when the stream failed or ended; `error` is `nil` when there is no error to report.
      func streamDidError(error: Error?)
  }
  /// Allows a custom implementation of the underlying stream, so that custom socket
  /// libraries (e.g. on Linux) can be used.
  public protocol WSStream {
      var delegate: WSStreamDelegate? { get set }

      /// Opens the connection and reports the outcome through `completion` (`nil` on success).
      func connect(url: URL, port: Int, timeout: TimeInterval, ssl: SSLSettings, completion: @escaping ((Error?) -> Void))
      /// Writes `data` and returns an `Int` result.
      func write(data: Data) -> Int
      /// Returns the bytes that were read, or `nil` when nothing was read.
      func read() -> Data?
      /// Tears the stream down.
      func cleanup()
      #if !(os(Linux) || os(watchOS))
      /// Returns the peer trust object and the peer domain name, when available.
      func sslTrust() -> (trust: SecTrust?, domain: String?)
      #endif
  }
  /// `WSStream` implementation backed by a Foundation `InputStream`/`OutputStream` pair
  /// created with `CFStreamCreatePairWithSocketToHost`.
  open class FoundationStream: NSObject, WSStream, StreamDelegate {
      /// Queue both CF streams are scheduled on; it also runs the "ready to write" wait loop.
      private let workQueue = DispatchQueue(label: "com.vluxe.starscream.websocket", attributes: [])
      private var inputStream: InputStream?
      private var outputStream: OutputStream?
      public weak var delegate: WSStreamDelegate?
      /// Maximum number of bytes pulled from the input stream by one `read()` call.
      let BUFFER_MAX = 4096
      /// When `true`, the system proxy settings are applied to both streams as their SOCKS proxy configuration.
      public var enableSOCKSProxy = false

      /**
       Creates the stream pair, applies proxy/SSL configuration, opens both streams and waits
       until the output stream can accept a write.

       `completion` is invoked exactly once: with `nil` on success, otherwise with the error.

       - Parameter url: Target URL; only its host is used here.
       - Parameter port: TCP port to connect to.
       - Parameter timeout: Time budget for the wait loop. The loop sleeps 100 µs per iteration
         and subtracts 100 per iteration, so the value is counted in microseconds.
       - Parameter ssl: SSL configuration to apply when `ssl.useSSL` is set.
       - Parameter completion: Called with the outcome of the connection attempt.
       */
      public func connect(url: URL, port: Int, timeout: TimeInterval, ssl: SSLSettings, completion: @escaping ((Error?) -> Void)) {
          // A URL without a host cannot be connected to; report it instead of crashing.
          guard let host = url.host else {
              completion(WSError(type: .upgradeError, message: "URL has no host to connect to", code: 0))
              return
          }
          var readStream: Unmanaged<CFReadStream>?
          var writeStream: Unmanaged<CFWriteStream>?
          CFStreamCreatePairWithSocketToHost(nil, host as NSString, UInt32(port), &readStream, &writeStream)
          if let readStream = readStream, let writeStream = writeStream {
              inputStream = readStream.takeRetainedValue()
              outputStream = writeStream.takeRetainedValue()
          }
          guard let inStream = inputStream, let outStream = outputStream else {
              cleanup()
              completion(WSError(type: .upgradeError, message: "Unable to create the socket streams", code: 0))
              return
          }
          #if !os(watchOS) // watchOS unfortunately is missing the kCFStream properties to make this work
          if enableSOCKSProxy, let proxySettings = CFNetworkCopySystemProxySettings()?.takeRetainedValue() {
              let socksConfig = CFDictionaryCreateMutableCopy(nil, 0, proxySettings)
              let propertyKey = CFStreamPropertyKey(rawValue: kCFStreamPropertySOCKSProxy)
              CFWriteStreamSetProperty(outStream, propertyKey, socksConfig)
              CFReadStreamSetProperty(inStream, propertyKey, socksConfig)
          }
          #endif
          inStream.delegate = self
          outStream.delegate = self
          if ssl.useSSL {
              inStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
              outStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
              #if !os(watchOS) // watchOS unfortunately is missing the kCFStream properties to make this work
              var settings = [NSObject: NSObject]()
              if ssl.disableCertValidation {
                  settings[kCFStreamSSLValidatesCertificateChain] = NSNumber(value: false)
              }
              if ssl.overrideTrustHostname {
                  if let hostname = ssl.desiredTrustHostname {
                      settings[kCFStreamSSLPeerName] = hostname as NSString
                  } else {
                      settings[kCFStreamSSLPeerName] = kCFNull
                  }
              }
              if let sslClientCertificate = ssl.sslClientCertificate {
                  settings[kCFStreamSSLCertificates] = sslClientCertificate.streamSSLCertificates
              }
              inStream.setProperty(settings, forKey: kCFStreamPropertySSLSettings as Stream.PropertyKey)
              outStream.setProperty(settings, forKey: kCFStreamPropertySSLSettings as Stream.PropertyKey)
              #endif
              #if !(os(Linux) || os(watchOS))
              if let cipherSuites = ssl.cipherSuites,
                  let sslContextIn = CFReadStreamCopyProperty(inStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext?,
                  let sslContextOut = CFWriteStreamCopyProperty(outStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext? {
                  let resIn = SSLSetEnabledCiphers(sslContextIn, cipherSuites, cipherSuites.count)
                  let resOut = SSLSetEnabledCiphers(sslContextOut, cipherSuites, cipherSuites.count)
                  // Stop at the first failure so `completion` is not called again further down.
                  if resIn != errSecSuccess {
                      cleanup()
                      completion(WSError(type: .invalidSSLError, message: "Error setting ingoing cypher suites", code: Int(resIn)))
                      return
                  }
                  if resOut != errSecSuccess {
                      cleanup()
                      completion(WSError(type: .invalidSSLError, message: "Error setting outgoing cypher suites", code: Int(resOut)))
                      return
                  }
              }
              #endif
          }
          CFReadStreamSetDispatchQueue(inStream, workQueue)
          CFWriteStreamSetDispatchQueue(outStream, workQueue)
          inStream.open()
          outStream.open()
          var out = timeout // remaining time budget before giving up
          workQueue.async { [weak self] in
              while !outStream.hasSpaceAvailable {
                  usleep(100) // wait until the socket is ready
                  out -= 100
                  if out < 0 {
                      completion(WSError(type: .writeTimeoutError, message: "Timed out waiting for the socket to be ready for a write", code: 0))
                      return
                  } else if let error = outStream.streamError {
                      completion(error)
                      return // disconnectStream will be called.
                  } else if self == nil {
                      completion(WSError(type: .closeError, message: "socket object has been dereferenced", code: 0))
                      return
                  }
              }
              completion(nil) // success!
          }
      }

      /// Writes `data` to the output stream.
      /// - Returns: The result of `OutputStream.write`, or `-1` when there is no output stream.
      public func write(data: Data) -> Int {
          guard let outStream = outputStream else { return -1 }
          // Keep the bridged NSData alive for as long as its byte pointer is in use.
          let payload = data as NSData
          return withExtendedLifetime(payload) {
              let bytes = payload.bytes.assumingMemoryBound(to: UInt8.self)
              return outStream.write(bytes, maxLength: payload.length)
          }
      }

      /// Reads up to `BUFFER_MAX` bytes from the input stream.
      /// - Returns: The bytes read, or `nil` when there is no input stream or fewer than one byte was read.
      public func read() -> Data? {
          guard let stream = inputStream else { return nil }
          // A real, fully sized buffer: the stream may write up to BUFFER_MAX bytes into it.
          var buffer = [UInt8](repeating: 0, count: BUFFER_MAX)
          let length = stream.read(&buffer, maxLength: BUFFER_MAX)
          guard length > 0 else { return nil }
          return Data(bytes: buffer, count: length)
      }

      /// Detaches, unschedules and closes both streams, then drops the references to them.
      public func cleanup() {
          if let stream = inputStream {
              stream.delegate = nil
              CFReadStreamSetDispatchQueue(stream, nil)
              stream.close()
          }
          if let stream = outputStream {
              stream.delegate = nil
              CFWriteStreamSetDispatchQueue(stream, nil)
              stream.close()
          }
          outputStream = nil
          inputStream = nil
      }

      #if !(os(Linux) || os(watchOS))
      /// Returns the peer trust object of the output stream and the peer domain name.
      /// The domain is read from the stream's peer-name property, falling back to the SSL context.
      public func sslTrust() -> (trust: SecTrust?, domain: String?) {
          guard let outputStream = outputStream else { return (nil, nil) }
          // CoreFoundation types cannot be conditionally downcast, hence the forced cast to an optional.
          let trust = outputStream.property(forKey: kCFStreamPropertySSLPeerTrust as Stream.PropertyKey) as! SecTrust?
          // Conditional cast: a non-string value falls through to the SSL context lookup instead of trapping.
          var domain = outputStream.property(forKey: kCFStreamSSLPeerName as Stream.PropertyKey) as? String
          if domain == nil,
              let sslContextOut = CFWriteStreamCopyProperty(outputStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext? {
              var peerNameLen: Int = 0
              SSLGetPeerDomainNameLength(sslContextOut, &peerNameLen)
              var peerName = Data(count: peerNameLen)
              let _ = peerName.withUnsafeMutableBytes { (peerNamePtr: UnsafeMutablePointer<Int8>) in
                  SSLGetPeerDomainName(sslContextOut, peerNamePtr, &peerNameLen)
              }
              if let peerDomain = String(bytes: peerName, encoding: .utf8), peerDomain.count > 0 {
                  domain = peerDomain
              }
          }
          return (trust, domain)
      }
      #endif

      /**
       `StreamDelegate` callback. Forwards "bytes available" on the input stream, errors, and
       end-of-stream (reported as an error-less `streamDidError`) to `delegate`.
       */
      open func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
          if eventCode == .hasBytesAvailable {
              if aStream == inputStream {
                  delegate?.newBytesInStream()
              }
          } else if eventCode == .errorOccurred {
              delegate?.streamDidError(error: aStream.streamError)
          } else if eventCode == .endEncountered {
              delegate?.streamDidError(error: nil)
          }
      }
  }
  // WebSocket implementation

  /// The standard delegate you should use.
  public protocol WebSocketDelegate: AnyObject {
      func websocketDidConnect(socket: WebSocketClient)
      func websocketDidDisconnect(socket: WebSocketClient, error: Error?)
      func websocketDidReceiveMessage(socket: WebSocketClient, text: String)
      func websocketDidReceiveData(socket: WebSocketClient, data: Data)
  }
  /// Delegate notified about received pongs.
  public protocol WebSocketPongDelegate: AnyObject {
      func websocketDidReceivePong(socket: WebSocketClient, data: Data?)
  }
  /// A delegate with more advanced information about messages, the connection and the HTTP upgrade.
  public protocol WebSocketAdvancedDelegate: AnyObject {
      func websocketDidConnect(socket: WebSocket)
      func websocketDidDisconnect(socket: WebSocket, error: Error?)
      func websocketDidReceiveMessage(socket: WebSocket, text: String, response: WebSocket.WSResponse)
      func websocketDidReceiveData(socket: WebSocket, data: Data, response: WebSocket.WSResponse)
      /// Receives the text of the HTTP upgrade request.
      func websocketHttpUpgrade(socket: WebSocket, request: String)
      /// Receives the text of the HTTP upgrade response.
      func websocketHttpUpgrade(socket: WebSocket, response: String)
  }
  299. open class WebSocket : NSObject, StreamDelegate, WebSocketClient, WSStreamDelegate {
  /// WebSocket frame opcodes; the raw value is the opcode number.
  public enum OpCode: UInt8 {
      case continueFrame   = 0x0
      case textFrame       = 0x1
      case binaryFrame     = 0x2
      // 0x3-0x7 are reserved.
      case connectionClose = 0x8
      case ping            = 0x9
      case pong            = 0xA
      // 0xB-0xF are reserved.
  }
  /// Error domain string for this class.
  public static let ErrorDomain: String = "WebSocket"

  /// Where the callback is executed. It defaults to the main UI thread queue.
  public var callbackQueue: DispatchQueue = DispatchQueue.main

  // MARK: - Constants

  // HTTP header names and fixed values used for the upgrade request.
  let headerWSUpgradeName: String = "Upgrade"
  let headerWSUpgradeValue: String = "websocket"
  let headerWSHostName: String = "Host"
  let headerWSConnectionName: String = "Connection"
  let headerWSConnectionValue: String = "Upgrade"
  let headerWSProtocolName: String = "Sec-WebSocket-Protocol"
  let headerWSVersionName: String = "Sec-WebSocket-Version"
  let headerWSVersionValue: String = "13"
  let headerWSExtensionName: String = "Sec-WebSocket-Extensions"
  let headerWSKeyName: String = "Sec-WebSocket-Key"
  let headerOriginName: String = "Origin"
  let headerWSAcceptName: String = "Sec-WebSocket-Accept"

  let BUFFER_MAX: Int = 4096

  // Bit masks (NOTE(review): presumably applied to frame header bytes — their use is outside this chunk).
  let FinMask: UInt8 = 0x80
  let OpCodeMask: UInt8 = 0x0F
  let RSVMask: UInt8 = 0x70
  let RSV1Mask: UInt8 = 0x40
  let MaskMask: UInt8 = 0x80
  let PayloadLenMask: UInt8 = 0x7F

  let MaxFrameSize: Int = 32
  let httpSwitchProtocolCode: Int = 101
  /// URL schemes for which SSL is used.
  let supportedSSLSchemes: [String] = ["wss", "https"]
  /// Bookkeeping for one incoming message.
  public class WSResponse {
      var isFin: Bool = false
      public var code: OpCode = .continueFrame
      var bytesLeft: Int = 0
      public var frameCount: Int = 0
      public var buffer: NSMutableData?
      /// Timestamp captured when this object is created.
      public let firstFrame: Date = Date()
  }
  // MARK: - Delegates

  /// Responds to callback about new messages coming in over the WebSocket
  /// and also connection/disconnect messages.
  public weak var delegate: WebSocketDelegate?

  /// The optional advanced delegate can be used instead of the delegate.
  public weak var advancedDelegate: WebSocketAdvancedDelegate?

  /// Receives a callback for each pong message received.
  public weak var pongDelegate: WebSocketPongDelegate?

  // Closure-based alternatives to the delegates.
  public var onConnect: (() -> Void)?
  public var onDisconnect: ((Error?) -> Void)?
  public var onText: ((String) -> Void)?
  public var onData: ((Data) -> Void)?
  public var onPong: ((Data?) -> Void)?
  public var onHttpResponseHeaders: (([String: String]) -> Void)?

  // SSL and compression options.
  public var disableSSLCertValidation: Bool = false
  public var overrideTrustHostname: Bool = false
  public var desiredTrustHostname: String?
  public var sslClientCertificate: SSLClientCertificate?
  public var enableCompression: Bool = true
  #if !os(Linux)
  public var security: SSLTrustValidator?
  public var enabledSSLCipherSuites: [SSLCipherSuite]?
  #endif
  /// Current value of the `connected` flag, read while holding `mutex`.
  public var isConnected: Bool {
      mutex.lock()
      defer { mutex.unlock() }
      return connected
  }
  /// The request used for the upgrade. This is only public to allow headers, timeout, etc. to be modified on reconnect.
  public var request: URLRequest

  /// URL of `request`. Traps when the request has no URL.
  public var currentURL: URL {
      return request.url!
  }

  public var respondToPingWithPong = true
  // MARK: - Private

  /// Compression parameters and the compressor/decompressor objects.
  private struct CompressionState {
      var supportsCompression: Bool = false
      var messageNeedsDecompression: Bool = false
      var serverMaxWindowBits: Int = 15
      var clientMaxWindowBits: Int = 15
      var clientNoContextTakeover: Bool = false
      var serverNoContextTakeover: Bool = false
      var decompressor: Decompressor?
      var compressor: Compressor?
  }
  /// Transport used to talk to the server.
  private var stream: WSStream
  private var connected: Bool = false
  private var isConnecting: Bool = false
  /// Guards `connected` and `readyToWrite`.
  private let mutex: NSLock = NSLock()
  private var compressionState: CompressionState = CompressionState()
  /// Serial queue for outgoing operations (the initializer sets its concurrency to 1).
  private var writeQueue: OperationQueue = OperationQueue()
  private var readStack: [WSResponse] = []
  /// Chunks read from the stream that still have to be processed, in arrival order.
  private var inputQueue: [Data] = []
  /// Bytes carried over from one chunk to the next; prepended to the next chunk in `dequeueInput`.
  private var fragBuffer: Data?
  private var certValidated: Bool = false
  private var didDisconnect: Bool = false
  private var readyToWrite: Bool = false
  /// Value sent in the `Sec-WebSocket-Key` header of the current upgrade request.
  private var headerSecKey: String = ""
  /// Current value of the `readyToWrite` flag, read while holding `mutex`.
  private var canDispatch: Bool {
      mutex.lock()
      defer { mutex.unlock() }
      return readyToWrite
  }
  /**
   Designated initializer.

   Adds an `Origin` header derived from the request URL when the request has none, adds the
   `Sec-WebSocket-Protocol` header when protocols are supplied, and makes the write queue serial.

   - Parameter request: The request used for the HTTP upgrade.
   - Parameter protocols: Optional sub-protocols; joined with "," into one header value.
   - Parameter stream: Transport to use; defaults to a `FoundationStream`.
   */
  public init(request: URLRequest, protocols: [String]? = nil, stream: WSStream = FoundationStream()) {
      self.request = request
      self.stream = stream
      // A request without a URL only skips the Origin header. The rest of the setup
      // (protocol header, serial write queue) must still run, so there is no early return here.
      if request.value(forHTTPHeaderField: headerOriginName) == nil, let url = request.url {
          var origin = url.absoluteString
          if let hostUrl = URL(string: "/", relativeTo: url) {
              origin = hostUrl.absoluteString
              // Drop the trailing "/" of the resolved root URL.
              if origin.hasSuffix("/") {
                  origin.removeLast()
              }
          }
          self.request.setValue(origin, forHTTPHeaderField: headerOriginName)
      }
      if let protocols = protocols, !protocols.isEmpty {
          self.request.setValue(protocols.joined(separator: ","), forHTTPHeaderField: headerWSProtocolName)
      }
      writeQueue.maxConcurrentOperationCount = 1
  }
  /// Creates a socket for `url` using a request whose timeout interval is 5 seconds.
  public convenience init(url: URL, protocols: [String]? = nil) {
      var urlRequest = URLRequest(url: url)
      urlRequest.timeoutInterval = 5
      self.init(request: urlRequest, protocols: protocols)
  }
  /// Same as `init(url:protocols:)`, and additionally sets the quality of service of the write queue.
  public convenience init(url: URL, writeQueueQOS: QualityOfService, protocols: [String]? = nil) {
      self.init(url: url, protocols: protocols)
      self.writeQueue.qualityOfService = writeQueueQOS
  }
  /**
   Connect to the WebSocket server on a background thread.
   Does nothing when a connection attempt is already in progress.
   */
  open func connect() {
      if isConnecting {
          return
      }
      didDisconnect = false
      isConnecting = true
      createHTTPRequest()
  }
  /**
   Disconnect from the server.

   - With `forceTimeout == nil`, a Close control frame is sent and the server is expected to
     answer with its own Close frame and close the socket; the delegate is notified once the
     socket has been closed.
   - With a positive `forceTimeout`, the same happens, but the socket is closed (and the
     delegate notified) after at most that many seconds.
   - With a zero or negative `forceTimeout`, the socket is closed immediately without sending
     a Close control frame, and the delegate is notified.

   Does nothing when the socket is not connected.

   - Parameter forceTimeout: Maximum time to wait for the server to close the socket.
   - Parameter closeCode: The code to send on disconnect. The default is the normal close code for cleanly disconnecting a webSocket.
   */
  open func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) {
      guard isConnected else { return }
      guard let seconds = forceTimeout else {
          writeError(closeCode)
          return
      }
      if seconds > 0 {
          let milliseconds = Int(seconds * 1_000)
          callbackQueue.asyncAfter(deadline: .now() + .milliseconds(milliseconds)) { [weak self] in
              self?.disconnectStream(nil)
          }
          writeError(closeCode)
      } else {
          disconnectStream(nil)
      }
  }
  /**
   Write a string to the websocket. This sends it as a text frame.
   When the socket is not connected nothing is sent and `completion` is not called.
   - parameter string: The string to write; it is sent UTF-8 encoded.
   - parameter completion: The (optional) completion handler, performed when the write completes.
   */
  open func write(string: String, completion: (() -> ())? = nil) {
      if !isConnected { return }
      let payload = Data(string.utf8)
      dequeueWrite(payload, code: .textFrame, writeCompletion: completion)
  }
  /**
   Write binary data to the websocket. This sends it as a binary frame.
   When the socket is not connected nothing is sent and `completion` is not called.
   - parameter data: The data to write.
   - parameter completion: The (optional) completion handler, performed when the write completes.
   */
  open func write(data: Data, completion: (() -> ())? = nil) {
      if !isConnected { return }
      dequeueWrite(data, code: .binaryFrame, writeCompletion: completion)
  }
  /**
   Write a ping to the websocket. This sends it as a control frame.
   When the socket is not connected nothing is sent and `completion` is not called.
   */
  open func write(ping: Data, completion: (() -> ())? = nil) {
      if !isConnected { return }
      dequeueWrite(ping, code: .ping, writeCompletion: completion)
  }
  /**
   Write a pong to the websocket. This sends it as a control frame.
   When the socket is not connected nothing is sent and `completion` is not called.
   */
  open func write(pong: Data, completion: (() -> ())? = nil) {
      if !isConnected { return }
      dequeueWrite(pong, code: .pong, writeCompletion: completion)
  }
  /**
   Private method that starts the connection.

   Fills in the upgrade headers on `request`, serialises the request line and headers into
   the HTTP text, hands that text to `initStreamsWithData`, and finally reports it to the
   advanced delegate.

   A request without a URL is ignored. A URL without a scheme or host cannot be connected to;
   it is reported through `disconnectStream` with an `.upgradeError` instead of crashing.
   */
  private func createHTTPRequest() {
      guard let url = request.url else { return }
      guard let scheme = url.scheme, let host = url.host else {
          disconnectStream(WSError(type: .upgradeError, message: "WebSocket URL must have a scheme and a host", code: 0))
          return
      }
      // Default port: 443 for the SSL schemes, 80 otherwise.
      let port = url.port ?? (supportedSSLSchemes.contains(scheme) ? 443 : 80)

      request.setValue(headerWSUpgradeValue, forHTTPHeaderField: headerWSUpgradeName)
      request.setValue(headerWSConnectionValue, forHTTPHeaderField: headerWSConnectionName)
      headerSecKey = generateWebSocketKey()
      request.setValue(headerWSVersionValue, forHTTPHeaderField: headerWSVersionName)
      request.setValue(headerSecKey, forHTTPHeaderField: headerWSKeyName)
      if enableCompression {
          let val = "permessage-deflate; client_max_window_bits; server_max_window_bits=15"
          request.setValue(val, forHTTPHeaderField: headerWSExtensionName)
      }
      // Keep a caller-supplied Host header; otherwise use "host:port".
      let hostValue = request.allHTTPHeaderFields?[headerWSHostName] ?? "\(host):\(port)"
      request.setValue(hostValue, forHTTPHeaderField: headerWSHostName)

      // Request target: everything from the first "/" after "scheme://" to the end of the URL.
      var path = url.absoluteString
      let offset = scheme.count + 3 // skip "scheme://"
      let afterScheme = path.index(path.startIndex, offsetBy: offset, limitedBy: path.endIndex) ?? path.endIndex
      path = String(path[afterScheme..<path.endIndex])
      if let range = path.range(of: "/") {
          path = String(path[range.lowerBound..<path.endIndex])
      } else {
          path = "/"
          if let query = url.query {
              path += "?" + query
          }
      }

      var httpBody = "\(request.httpMethod ?? "GET") \(path) HTTP/1.1\r\n"
      if let headers = request.allHTTPHeaderFields {
          for (key, val) in headers {
              httpBody += "\(key): \(val)\r\n"
          }
      }
      httpBody += "\r\n"

      initStreamsWithData(Data(httpBody.utf8), port)
      advancedDelegate?.websocketHttpUpgrade(socket: self, request: httpBody)
  }
  /**
   Generate a WebSocket key as needed in RFC.

   The key is 16 random bytes, base64-encoded (a 24-character string). The bytes come from
   the standard library's system random number generator, which is also available on Linux.

   - Returns: The value for the `Sec-WebSocket-Key` header.
   */
  private func generateWebSocketKey() -> String {
      let nonceLength = 16
      let nonce = (0..<nonceLength).map { _ in UInt8.random(in: UInt8.min...UInt8.max) }
      return Data(nonce).base64EncodedString()
  }
  /**
   Start the stream connection and write the data to the output stream.

   Any existing stream is torn down first (without notifying the delegate). Once the stream
   reports that it is connected, an operation is queued on `writeQueue` that performs the
   certificate check (when a `security` validator is set) and then writes `data`.

   - Parameter data: The serialised HTTP upgrade request.
   - Parameter port: TCP port to connect to.
   */
  private func initStreamsWithData(_ data: Data, _ port: Int) {
      guard let url = request.url else {
          disconnectStream(nil, runDelegate: true)
          return
      }
      // Disconnect and clean up any existing streams before setting up a new pair
      disconnectStream(nil, runDelegate: false)
      // A URL without a scheme simply does not use SSL (no force-unwrap).
      let useSSL = url.scheme.map { supportedSSLSchemes.contains($0) } ?? false
      #if os(Linux)
      let settings = SSLSettings(useSSL: useSSL,
                                 disableCertValidation: disableSSLCertValidation,
                                 overrideTrustHostname: overrideTrustHostname,
                                 desiredTrustHostname: desiredTrustHostname,
                                 sslClientCertificate: sslClientCertificate)
      #else
      let settings = SSLSettings(useSSL: useSSL,
                                 disableCertValidation: disableSSLCertValidation,
                                 overrideTrustHostname: overrideTrustHostname,
                                 desiredTrustHostname: desiredTrustHostname,
                                 sslClientCertificate: sslClientCertificate,
                                 cipherSuites: self.enabledSSLCipherSuites)
      #endif
      // Without SSL there is nothing to validate.
      certValidated = !useSSL
      // The request timeout is in seconds; the stream is handed the value multiplied by one million.
      let timeout = request.timeoutInterval * 1_000_000
      stream.delegate = self
      stream.connect(url: url, port: port, timeout: timeout, ssl: settings, completion: { [weak self] (error) in
          guard let self = self else { return }
          if error != nil {
              self.disconnectStream(error)
              return
          }
          let operation = BlockOperation()
          operation.addExecutionBlock { [weak self, weak operation] in
              guard let sOperation = operation, let self = self else { return }
              guard !sOperation.isCancelled else { return }
              // Do the pinning now if needed
              #if os(Linux) || os(watchOS)
              self.certValidated = false
              #else
              if let sec = self.security, !self.certValidated {
                  let trustObj = self.stream.sslTrust()
                  if let possibleTrust = trustObj.trust {
                      self.certValidated = sec.isValid(possibleTrust, domain: trustObj.domain)
                  } else {
                      self.certValidated = false
                  }
                  if !self.certValidated {
                      self.disconnectStream(WSError(type: .invalidSSLError, message: "Invalid SSL certificate", code: 0))
                      return
                  }
              }
              #endif
              let _ = self.stream.write(data: data)
          }
          self.writeQueue.addOperation(operation)
      })
      self.mutex.lock()
      self.readyToWrite = true
      self.mutex.unlock()
  }
  /**
   `WSStreamDelegate` callback: the stream has new bytes, so process them.
   */
  public func newBytesInStream() {
      self.processInputStream()
  }
  /// Stream delegate callback for a stream failure; disconnects, forwarding `error`.
  public func streamDidError(error: Error?) {
    self.disconnectStream(error)
  }
  636. /**
   Disconnects the stream object and notifies the delegate.
  638. */
  private func disconnectStream(_ error: Error?, runDelegate: Bool = true) {
    // With an error, pending writes are cancelled; without one, they are allowed to finish first.
    if error != nil {
      writeQueue.cancelAllOperations()
    } else {
      writeQueue.waitUntilAllOperationsAreFinished()
    }

    // Tear the stream down and clear the connected flag while holding the lock.
    mutex.lock()
    cleanupStream()
    connected = false
    mutex.unlock()

    // Callers may suppress the disconnect notification.
    guard runDelegate else { return }
    doDisconnect(error)
  }
  653. /**
  654. cleanup the streams.
  655. */
  private func cleanupStream() {
    // Release the stream, then drop any partially received bytes kept for reassembly.
    self.stream.cleanup()
    self.fragBuffer = nil
  }
  660. /**
  661. Handles the incoming bytes and sending them to the proper processing method.
  662. */
  private func processInputStream() {
    guard let chunk = stream.read() else { return }
    // Only the call that finds the queue empty starts draining it; otherwise the
    // chunk is just appended for the drain loop that is already in progress.
    let startsDraining = inputQueue.isEmpty
    inputQueue.append(chunk)
    if startsDraining {
      dequeueInput()
    }
  }
  675. /**
  676. Dequeue the incoming input so it is processed in order.
  677. */
  private func dequeueInput() {
    while !inputQueue.isEmpty {
      autoreleasepool {
        let data = inputQueue[0]
        var work = data
        // Prepend bytes left over from an earlier, incomplete parse.
        if let buffer = fragBuffer {
          var combine = NSData(data: buffer) as Data
          combine.append(data)
          work = combine
          fragBuffer = nil
        }
        // Hold the bridged NSData in a named constant and extend its lifetime over
        // the parse, so the raw pointer handed to the parsers cannot dangle.
        let backing = work as NSData
        withExtendedLifetime(backing) {
          let buffer = UnsafeRawPointer(backing.bytes).assumingMemoryBound(to: UInt8.self)
          let length = work.count
          if !connected {
            processTCPHandshake(buffer, bufferLen: length)
          } else {
            processRawMessagesInBuffer(buffer, bufferLen: length)
          }
        }
        // Remove only the chunk that was just processed. Filtering by value would
        // also discard any later queued chunk with identical bytes, losing data.
        if !inputQueue.isEmpty {
          inputQueue.removeFirst()
        }
      }
    }
  }
  700. /**
   Handles checking the initial connection status.
  702. */
  private func processTCPHandshake(_ buffer: UnsafePointer<UInt8>, bufferLen: Int) {
    let status = processHTTP(buffer, bufferLen: bufferLen)
    if status == 0 {
      // Handshake handled; nothing more to do here.
      return
    }
    if status == -1 {
      // Keep the bytes so they are parsed again together with the next read.
      fragBuffer = Data(bytes: buffer, count: bufferLen)
      return
    }
    // Any other value is reported to the client as a failed upgrade with that code.
    doDisconnect(WSError(type: .upgradeError, message: "Invalid HTTP upgrade", code: status))
  }
  715. /**
  716. Finds the HTTP Packet in the TCP stream, by looking for the CRLF.
  717. */
  private func processHTTP(_ buffer: UnsafePointer<UInt8>, bufferLen: Int) -> Int {
    // Returns 0 when the response was accepted, -1 when no complete header block
    // was found (or validation returned -1), otherwise the code from validateResponse.
    let CRLFBytes = [UInt8(ascii: "\r"), UInt8(ascii: "\n"), UInt8(ascii: "\r"), UInt8(ascii: "\n")]
    var k = 0
    var totalSize = 0
    for i in 0..<bufferLen {
      if buffer[i] == CRLFBytes[k] {
        k += 1
        if k == 4 {
          totalSize = i + 1
          break
        }
      } else {
        // The mismatching byte may itself begin a new terminator (e.g. "\r\r\n\r\n"),
        // so restart the match at 1 when it is "\r" instead of always at 0.
        k = buffer[i] == CRLFBytes[0] ? 1 : 0
      }
    }
    if totalSize > 0 {
      let code = validateResponse(buffer, bufferLen: totalSize)
      if code != 0 {
        return code
      }
      isConnecting = false
      mutex.lock()
      connected = true
      mutex.unlock()
      didDisconnect = false
      if canDispatch {
        callbackQueue.async { [weak self] in
          guard let self = self else { return }
          self.onConnect?()
          self.delegate?.websocketDidConnect(socket: self)
          self.advancedDelegate?.websocketDidConnect(socket: self)
          NotificationCenter.default.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
        }
      }
      // Bytes following the header block already belong to WebSocket frames.
      let restSize = bufferLen - totalSize
      if restSize > 0 {
        processRawMessagesInBuffer(buffer + totalSize, bufferLen: restSize)
      }
      return 0 //success
    }
    return -1 // Was unable to find the full TCP header.
  }
  761. /**
  762. Validates the HTTP is a 101 as per the RFC spec.
  763. */
  private func validateResponse(_ buffer: UnsafePointer<UInt8>, bufferLen: Int) -> Int {
    // Returns 0 when the upgrade is accepted, the HTTP status code when it is not
    // the switching-protocols code, and -1 for any other failure.
    // NOTE(review): processTCPHandshake buffers and waits on -1, so a rejected
    // accept key is handled like an incomplete header — confirm this is intended.
    guard let str = String(data: Data(bytes: buffer, count: bufferLen), encoding: .utf8) else { return -1 }
    let splitArr = str.components(separatedBy: "\r\n")
    var code = -1
    var headers = [String: String]()
    for (i, line) in splitArr.enumerated() {
      if i == 0 {
        // Status line: the second whitespace-separated token is the status code.
        let responseSplit = line.components(separatedBy: .whitespaces)
        guard responseSplit.count > 1 else { return -1 }
        if let c = Int(responseSplit[1]) {
          code = c
        }
      } else {
        // Split at the FIRST colon only, so values that contain ':' themselves
        // (dates, URLs, host:port) are kept whole instead of being truncated.
        guard let colon = line.range(of: ":") else { break }
        let key = String(line[..<colon.lowerBound]).trimmingCharacters(in: .whitespaces)
        let val = String(line[colon.upperBound...]).trimmingCharacters(in: .whitespaces)
        headers[key.lowercased()] = val
      }
    }
    advancedDelegate?.websocketHttpUpgrade(socket: self, response: str)
    onHttpResponseHeaders?(headers)
    if code != httpSwitchProtocolCode {
      return code
    }
    if let extensionHeader = headers[headerWSExtensionName.lowercased()] {
      processExtensionHeader(extensionHeader)
    }
    // A non-empty accept header is mandatory.
    guard let acceptKey = headers[headerWSAcceptName.lowercased()], !acceptKey.isEmpty else {
      return -1
    }
    // When a key was sent, the accept value must be the SHA-1/Base64 of key + GUID.
    if headerSecKey.count > 0 {
      let sha = "\(headerSecKey)258EAFA5-E914-47DA-95CA-C5AB0DC85B11".sha1Base64()
      if sha != acceptKey {
        return -1
      }
    }
    return 0
  }
  807. /**
  808. Parses the extension header, setting up the compression parameters.
  809. */
  func processExtensionHeader(_ extensionHeader: String) {
    // Parses the integer that follows the first "=" of a `name=value` parameter.
    func windowBits(from parameter: String) -> Int? {
      let rawValue = parameter.components(separatedBy: "=")[1]
      return Int(rawValue.trimmingCharacters(in: .whitespaces))
    }

    for rawParameter in extensionHeader.components(separatedBy: ";") {
      let parameter = rawParameter.trimmingCharacters(in: .whitespaces)
      switch parameter {
      case "permessage-deflate":
        compressionState.supportsCompression = true
      case "client_no_context_takeover":
        compressionState.clientNoContextTakeover = true
      case "server_no_context_takeover":
        compressionState.serverNoContextTakeover = true
      case _ where parameter.hasPrefix("server_max_window_bits="):
        if let bits = windowBits(from: parameter) {
          compressionState.serverMaxWindowBits = bits
        }
      case _ where parameter.hasPrefix("client_max_window_bits="):
        if let bits = windowBits(from: parameter) {
          compressionState.clientMaxWindowBits = bits
        }
      default:
        // Unrecognised parameters are ignored.
        break
      }
    }

    // Build the codecs only once the window sizes are known.
    guard compressionState.supportsCompression else { return }
    compressionState.decompressor = Decompressor(windowBits: compressionState.serverMaxWindowBits)
    compressionState.compressor = Compressor(windowBits: compressionState.clientMaxWindowBits)
  }
  837. /**
  838. Read a 16 bit big endian value from a buffer
  839. */
  private static func readUint16(_ buffer: UnsafePointer<UInt8>, offset: Int) -> UInt16 {
    // Most significant byte comes first.
    let highByte = UInt16(buffer[offset])
    let lowByte = UInt16(buffer[offset + 1])
    return (highByte << 8) | lowByte
  }
  843. /**
  844. Read a 64 bit big endian value from a buffer
  845. */
  private static func readUint64(_ buffer: UnsafePointer<UInt8>, offset: Int) -> UInt64 {
    // Fold the eight bytes together, most significant first.
    return (0..<8).reduce(UInt64(0)) { partial, index in
      (partial << 8) | UInt64(buffer[offset + index])
    }
  }
  853. /**
  854. Write a 16-bit big endian value to a buffer.
  855. */
  private static func writeUint16(_ buffer: UnsafeMutablePointer<UInt8>, offset: Int, value: UInt16) {
    // Most significant byte goes first.
    let highByte = UInt8(value >> 8)
    let lowByte = UInt8(truncatingIfNeeded: value)
    buffer[offset] = highByte
    buffer[offset + 1] = lowByte
  }
  860. /**
  861. Write a 64-bit big endian value to a buffer.
  862. */
  private static func writeUint64(_ buffer: UnsafeMutablePointer<UInt8>, offset: Int, value: UInt64) {
    // Walk the shift amounts 56, 48, ..., 0 so the most significant byte lands first.
    for (index, shift) in stride(from: 56, through: 0, by: -8).enumerated() {
      buffer[offset + index] = UInt8(truncatingIfNeeded: value >> UInt64(shift))
    }
  }
  868. /**
  869. Process one message at the start of `buffer`. Return another buffer (sharing storage) that contains the leftover contents of `buffer` that I didn't process.
  870. */
  private func processOneRawMessage(inBuffer buffer: UnsafeBufferPointer<UInt8>) -> UnsafeBufferPointer<UInt8> {
    // Returns the unprocessed tail of `buffer`, or `emptyBuffer` when nothing more
    // can be parsed right now. Incomplete input is stored in `fragBuffer`.
    let response = readStack.last
    guard let baseAddress = buffer.baseAddress else { return emptyBuffer }
    let bufferLen = buffer.count
    // Two bytes are the minimum frame header. Stash shorter input whether or not
    // a response is pending, so the header bytes are never read out of bounds.
    if bufferLen < 2 {
      fragBuffer = Data(buffer: buffer)
      return emptyBuffer
    }
    if let response = response, response.bytesLeft > 0 {
      // Continue filling a frame whose payload has not fully arrived.
      var len = response.bytesLeft
      var extra = bufferLen - response.bytesLeft
      if response.bytesLeft > bufferLen {
        len = bufferLen
        extra = 0
      }
      response.bytesLeft -= len
      response.buffer?.append(Data(bytes: baseAddress, count: len))
      _ = processResponse(response)
      return buffer.fromOffset(bufferLen - extra)
    } else {
      let isFin = (FinMask & baseAddress[0])
      let receivedOpcodeRawValue = (OpCodeMask & baseAddress[0])
      let receivedOpcode = OpCode(rawValue: receivedOpcodeRawValue)
      let isMasked = (MaskMask & baseAddress[1])
      let payloadLen = (PayloadLenMask & baseAddress[1])
      var offset = 2
      if compressionState.supportsCompression && receivedOpcode != .continueFrame {
        compressionState.messageNeedsDecompression = (RSV1Mask & baseAddress[0]) > 0
      }
      if (isMasked > 0 || (RSVMask & baseAddress[0]) > 0) && receivedOpcode != .pong && !compressionState.messageNeedsDecompression {
        let errCode = CloseCode.protocolError.rawValue
        doDisconnect(WSError(type: .protocolError, message: "masked and rsv data is not currently supported", code: Int(errCode)))
        writeError(errCode)
        return emptyBuffer
      }
      let isControlFrame = (receivedOpcode == .connectionClose || receivedOpcode == .ping)
      if !isControlFrame && (receivedOpcode != .binaryFrame && receivedOpcode != .continueFrame &&
        receivedOpcode != .textFrame && receivedOpcode != .pong) {
        let errCode = CloseCode.protocolError.rawValue
        doDisconnect(WSError(type: .protocolError, message: "unknown opcode: \(receivedOpcodeRawValue)", code: Int(errCode)))
        writeError(errCode)
        return emptyBuffer
      }
      if isControlFrame && isFin == 0 {
        let errCode = CloseCode.protocolError.rawValue
        doDisconnect(WSError(type: .protocolError, message: "control frames can't be fragmented", code: Int(errCode)))
        writeError(errCode)
        return emptyBuffer
      }
      var closeCode = CloseCode.normal.rawValue
      if receivedOpcode == .connectionClose {
        if payloadLen == 1 {
          closeCode = CloseCode.protocolError.rawValue
        } else if payloadLen > 1 {
          // The 2-byte status code must be present before it is read; otherwise
          // keep the bytes and wait for the rest of the frame.
          if bufferLen < offset + MemoryLayout<UInt16>.size {
            fragBuffer = Data(bytes: baseAddress, count: bufferLen)
            return emptyBuffer
          }
          closeCode = WebSocket.readUint16(baseAddress, offset: offset)
          if closeCode < 1000 || (closeCode > 1003 && closeCode < 1007) || (closeCode > 1013 && closeCode < 3000) {
            closeCode = CloseCode.protocolError.rawValue
          }
        }
        if payloadLen < 2 {
          doDisconnect(WSError(type: .protocolError, message: "connection closed by server", code: Int(closeCode)))
          writeError(closeCode)
          return emptyBuffer
        }
      } else if isControlFrame && payloadLen > 125 {
        writeError(CloseCode.protocolError.rawValue)
        return emptyBuffer
      }
      var dataLength = UInt64(payloadLen)
      // 126 and 127 announce a 16-bit / 64-bit extended length. Make sure those
      // bytes have arrived before reading them.
      let extendedLengthSize: Int
      if dataLength == 127 {
        extendedLengthSize = MemoryLayout<UInt64>.size
      } else if dataLength == 126 {
        extendedLengthSize = MemoryLayout<UInt16>.size
      } else {
        extendedLengthSize = 0
      }
      if bufferLen < offset + extendedLengthSize {
        fragBuffer = Data(bytes: baseAddress, count: bufferLen)
        return emptyBuffer
      }
      if dataLength == 127 {
        dataLength = WebSocket.readUint64(baseAddress, offset: offset)
        offset += MemoryLayout<UInt64>.size
      } else if dataLength == 126 {
        dataLength = UInt64(WebSocket.readUint16(baseAddress, offset: offset))
        offset += MemoryLayout<UInt16>.size
      }
      // The whole payload must be in the buffer; otherwise wait for more bytes.
      if bufferLen < offset || UInt64(bufferLen - offset) < dataLength {
        fragBuffer = Data(bytes: baseAddress, count: bufferLen)
        return emptyBuffer
      }
      // The check above guarantees dataLength <= bufferLen - offset, so no clamping is needed.
      var len = dataLength
      if receivedOpcode == .connectionClose && len > 0 {
        // Skip the status code; what remains is the close reason.
        let size = MemoryLayout<UInt16>.size
        offset += size
        len -= UInt64(size)
      }
      let data: Data
      if compressionState.messageNeedsDecompression, let decompressor = compressionState.decompressor {
        do {
          data = try decompressor.decompress(bytes: baseAddress+offset, count: Int(len), finish: isFin > 0)
          if isFin > 0 && compressionState.serverNoContextTakeover {
            try decompressor.reset()
          }
        } catch {
          let closeReason = "Decompression failed: \(error)"
          let closeCode = CloseCode.encoding.rawValue
          doDisconnect(WSError(type: .protocolError, message: closeReason, code: Int(closeCode)))
          writeError(closeCode)
          return emptyBuffer
        }
      } else {
        data = Data(bytes: baseAddress+offset, count: Int(len))
      }
      if receivedOpcode == .connectionClose {
        var closeReason = "connection closed by server"
        if let customCloseReason = String(data: data, encoding: .utf8) {
          closeReason = customCloseReason
        } else {
          closeCode = CloseCode.protocolError.rawValue
        }
        doDisconnect(WSError(type: .protocolError, message: closeReason, code: Int(closeCode)))
        writeError(closeCode)
        return emptyBuffer
      }
      if receivedOpcode == .pong {
        if canDispatch {
          callbackQueue.async { [weak self] in
            guard let self = self else { return }
            let pongData: Data? = data.count > 0 ? data : nil
            self.onPong?(pongData)
            self.pongDelegate?.websocketDidReceivePong(socket: self, data: pongData)
          }
        }
        return buffer.fromOffset(offset + Int(len))
      }
      var response = readStack.last
      if isControlFrame {
        response = nil // Don't append pings.
      }
      if isFin == 0 && receivedOpcode == .continueFrame && response == nil {
        let errCode = CloseCode.protocolError.rawValue
        doDisconnect(WSError(type: .protocolError, message: "continue frame before a binary or text frame", code: Int(errCode)))
        writeError(errCode)
        return emptyBuffer
      }
      var isNew = false
      if response == nil {
        if receivedOpcode == .continueFrame {
          let errCode = CloseCode.protocolError.rawValue
          doDisconnect(WSError(type: .protocolError, message: "first frame can't be a continue frame", code: Int(errCode)))
          writeError(errCode)
          return emptyBuffer
        }
        isNew = true
        response = WSResponse()
        response!.code = receivedOpcode!
        response!.bytesLeft = Int(dataLength)
        response!.buffer = NSMutableData(data: data)
      } else {
        if receivedOpcode == .continueFrame {
          response!.bytesLeft = Int(dataLength)
        } else {
          let errCode = CloseCode.protocolError.rawValue
          doDisconnect(WSError(type: .protocolError, message: "second and beyond of fragment message must be a continue frame", code: Int(errCode)))
          writeError(errCode)
          return emptyBuffer
        }
        response!.buffer!.append(data)
      }
      if let response = response {
        response.bytesLeft -= Int(len)
        response.frameCount += 1
        response.isFin = isFin > 0 ? true : false
        if isNew {
          readStack.append(response)
        }
        _ = processResponse(response)
      }
      let step = Int(offset + numericCast(len))
      return buffer.fromOffset(step)
    }
  }
  1046. /**
  1047. Process all messages in the buffer if possible.
  1048. */
  private func processRawMessagesInBuffer(_ pointer: UnsafePointer<UInt8>, bufferLen: Int) {
    var remaining = UnsafeBufferPointer(start: pointer, count: bufferLen)
    // Parse at least once, then keep going while two or more bytes remain.
    repeat {
      remaining = processOneRawMessage(inBuffer: remaining)
    } while remaining.count >= 2
    // Anything still left over is kept for the next read.
    guard remaining.count > 0 else { return }
    fragBuffer = Data(buffer: remaining)
  }
  1058. /**
  1059. Process the finished response of a buffer.
  1060. */
  private func processResponse(_ response: WSResponse) -> Bool {
    // Only a final frame with no outstanding payload bytes is a finished message.
    guard response.isFin && response.bytesLeft <= 0 else { return false }

    switch response.code {
    case .ping:
      if respondToPingWithPong {
        let payload = response.buffer! // local copy so it is preserved for writing
        dequeueWrite(payload as Data, code: .pong)
      }
    case .textFrame:
      // Text that is not valid UTF-8 is answered with an encoding close code.
      guard let text = String(data: response.buffer! as Data, encoding: .utf8) else {
        writeError(CloseCode.encoding.rawValue)
        return false
      }
      if canDispatch {
        callbackQueue.async { [weak self] in
          guard let self = self else { return }
          self.onText?(text)
          self.delegate?.websocketDidReceiveMessage(socket: self, text: text)
          self.advancedDelegate?.websocketDidReceiveMessage(socket: self, text: text, response: response)
        }
      }
    case .binaryFrame:
      if canDispatch {
        let payload = response.buffer! // local copy so it is preserved for the callbacks
        callbackQueue.async { [weak self] in
          guard let self = self else { return }
          self.onData?(payload as Data)
          self.delegate?.websocketDidReceiveData(socket: self, data: payload as Data)
          self.advancedDelegate?.websocketDidReceiveData(socket: self, data: payload as Data, response: response)
        }
      }
    default:
      break
    }

    // The message is complete, so it leaves the read stack.
    readStack.removeLast()
    return true
  }
  1097. /**
  1098. Write an error to the socket
  1099. */
  private func writeError(_ code: UInt16) {
    // Encode the code into a real 2-byte buffer. `NSMutableData(capacity:)` yields a
    // zero-length object whose storage is not guaranteed to be allocated, so writing
    // through its `bytes` pointer is unsafe.
    var payload = [UInt8](repeating: 0, count: MemoryLayout<UInt16>.size)
    WebSocket.writeUint16(&payload, offset: 0, value: code)
    // Queue a close frame carrying the big-endian code.
    dequeueWrite(Data(payload), code: .connectionClose)
  }
  1106. /**
  1107. Used to write things to the stream
  1108. */
  private func dequeueWrite(_ data: Data, code: OpCode, writeCompletion: (() -> ())? = nil) {
    // Builds one masked, final frame for `data` and queues it on `writeQueue`.
    // `writeCompletion` runs on `callbackQueue` once every byte has been written.
    let operation = BlockOperation()
    operation.addExecutionBlock { [weak self, weak operation] in
      guard let self = self else { return }
      guard let sOperation = operation else { return }
      var offset = 2
      var firstByte: UInt8 = self.FinMask | code.rawValue
      var data = data
      if [.textFrame, .binaryFrame].contains(code), let compressor = self.compressionState.compressor {
        do {
          data = try compressor.compress(data)
          // Flag the frame as soon as the payload is compressed, so a failing reset
          // below cannot leave compressed bytes marked as uncompressed.
          firstByte |= self.RSV1Mask
          if self.compressionState.clientNoContextTakeover {
            try compressor.reset()
          }
        } catch {
          // TODO: report error? We can just send the uncompressed frame.
        }
      }
      let dataLength = data.count
      // A zero-filled array gives real, bounds-checked storage for the frame.
      // `NSMutableData(capacity:)` only hints at capacity and has length 0.
      var frame = [UInt8](repeating: 0, count: dataLength + self.MaxFrameSize)
      frame[0] = firstByte
      if dataLength < 126 {
        frame[1] = UInt8(dataLength)
      } else if dataLength <= Int(UInt16.max) {
        frame[1] = 126
        WebSocket.writeUint16(&frame, offset: offset, value: UInt16(dataLength))
        offset += MemoryLayout<UInt16>.size
      } else {
        frame[1] = 127
        WebSocket.writeUint64(&frame, offset: offset, value: UInt64(dataLength))
        offset += MemoryLayout<UInt64>.size
      }
      frame[1] |= self.MaskMask
      // Generate the 4-byte masking key and place it after the length field.
      let maskKeySize = MemoryLayout<UInt32>.size
      var maskKey = [UInt8](repeating: 0, count: maskKeySize)
      _ = SecRandomCopyBytes(kSecRandomDefault, maskKeySize, &maskKey)
      for (i, keyByte) in maskKey.enumerated() {
        frame[offset + i] = keyByte
      }
      offset += maskKeySize
      // Enumerate rather than index from 0: a `Data` slice need not start at index 0.
      for (i, byte) in data.enumerated() {
        frame[offset] = byte ^ maskKey[i % maskKeySize]
        offset += 1
      }
      // Write until the whole frame (`offset` bytes) is out or the operation is cancelled.
      var total = 0
      while !sOperation.isCancelled {
        if !self.readyToWrite {
          self.doDisconnect(WSError(type: .outputStreamWriteError, message: "output stream had an error during write", code: 0))
          break
        }
        let stream = self.stream
        let len = stream.write(data: Data(frame[total..<offset]))
        if len <= 0 {
          self.doDisconnect(WSError(type: .outputStreamWriteError, message: "output stream had an error during write", code: 0))
          break
        } else {
          total += len
        }
        if total >= offset {
          if let callback = writeCompletion {
            self.callbackQueue.async {
              callback()
            }
          }
          break
        }
      }
    }
    writeQueue.addOperation(operation)
  }
  1179. /**
   Used to perform the disconnect delegate callbacks.
  1181. */
  private func doDisconnect(_ error: Error?) {
    // Report a disconnect only once.
    if didDisconnect { return }
    didDisconnect = true
    isConnecting = false

    mutex.lock()
    connected = false
    mutex.unlock()

    if !canDispatch { return }
    callbackQueue.async { [weak self] in
      guard let self = self else { return }
      // Closure first, then both delegates, then the notification.
      self.onDisconnect?(error)
      self.delegate?.websocketDidDisconnect(socket: self, error: error)
      self.advancedDelegate?.websocketDidDisconnect(socket: self, error: error)
      // The notification carries the error only when there is one.
      let info = error.map { [WebsocketDisconnectionErrorKeyName: $0] }
      NotificationCenter.default.post(name: NSNotification.Name(WebsocketDidDisconnectNotification), object: self, userInfo: info)
    }
  }
  1199. // MARK: - Deinit
  deinit {
    // Mark the socket unwritable and release the stream under the lock.
    do {
      mutex.lock()
      defer { mutex.unlock() }
      readyToWrite = false
      cleanupStream()
    }
    // Then drop every write that is still queued.
    writeQueue.cancelAllOperations()
  }
  1207. }
  private extension String {
    /// Returns the Base64 encoding of the SHA-1 digest of the string's UTF-8 bytes.
    func sha1Base64() -> String {
      let utf8Bytes = Array(utf8)
      var digest = [UInt8](repeating: 0, count: Int(CC_SHA1_DIGEST_LENGTH))
      _ = CC_SHA1(utf8Bytes, CC_LONG(utf8Bytes.count), &digest)
      return Data(digest).base64EncodedString()
    }
  }
  private extension Data {
    /// Copies the bytes referenced by `buffer` into a new `Data`.
    ///
    /// A buffer with no base address yields empty data instead of crashing on a
    /// force-unwrap.
    init(buffer: UnsafeBufferPointer<UInt8>) {
      guard let base = buffer.baseAddress else {
        self.init()
        return
      }
      self.init(bytes: base, count: buffer.count)
    }
  }
  private extension UnsafeBufferPointer {
    /// Returns a view of the same storage that starts `offset` elements further in.
    func fromOffset(_ offset: Int) -> UnsafeBufferPointer<Element> {
      let shiftedStart = baseAddress?.advanced(by: offset)
      let remainingCount = count - offset
      return UnsafeBufferPointer<Element>(start: shiftedStart, count: remainingCount)
    }
  }
  /// Zero-length buffer with no base address, returned when there is nothing left to process.
  private let emptyBuffer: UnsafeBufferPointer<UInt8> = UnsafeBufferPointer(start: nil, count: 0)
  #if !swift(>=4)
  fileprivate extension String {
    /// Character count, compiled only when the Swift version is below 4.
    var count: Int {
      return characters.count
    }
  }
  #endif