/*
 * hunt-amqp: AMQP library for D programming language, based on hunt-net.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.impl.ProtonDeliveryImpl;

import hunt.amqp.Handler;
import hunt.amqp.ProtonDelivery;

import hunt.proton.amqp.transport.DeliveryState;
import hunt.proton.engine.Delivery;
import hunt.proton.engine.Record;
import hunt.amqp.impl.ProtonLinkImpl;
import hunt.amqp.ProtonReceiver;
import hunt.amqp.ProtonSender;

/**
 * ProtonDelivery implementation backed by a proton engine Delivery.
 *
 * Most members forward straight to the wrapped Delivery. On top of that the
 * class keeps an optional update handler and an auto-settle flag, and it
 * flushes the owning connection after a disposition or settle.
 *
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
class ProtonDeliveryImpl : ProtonDelivery {

    /// The wrapped engine delivery that every accessor forwards to.
    private Delivery delivery;
    /// Callback invoked from fireUpdate(); null until handler() is called.
    private Handler!ProtonDelivery _handler;
    /// When true, fireUpdate() settles locally once the remote side has settled.
    private bool autoSettle;

    /**
     * Wraps the given engine delivery and installs this object as its context.
     *
     * Params:
     *   delivery = the engine delivery to wrap
     */
    this(Delivery delivery) {
        this.delivery = delivery;
        delivery.setContext(this);
    }

    /// Returns the context object stored on the link this delivery belongs to.
    public Object getLink() {
        auto engineLink = this.delivery.getLink();
        return engineLink.getContext();
    }

    /// Forwards to Delivery.clear().
    public void clear() {
        delivery.clear();
    }

    /// Forwards to Delivery.getLocalState().
    public DeliveryState getLocalState() {
        return delivery.getLocalState();
    }

    /// Forwards to Delivery.isSettled().
    public bool isSettled() {
        return delivery.isSettled();
    }

    /// Forwards to Delivery.remotelySettled().
    public bool remotelySettled() {
        return delivery.remotelySettled();
    }

    /// Forwards to Delivery.attachments().
    public Record attachments() {
        return delivery.attachments();
    }

    /// Forwards to Delivery.getTag().
    public byte[] getTag() {
        return delivery.getTag();
    }

    /// Forwards to Delivery.setDefaultDeliveryState().
    public void setDefaultDeliveryState(DeliveryState state) {
        delivery.setDefaultDeliveryState(state);
    }

    /// Forwards to Delivery.getDefaultDeliveryState().
    public DeliveryState getDefaultDeliveryState() {
        return delivery.getDefaultDeliveryState();
    }

    /// Forwards to Delivery.isReadable().
    public bool isReadable() {
        return delivery.isReadable();
    }

    /// Forwards to Delivery.isUpdated().
    public bool isUpdated() {
        return delivery.isUpdated();
    }

    /// Forwards to Delivery.isWritable().
    public bool isWritable() {
        return delivery.isWritable();
    }

    /// Forwards to Delivery.pending().
    public int pending() {
        return delivery.pending();
    }

    /// Forwards to Delivery.isPartial().
    public bool isPartial() {
        return delivery.isPartial();
    }

    /// Forwards to Delivery.getRemoteState().
    public DeliveryState getRemoteState() {
        return delivery.getRemoteState();
    }

    /// Forwards to Delivery.getMessageFormat().
    public int getMessageFormat() {
        return delivery.getMessageFormat();
    }

    /// Forwards to Delivery.isBuffered().
    public bool isBuffered() {
        return delivery.isBuffered();
    }

    /**
     * Applies a delivery state and optionally settles.
     *
     * Nothing happens when the delivery is already settled. Otherwise the
     * state is applied; then the delivery is either settled (which flushes
     * the connection itself) or the connection is flushed directly.
     *
     * Params:
     *   state = the delivery state to apply
     *   s     = true to settle after applying the state
     *
     * Returns: this delivery, for call chaining
     */
    public ProtonDelivery disposition(DeliveryState state, bool s) {
        if (!delivery.isSettled()) {
            delivery.disposition(state);

            if (s) {
                settle();
            } else {
                flushConnection();
            }
        }

        return this;
    }

    /**
     * Settles the wrapped delivery and flushes the owning connection.
     *
     * Returns: this delivery, for call chaining
     */
    public ProtonDelivery settle() {
        delivery.settle();
        flushConnection();

        return this;
    }

    /**
     * Flushes the connection that owns this delivery's link.
     *
     * getLinkImpl() is typed as Object, so the concrete link type has to be
     * recovered by a downcast before the session can be reached. The receiver
     * type is tried first, then the sender type; if neither cast succeeds
     * nothing is flushed.
     */
    private void flushConnection() {
        if (auto receiver = cast(ProtonLinkImpl!ProtonReceiver) getLinkImpl()) {
            receiver.getSession().getConnectionImpl().flush();
            return;
        }

        if (auto sender = cast(ProtonLinkImpl!ProtonSender) getLinkImpl()) {
            sender.getSession().getConnectionImpl().flush();
        }
    }

    /**
     * Stores the update handler.
     *
     * If the delivery is already settled when the handler is installed,
     * fireUpdate() runs immediately.
     *
     * Params:
     *   handler = callback to invoke from fireUpdate()
     *
     * Returns: this delivery, for call chaining
     */
    public ProtonDelivery handler(Handler!ProtonDelivery handler) {
        this._handler = handler;

        if (delivery.isSettled()) {
            fireUpdate();
        }

        return this;
    }

    /// Returns the current auto-settle flag.
    bool isAutoSettle() {
        return autoSettle;
    }

    /// Sets the auto-settle flag consulted by fireUpdate().
    void setAutoSettle(bool autoSettle) {
        this.autoSettle = autoSettle;
    }

    /**
     * Notifies the stored handler (if any), then auto-settles when enabled.
     *
     * Auto-settling only happens when the flag is set, the remote side has
     * settled, and the delivery has not been settled locally yet.
     */
    void fireUpdate() {
        if (this._handler !is null) {
            this._handler.handle(this);
        }

        if (!autoSettle) {
            return;
        }

        if (delivery.remotelySettled() && !delivery.isSettled()) {
            settle();
        }
    }

    /// Returns the context object stored on the link this delivery belongs to.
    public Object getLinkImpl() {
        auto engineLink = delivery.getLink();
        return engineLink.getContext();
    }

}