1 /* 2 * hunt-amqp: AMQP library for D programming language, based on hunt-net. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.impl.ProtonSessionImpl; 12 13 import hunt.amqp.ProtonReceiver; 14 import hunt.amqp.ProtonSender; 15 import hunt.amqp.ProtonSession; 16 import hunt.amqp.ProtonConnection; 17 import hunt.amqp.ProtonHelper; 18 import hunt.amqp.ProtonQoS; 19 import hunt.amqp.ProtonLinkOptions; 20 import hunt.amqp.impl.ProtonConnectionImpl; 21 import hunt.amqp.impl.ProtonReceiverImpl; 22 import hunt.amqp.impl.ProtonSenderImpl; 23 24 import hunt.proton.amqp.Symbol; 25 import hunt.proton.amqp.messaging.Accepted; 26 import hunt.proton.amqp.messaging.Modified; 27 import hunt.proton.amqp.messaging.Rejected; 28 import hunt.proton.amqp.messaging.Released; 29 import hunt.proton.amqp.messaging.Source; 30 import hunt.proton.amqp.messaging.Target; 31 import hunt.proton.amqp.transport.ErrorCondition; 32 import hunt.proton.engine.EndpointState; 33 import hunt.proton.engine.Receiver; 34 import hunt.proton.engine.Record; 35 import hunt.proton.engine.Sender; 36 import hunt.proton.engine.Session; 37 import hunt.Exceptions; 38 import hunt.String; 39 import hunt.collection.ArrayList; 40 import std.conv: to; 41 import hunt.Boolean; 42 import hunt.amqp.Handler; 43 import hunt.logging; 44 /** 45 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 46 */ 47 class ProtonSessionImpl : ProtonSession { 48 49 private Session session; 50 private int autoLinkCounter = 0; 51 private Handler!ProtonSession _openHandler ;//= (result) -> { 52 // LOG.trace("Session open completed"); 53 //}; 54 private Handler!ProtonSession _closeHandler; // = (result) -> { 55 // if (result.succeeded()) { 56 // LOG.trace("Session closed"); 57 // } else { 58 // LOG.warn("Session closed with error", result.cause()); 59 // } 60 //}; 61 62 this(Session session) { 63 this.session = session; 64 this.session.setContext(this); 65 session.setIncomingCapacity(2147483647); 66 _openHandler = new class Handler!ProtonSession 67 { 68 void handle(ProtonSession var1) 69 { 70 version(HUNT_DEBUG) logInfo("Session open completed"); 71 } 72 }; 73 74 _closeHandler = new class Handler!ProtonSession 75 { 76 void handle(ProtonSession var1) 77 { 78 error("Session closed with error"); 79 } 80 }; 81 82 } 83 84 85 ProtonConnection getConnection() { 86 return getConnectionImpl(); 87 } 88 89 ProtonConnectionImpl getConnectionImpl() { 90 return cast(ProtonConnectionImpl) (this.session.getConnection().getContext()); 91 } 92 93 long getOutgoingWindow() { 94 return session.getOutgoingWindow(); 95 } 96 97 98 ProtonSession setIncomingCapacity(int bytes) { 99 session.setIncomingCapacity(bytes); 100 return this; 101 } 102 103 int getOutgoingBytes() { 104 return session.getOutgoingBytes(); 105 } 106 107 EndpointState getRemoteState() { 108 return session.getRemoteState(); 109 } 110 111 int getIncomingBytes() { 112 return session.getIncomingBytes(); 113 } 114 115 116 ErrorCondition getRemoteCondition() { 117 return session.getRemoteCondition(); 118 } 119 120 121 int getIncomingCapacity() { 122 return session.getIncomingCapacity(); 123 } 124 125 EndpointState getLocalState() { 126 return session.getLocalState(); 127 } 128 129 130 ProtonSession setCondition(ErrorCondition condition) { 131 session.setCondition(condition); 132 return this; 133 } 134 135 136 ErrorCondition getCondition() { 137 return session.getCondition(); 138 } 139 140 void setOutgoingWindow(long outgoingWindowSize) { 141 session.setOutgoingWindow(outgoingWindowSize); 142 } 143 144 145 ProtonSessionImpl open() { 146 session.open(); 147 version(HUNT_AMQP_DEBUG) logInfo("session open"); 148 getConnectionImpl().flush(); 149 return this; 150 } 151 152 153 ProtonSessionImpl close() { 154 version(HUNT_AMQP_DEBUG) logInfo("session close"); 155 session.close(); 156 getConnectionImpl().flush(); 157 return this; 158 } 159 160 161 ProtonSessionImpl openHandler(Handler!ProtonSession openHandler) { 162 this._openHandler = openHandler; 163 return this; 164 } 165 166 167 ProtonSessionImpl closeHandler(Handler!ProtonSession closeHandler) { 168 this._closeHandler = closeHandler; 169 return this; 170 } 171 172 private string generateLinkName() { 173 // TODO: include useful details in name, like address and container? 174 return "auto-" ~ to!string((autoLinkCounter++)); 175 } 176 177 private string getOrCreateLinkName(ProtonLinkOptions linkOptions) { 178 return linkOptions.getLinkName() is null ? generateLinkName() : linkOptions.getLinkName(); 179 } 180 181 182 ProtonReceiver createReceiver(string address) { 183 return createReceiver(address, new ProtonLinkOptions()); 184 } 185 186 187 ProtonReceiver createReceiver(string address, ProtonLinkOptions receiverOptions) { 188 Receiver receiver = session.receiver(getOrCreateLinkName(receiverOptions)); 189 190 Symbol[] outcomes = [ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, 191 Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL ]; 192 193 Source source = new Source(); 194 source.setAddress(new String(address)); 195 //source.setOutcomes(new ArrayList!Symbol(outcomes)); 196 source.setDefaultOutcome(Released.getInstance()); 197 if(receiverOptions.isDynamic()) { 198 source.setDynamic(new Boolean(true)); 199 } 200 201 Target target = new Target(); 202 203 receiver.setSource(source); 204 receiver.setTarget(target); 205 206 ProtonReceiverImpl r = new ProtonReceiverImpl(receiver); 207 //r.openHandler((result) -> { 208 // LOG.trace("Receiver open completed"); 209 //}); 210 //r.closeHandler((result) -> { 211 // if (result.succeeded()) { 212 // LOG.trace("Receiver closed"); 213 // } else { 214 // LOG.warn("Receiver closed with error", result.cause()); 215 // } 216 //}); 217 218 // Default to at-least-once 219 r.setQoS(ProtonQoS.AT_LEAST_ONCE); 220 221 return r; 222 } 223 224 225 ProtonSender createSender(string address) { 226 return createSender(address, new ProtonLinkOptions()); 227 } 228 229 230 ProtonSender createSender(string address, ProtonLinkOptions senderOptions) { 231 Sender sender = session.sender(getOrCreateLinkName(senderOptions)); 232 233 Symbol[] outcomes = [ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, 234 Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL ]; 235 Source source = new Source(); 236 //source.setOutcomes(new ArrayList!Symbol(outcomes)); 237 238 Target target = new Target(); 239 target.setAddress(address is null ? null : new String(address)); 240 if(senderOptions.isDynamic()) { 241 target.setDynamic(new Boolean(true)); 242 } 243 244 sender.setSource(source); 245 sender.setTarget(target); 246 247 ProtonSenderImpl s = new ProtonSenderImpl(sender); 248 if (address is null) { 249 s.setAnonymousSender(true); 250 } 251 252 //s.openHandler((result) -> { 253 // LOG.trace("Sender open completed"); 254 //}); 255 //s.closeHandler((result) -> { 256 // if (result.succeeded()) { 257 // LOG.trace("Sender closed"); 258 // } else { 259 // LOG.warn("Sender closed with error", result.cause()); 260 // } 261 //}); 262 263 // Default to at-least-once 264 s.setQoS(ProtonQoS.AT_LEAST_ONCE); 265 266 return s; 267 } 268 269 270 Record attachments() { 271 return session.attachments(); 272 } 273 274 275 void free() { 276 session.free(); 277 getConnectionImpl().flush(); 278 } 279 280 ///////////////////////////////////////////////////////////////////////////// 281 // 282 // Implementation details hidden from api. 283 // 284 ///////////////////////////////////////////////////////////////////////////// 285 void fireRemoteOpen() { 286 version(HUNT_DEBUG) { 287 string s = this.session.getConnection().getRemoteHostname(); 288 warningf("The remote opened: %s", s); 289 } 290 if (_openHandler !is null) 291 { 292 _openHandler.handle(null); 293 } 294 } 295 296 void fireRemoteClose() { 297 version(HUNT_DEBUG) { 298 string s = this.session.getConnection().getRemoteHostname(); 299 warningf("The remote closed: %s", s); 300 } 301 302 if (_closeHandler !is null) { 303 _closeHandler.handle(null); 304 } 305 } 306 307 }