1 /* 2 * hunt-amqp: AMQP library for D programming language, based on hunt-net. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.impl.ProtonConnectionImpl; 12 13 import hunt.Exceptions; 14 import hunt.amqp.ProtonConnection; 15 import hunt.amqp.ProtonHelper; 16 import hunt.amqp.ProtonLinkOptions; 17 import hunt.amqp.ProtonReceiver; 18 import hunt.amqp.ProtonSender; 19 import hunt.amqp.ProtonSession; 20 import hunt.amqp.ProtonTransportOptions; 21 import hunt.amqp.sasl.ProtonSaslAuthenticator; 22 23 import hunt.logging; 24 import hunt.proton.Proton; 25 import hunt.proton.amqp.Symbol; 26 import hunt.proton.amqp.transport.ErrorCondition; 27 import hunt.proton.engine.Connection; 28 import hunt.proton.engine.EndpointState; 29 import hunt.proton.engine.Link; 30 import hunt.proton.engine.Receiver; 31 import hunt.proton.engine.Record; 32 import hunt.proton.engine.Sender; 33 import hunt.proton.engine.Session; 34 import hunt.amqp.impl.ProtonSenderImpl; 35 import hunt.amqp.impl.ProtonReceiverImpl; 36 37 import hunt.collection.ArrayList; 38 import hunt.collection.LinkedHashMap; 39 import hunt.collection.List; 40 import hunt.collection.Map; 41 import hunt.net.NetClient; 42 43 44 import hunt.amqp.impl.ProtonTransport; 45 import hunt.amqp.Handler; 46 import std.concurrency : initOnce; 47 import std.uuid; 48 import std.random; 49 import hunt.amqp.impl.ProtonMetaDataSupportImpl; 50 import hunt.amqp.impl.ProtonSessionImpl; 51 import hunt.net.Connection; 52 import hunt.amqp.impl.ProtonSaslClientAuthenticatorImpl; 53 import hunt.Object; 54 import hunt.String; 55 56 /** 57 * @author <a href="http://tfox.org">Tim Fox</a> 58 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 59 */ 60 class ProtonConnectionImpl : ProtonConnection { 61 62 // static Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); 63 64 static Symbol ANONYMOUS_RELAY() { 65 __gshared Symbol inst; 66 return initOnce!inst(Symbol.valueOf("ANONYMOUS-RELAY")); 67 } 68 69 private hunt.proton.engine.Connection.Connection connection; //= Proton.connection(); 70 private ProtonTransport transport; 71 private Handler!(Void)[] endHandlers; 72 73 private Handler!ProtonConnection _openHandler; //= (result) -> { 74 // LOG.trace("Connection open completed"); 75 //}; 76 private AsyncResultHandler!ProtonConnection _closeHandler; 77 78 private AmqpEventHandler!ProtonConnection _disconnectHandler; 79 // 80 private AmqpEventHandler!ProtonSession _sessionOpenHandler; 81 82 private Handler!ProtonSender _senderOpenHandler; //= (sender) -> { 83 // sender.setCondition(new ErrorCondition(Symbol.getSymbol("Not Supported"), "")); 84 //}; 85 private Handler!ProtonReceiver _receiverOpenHandler; //= (receiver) -> { 86 // receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Not Supported"), "")); 87 //}; 88 private bool anonymousRelaySupported; 89 private ProtonSession defaultSession; 90 private hunt.net.Connection.Connection _conn; 91 92 this(string hostname, hunt.net.Connection.Connection conn) { 93 _closeHandler = (result) { 94 if (result.succeeded()) { 95 trace("Connection closed"); 96 } else { 97 warning("Connection closed with error", result.cause()); 98 } 99 }; 100 101 _disconnectHandler = (connection) { 102 trace("Connection disconnected"); 103 }; 104 105 _sessionOpenHandler = (session) { 106 session.setCondition(new ErrorCondition(Symbol.getSymbol("Not Supported"), new String(""))); 107 }; 108 109 this.connection = Proton.connection(); 110 this.connection.setContext(this); 111 string tmp = "vert.x-"; 112 tmp ~= randomUUID().toString(); 113 this.connection.setContainer(tmp); 114 this.connection.setHostname(hostname); 115 this._conn = conn; 116 Map!(Symbol, Object) props = createInitialPropertiesMap(); 117 connection.setProperties(props); 118 } 119 120 private Map!(Symbol, Object) createInitialPropertiesMap() { 121 Map!(Symbol, Object) props = new LinkedHashMap!(Symbol, Object)(); 122 props.put(ProtonMetaDataSupportImpl.PRODUCT_KEY, new String("vertx-proton")); 123 props.put(ProtonMetaDataSupportImpl.VERSION_KEY, new String("version")); 124 return props; 125 } 126 127 ///////////////////////////////////////////////////////////////////////////// 128 // 129 // Delegated state tracking 130 // 131 ///////////////////////////////////////////////////////////////////////////// 132 133 ProtonConnectionImpl setProperties(Map!(Symbol, Object) properties) { 134 Map!(Symbol, Object) newProps = null; 135 if (properties !is null) { 136 newProps = createInitialPropertiesMap(); 137 newProps.putAll(properties); 138 } 139 140 connection.setProperties(newProps); 141 return this; 142 } 143 144 ProtonConnectionImpl setOfferedCapabilities(Symbol[] capabilities) { 145 connection.setOfferedCapabilities(capabilities); 146 return this; 147 } 148 149 ProtonConnectionImpl setHostname(string hostname) { 150 connection.setHostname(hostname); 151 return this; 152 } 153 154 ProtonConnectionImpl setDesiredCapabilities(Symbol[] capabilities) { 155 connection.setDesiredCapabilities(capabilities); 156 return this; 157 } 158 159 ProtonConnectionImpl setContainer(string container) { 160 connection.setContainer(container); 161 return this; 162 } 163 164 ProtonConnectionImpl setCondition(ErrorCondition condition) { 165 connection.setCondition(condition); 166 return this; 167 } 168 169 ErrorCondition getCondition() { 170 return connection.getCondition(); 171 } 172 173 string getContainer() { 174 return connection.getContainer(); 175 } 176 177 string getHostname() { 178 return connection.getHostname(); 179 } 180 181 EndpointState getLocalState() { 182 return connection.getLocalState(); 183 } 184 185 ErrorCondition getRemoteCondition() { 186 return connection.getRemoteCondition(); 187 } 188 189 string getRemoteContainer() { 190 return connection.getRemoteContainer(); 191 } 192 193 Symbol[] getRemoteDesiredCapabilities() { 194 return connection.getRemoteDesiredCapabilities(); 195 } 196 197 string getRemoteHostname() { 198 return connection.getRemoteHostname(); 199 } 200 201 Symbol[] getRemoteOfferedCapabilities() { 202 return connection.getRemoteOfferedCapabilities(); 203 } 204 205 Map!(Symbol, Object) getRemoteProperties() { 206 return connection.getRemoteProperties(); 207 } 208 209 EndpointState getRemoteState() { 210 return connection.getRemoteState(); 211 } 212 213 bool isAnonymousRelaySupported() { 214 return anonymousRelaySupported; 215 } 216 217 Record attachments() { 218 return connection.attachments(); 219 } 220 221 ///////////////////////////////////////////////////////////////////////////// 222 // 223 // Handle/Trigger connection level state changes 224 // 225 ///////////////////////////////////////////////////////////////////////////// 226 227 ProtonConnection open() { 228 version (HUNT_AMQP_DEBUG) 229 logInfo("open-------"); 230 connection.open(); 231 flush(); 232 return this; 233 } 234 235 ProtonConnection close() { 236 connection.close(); 237 flush(); 238 return this; 239 } 240 241 ProtonSessionImpl createSession() { 242 return new ProtonSessionImpl(connection.session()); 243 } 244 245 private ProtonSession getDefaultSession() { 246 if (defaultSession is null) { 247 defaultSession = createSession(); 248 //defaultSession.closeHandler(result -> { 249 // string msg = "The connections default session closed unexpectedly"; 250 // if (!result.succeeded()) { 251 // msg += ": "; 252 // msg += ": " + string.valueOf(result.cause()); 253 // } 254 // Future<ProtonConnection> failure = Future.failedFuture(msg); 255 // Handler<AsyncResult<ProtonConnection>> connCloseHandler = closeHandler; 256 // if (connCloseHandler !is null) { 257 // connCloseHandler.handle(failure); 258 // } 259 //}); 260 261 //defaultSession.closeHandler ( new class Handler!ProtonSession { 262 // void handle(ProtonSession var1) 263 // { 264 // string msg = "The connections default session closed unexpectedly"; 265 // if (closeHandler !is null) { 266 // closeHandler.handle(); 267 // } 268 // } 269 // } ); 270 271 defaultSession.open(); 272 // Deliberately not flushing, the sender/receiver open 273 // call will do that (if it doesn't happen otherwise). 274 } 275 return defaultSession; 276 } 277 278 ProtonSender createSender(string address) { 279 return getDefaultSession().createSender((address)); 280 } 281 282 ProtonSender createSender(string address, ProtonLinkOptions senderOptions) { 283 return getDefaultSession().createSender((address), senderOptions); 284 } 285 286 ProtonReceiver createReceiver(string address) { 287 return getDefaultSession().createReceiver((address)); 288 } 289 290 ProtonReceiver createReceiver(string address, ProtonLinkOptions receiverOptions) { 291 return getDefaultSession().createReceiver((address), receiverOptions); 292 } 293 294 void flush() { 295 if (transport !is null) { 296 version (HUNT_AMQP_DEBUG) 297 logInfo("transport flush"); 298 transport.flush(); 299 } 300 } 301 302 void disconnect() { 303 if (transport !is null) { 304 transport.disconnect(); 305 } 306 } 307 308 bool isDisconnected() { 309 return transport is null; 310 } 311 312 ProtonConnection openHandler(Handler!ProtonConnection openHandler) { 313 //implementationMissing(false); 314 this._openHandler = openHandler; 315 return this; 316 } 317 318 ProtonConnection closeHandler(AsyncResultHandler!ProtonConnection closeHandler) { 319 this._closeHandler = closeHandler; 320 return this; 321 } 322 323 ProtonConnection disconnectHandler(AmqpEventHandler!ProtonConnection disconnectHandler) { 324 this._disconnectHandler = disconnectHandler; 325 return this; 326 } 327 328 ProtonConnection sessionOpenHandler(AmqpEventHandler!ProtonSession remoteSessionOpenHandler) { 329 this._sessionOpenHandler = remoteSessionOpenHandler; 330 return this; 331 } 332 333 ProtonConnection senderOpenHandler(Handler!ProtonSender remoteSenderOpenHandler) { 334 //implementationMissing(false); 335 this._senderOpenHandler = remoteSenderOpenHandler; 336 return this; 337 } 338 339 ProtonConnection receiverOpenHandler(Handler!ProtonReceiver remoteReceiverOpenHandler) { 340 //implementationMissing(false); 341 this._receiverOpenHandler = remoteReceiverOpenHandler; 342 return this; 343 } 344 345 ///////////////////////////////////////////////////////////////////////////// 346 // 347 // Implementation details hidden from api. 348 // 349 ///////////////////////////////////////////////////////////////////////////// 350 351 private void processCapabilities() { 352 Symbol[] capabilities = getRemoteOfferedCapabilities(); 353 anonymousRelaySupported = true; 354 if (capabilities.length != 0) { 355 List!Symbol list = new ArrayList!Symbol(capabilities); 356 if (list.contains(ANONYMOUS_RELAY)) { 357 anonymousRelaySupported = true; 358 } 359 } 360 } 361 362 void fireRemoteOpen() { 363 processCapabilities(); 364 365 if (_openHandler !is null) { 366 _openHandler.handle(this); 367 } 368 } 369 370 void fireRemoteClose() { 371 version(HUNT_DEBUG) info("Remote closed"); 372 if (_closeHandler !is null) { 373 _closeHandler(ProtonHelper.future!(ProtonConnection)(this, getRemoteCondition())); 374 } 375 } 376 377 void fireDisconnect() { 378 version(HUNT_DEBUG) info("Disconnecting..."); 379 transport = null; 380 if (_disconnectHandler !is null) { 381 _disconnectHandler(this); 382 } 383 384 foreach(Handler!Void handler; endHandlers) { 385 handler.handle(null); 386 } 387 388 endHandlers = null; 389 } 390 391 void bindClient(NetClient client, ProtonSaslClientAuthenticatorImpl authenticator, 392 ProtonTransportOptions transportOptions) { 393 transport = new ProtonTransport(connection, client, _conn, 394 authenticator, transportOptions); 395 } 396 397 //void bindServer(NetSocket socket, ProtonSaslAuthenticator authenticator, ProtonTransportOptions transportOptions) { 398 // transport = new ProtonTransport(connection, vertx, null, socket, authenticator, transportOptions); 399 //} 400 401 void fireRemoteSessionOpen(hunt.proton.engine.Session.Session session) { 402 implementationMissing(false); 403 //if (sessionOpenHandler !is null) { 404 // sessionOpenHandler.handle(new ProtonSessionImpl(session)); 405 //} 406 } 407 408 void fireRemoteLinkOpen(Link link) { 409 if (cast(Sender) link !is null) { 410 if (_senderOpenHandler !is null) { 411 _senderOpenHandler.handle(new ProtonSenderImpl(cast(Sender) link)); 412 } 413 } else { 414 if (_receiverOpenHandler !is null) { 415 _receiverOpenHandler.handle(new ProtonReceiverImpl(cast(Receiver) link)); 416 } 417 } 418 } 419 420 void addEndHandler(Handler!Void handler) { 421 endHandlers ~= handler; 422 } 423 424 hunt.net.Connection.Connection getContext() { 425 return _conn; 426 } 427 }