1 /* 2 * hunt-amqp: AMQP library for D programming language, based on hunt-net. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.impl.ProtonLinkImpl; 12 13 //import io.vertx.core.AsyncResult; 14 //import io.vertx.core.Handler; 15 import hunt.amqp.ProtonHelper; 16 import hunt.amqp.ProtonLink; 17 import hunt.amqp.ProtonQoS; 18 19 import hunt.proton.amqp.Symbol; 20 import hunt.proton.amqp.UnsignedLong; 21 import hunt.proton.amqp.transport.ErrorCondition; 22 import hunt.proton.amqp.transport.ReceiverSettleMode; 23 import hunt.proton.amqp.transport.SenderSettleMode; 24 import hunt.proton.amqp.transport.Source; 25 import hunt.proton.amqp.transport.Target; 26 import hunt.proton.engine.Delivery; 27 import hunt.proton.engine.EndpointState; 28 import hunt.proton.engine.Link; 29 import hunt.proton.engine.Record; 30 import hunt.collection.Map; 31 import hunt.amqp.Handler; 32 import hunt.amqp.impl.ProtonSessionImpl; 33 import hunt.Exceptions; 34 import hunt.logging; 35 /** 36 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 37 */ 38 abstract class ProtonLinkImpl(T) : ProtonLink!T { 39 40 protected Link link; 41 private Handler!T _openHandler; 42 private Handler!T _closeHandler; 43 private Handler!T _detachHandler; 44 45 this(Link link) { 46 this.link = link; 47 this.link.setContext(this); 48 49 setQoS(getRemoteQoS()); 50 } 51 52 public abstract T self(); 53 54 public ProtonSessionImpl getSession() { 55 return cast(ProtonSessionImpl) (this.link.getSession().getContext()); 56 } 57 58 public Record attachments() { 59 return link.attachments(); 60 } 61 62 public ErrorCondition getCondition() { 63 return link.getCondition(); 64 } 65 66 public int getCredit() { 67 return link.getCredit(); 68 } 69 70 public bool getDrain() { 71 return link.getDrain(); 72 } 73 74 public EndpointState getLocalState() { 75 return link.getLocalState(); 76 } 77 78 public string getName() { 79 return link.getName(); 80 } 81 82 public ErrorCondition getRemoteCondition() { 83 return link.getRemoteCondition(); 84 } 85 86 public int getRemoteCredit() { 87 return link.getRemoteCredit(); 88 } 89 90 public EndpointState getRemoteState() { 91 return link.getRemoteState(); 92 } 93 94 public Target getRemoteTarget() { 95 return link.getRemoteTarget(); 96 } 97 98 public Target getTarget() { 99 return link.getTarget(); 100 } 101 102 public T setTarget(Target target) { 103 link.setTarget(target); 104 return self(); 105 } 106 107 public Source getRemoteSource() { 108 return link.getRemoteSource(); 109 } 110 111 public Source getSource() { 112 return link.getSource(); 113 } 114 115 public T setSource(Source source) { 116 link.setSource(source); 117 return self(); 118 } 119 120 public int getUnsettled() { 121 return link.getUnsettled(); 122 } 123 124 public int getQueued() { 125 return link.getQueued(); 126 } 127 128 public bool advance() { 129 return link.advance(); 130 } 131 132 public int drained() { 133 int drained = link.drained(); 134 getSession().getConnectionImpl().flush(); 135 return drained; 136 } 137 138 public bool detached() { 139 return link.detached(); 140 } 141 142 public Delivery delivery(byte[] tag, int offset, int length) { 143 return link.delivery(tag, offset, length); 144 } 145 146 public Delivery current() { 147 return link.current(); 148 } 149 150 public T setCondition(ErrorCondition condition) { 151 link.setCondition(condition); 152 return self(); 153 } 154 155 public Delivery delivery(byte[] tag) { 156 return link.delivery(tag); 157 } 158 159 public T open() { 160 link.open(); 161 version(HUNT_AMQP_DEBUG) logInfo("link open flush"); 162 getSession().getConnectionImpl().flush(); 163 return self(); 164 } 165 166 public T close() { 167 link.close(); 168 getSession().getConnectionImpl().flush(); 169 return self(); 170 } 171 172 public T detach() { 173 link.detach(); 174 getSession().getConnectionImpl().flush(); 175 return self(); 176 } 177 178 public T openHandler(Handler!T openHandler) { 179 this._openHandler = openHandler; 180 return self(); 181 } 182 183 public T closeHandler(Handler!T closeHandler) { 184 this._closeHandler = closeHandler; 185 return self(); 186 } 187 188 public T detachHandler(Handler!T detachHandler) { 189 this._detachHandler = detachHandler; 190 return self(); 191 } 192 193 public bool isOpen() { 194 return getLocalState() == EndpointState.ACTIVE; 195 } 196 197 public ProtonQoS getQoS() { 198 if (link.getSenderSettleMode() == SenderSettleMode.SETTLED) { 199 return ProtonQoS.AT_MOST_ONCE; 200 } 201 202 return ProtonQoS.AT_LEAST_ONCE; 203 } 204 205 public ProtonQoS getRemoteQoS() { 206 if (link.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED) { 207 return ProtonQoS.AT_MOST_ONCE; 208 } 209 210 return ProtonQoS.AT_LEAST_ONCE; 211 } 212 213 public T setQoS(ProtonQoS qos) { 214 int type = qos.ordinal(); 215 enum int AT_MOST_ONCE = ProtonQoS.AT_MOST_ONCE.ordinal; 216 enum int AT_LEAST_ONCE = ProtonQoS.AT_LEAST_ONCE.ordinal; 217 switch (type) { 218 case AT_MOST_ONCE: 219 link.setSenderSettleMode(SenderSettleMode.SETTLED); 220 link.setReceiverSettleMode(ReceiverSettleMode.FIRST); 221 break; 222 case AT_LEAST_ONCE: 223 link.setSenderSettleMode(SenderSettleMode.UNSETTLED); 224 link.setReceiverSettleMode(ReceiverSettleMode.FIRST); 225 break; 226 default: 227 break; 228 } 229 return self(); 230 } 231 232 public UnsignedLong getMaxMessageSize() { 233 return link.getMaxMessageSize(); 234 } 235 236 public void setMaxMessageSize(UnsignedLong maxMessageSize) { 237 link.setMaxMessageSize(maxMessageSize); 238 } 239 240 public UnsignedLong getRemoteMaxMessageSize() { 241 return link.getRemoteMaxMessageSize(); 242 } 243 244 public Map!(Symbol, Object) getRemoteProperties() { 245 return link.getRemoteProperties(); 246 } 247 248 public void setProperties(Map!(Symbol, Object) properties) { 249 link.setProperties(properties); 250 } 251 252 public void setOfferedCapabilities(Symbol[] capabilities) { 253 link.setOfferedCapabilities(capabilities); 254 } 255 256 public Symbol[] getRemoteOfferedCapabilities() { 257 return link.getRemoteOfferedCapabilities(); 258 } 259 260 public void setDesiredCapabilities(Symbol[] capabilities) { 261 link.setDesiredCapabilities(capabilities); 262 } 263 264 public Symbol[] getRemoteDesiredCapabilities() { 265 return link.getRemoteDesiredCapabilities(); 266 } 267 268 public void free() { 269 link.free(); 270 getSession().getConnectionImpl().flush(); 271 } 272 273 ///////////////////////////////////////////////////////////////////////////// 274 // 275 // Implementation details hidden from public api. 276 // 277 ///////////////////////////////////////////////////////////////////////////// 278 void fireRemoteOpen() { 279 if (_openHandler !is null) { 280 //openHandler.handle(ProtonHelper.future(self(), getRemoteCondition())); 281 _openHandler.handle(self()); 282 } 283 } 284 285 void fireRemoteDetach() { 286 if (_detachHandler !is null) { 287 //detachHandler.handle(ProtonHelper.future(self(), getRemoteCondition())); 288 _detachHandler.handle(self()); 289 } 290 } 291 292 void fireRemoteClose() { 293 if (_closeHandler !is null) { 294 //closeHandler.handle(ProtonHelper.future(self(), getRemoteCondition())); 295 _closeHandler.handle(self()); 296 } 297 } 298 299 abstract void handleLinkFlow(); 300 }