/*
 * hunt-amqp: AMQP library for D programming language, based on hunt-net.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.impl.ProtonSenderImpl;

import hunt.proton.amqp.transport.SenderSettleMode;
import hunt.proton.amqp.transport.Target;
import hunt.proton.codec.ReadableBuffer;
import hunt.proton.engine.Delivery;
import hunt.proton.engine.Sender;
import hunt.proton.message.Message;
import hunt.proton.message.impl.MessageImpl;

import hunt.amqp.ProtonDelivery;
import hunt.amqp.ProtonSender;
import hunt.amqp.impl.ProtonLinkImpl;
import hunt.amqp.Handler;
import hunt.Exceptions;
import hunt.Integer;
import hunt.amqp.impl.ProtonWritableBufferImpl;
import hunt.amqp.impl.ProtonReadableBufferImpl;
import hunt.amqp.impl.ProtonDeliveryImpl;
import hunt.logging;

/**
 * ProtonSender implementation that wraps a proton engine Sender link.
 *
 * Each send starts a delivery on the wrapped link, encodes the message into
 * it, advances the link and then flushes the owning connection.
 *
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
class ProtonSenderImpl : ProtonLinkImpl!ProtonSender, ProtonSender {

    /// Callback invoked from handleLinkFlow() while the remote credit is positive.
    private Handler!ProtonSender drainHandler;
    /// When true, every message sent must carry its own address.
    private bool anonymousSender;
    /// Auto-settle flag passed on to deliveries that were given an update handler.
    private bool autoSettle = true;
    /// Counter used to generate delivery tags for the tag-less send overloads.
    private int tag = 1;
    /// When true, handleLinkFlow() calls drained() whenever getDrain() reports true.
    private bool autoDrained = true;

    /**
     * Creates a sender wrapping the given engine link.
     *
     * Params:
     *   sender = the engine Sender link handed to the base class.
     */
    this(Sender sender) {
        super(sender);
    }

    /// Returns the wrapped link viewed as a Sender.
    private Sender sender() {
        return cast(Sender) link;
    }

    /**
     * Sends a message with a generated delivery tag and no update handler.
     *
     * Params:
     *   message = the message to send.
     * Returns: the delivery wrapper created for this send.
     */
    override ProtonDelivery send(Message message) {
        return send(message, null);
    }

    /**
     * Sends a message with a generated delivery tag.
     *
     * Params:
     *   message   = the message to send.
     *   onUpdated = handler registered on the resulting delivery; may be null.
     * Returns: the delivery wrapper created for this send.
     */
    override ProtonDelivery send(Message message, Handler!ProtonDelivery onUpdated) {
        return send(generateTag(), message, onUpdated);
    }

    /// Produces the next delivery tag: the current counter value as 4 big-endian bytes.
    private byte[] generateTag() {
        int value = tag++;
        byte[] binary = new byte[Integer.BYTES];
        setInt(binary, value);
        return binary;
    }

    /// Writes value into binary[0 .. 4], most significant byte first.
    private static void setInt(byte[] binary, int value) {
        binary[0] = cast(byte)(value >>> 24);
        binary[1] = cast(byte)(value >>> 16);
        binary[2] = cast(byte)(value >>> 8);
        binary[3] = cast(byte) value;
    }

    /**
     * Sends a message with a caller-supplied delivery tag and no update handler.
     *
     * Params:
     *   tag     = the delivery tag to use.
     *   message = the message to send.
     * Returns: the delivery wrapper created for this send.
     */
    override ProtonDelivery send(byte[] tag, Message message) {
        return send(tag, message, null);
    }

    /**
     * Sends a message with a caller-supplied delivery tag.
     *
     * The message is validated before the delivery is started, so an invalid
     * argument does not leave a started delivery behind on the link.
     *
     * Params:
     *   tag       = the delivery tag to use.
     *   message   = the message to send; must be a MessageImpl.
     *   onUpdated = handler registered on the resulting delivery; may be null,
     *               in which case the delivery is set to auto-settle.
     * Returns: the delivery wrapper created for this send.
     * Throws: IllegalArgumentException if message is null, is not a MessageImpl,
     *         or has no address while this is an anonymous sender.
     */
    override ProtonDelivery send(byte[] tag, Message message, Handler!ProtonDelivery onUpdated) {
        if (message is null) {
            throw new IllegalArgumentException("Message must not be null.");
        }
        if (anonymousSender && message.getAddress() is null) {
            throw new IllegalArgumentException(
                    "Message must have an address when using anonymous sender.");
        }
        // A failed class cast yields null in D rather than throwing, so check
        // explicitly instead of dereferencing a null reference below.
        MessageImpl msg = cast(MessageImpl) message;
        if (msg is null) {
            throw new IllegalArgumentException("Message must be a MessageImpl instance.");
        }
        // TODO: prevent odd combination of onReceived callback + SenderSettleMode.SETTLED, or just allow it?
        version (HUNT_AMQP_DEBUG)
            logInfof("tag: %s", tag);
        Delivery delivery = sender().delivery(tag); // start a new delivery..
        ProtonWritableBufferImpl buffer = new ProtonWritableBufferImpl();
        msg.encode(buffer);
        ReadableBuffer encoded = new ProtonReadableBufferImpl(buffer.getBuffer());

        sender().sendNoCopy(encoded);
        if (link.getSenderSettleMode() == SenderSettleMode.SETTLED) {
            delivery.settle();
        }
        sender().advance(); // ends the delivery.

        ProtonDeliveryImpl protonDeliveryImpl = new ProtonDeliveryImpl(delivery);
        if (onUpdated !is null) {
            protonDeliveryImpl.setAutoSettle(autoSettle);
            protonDeliveryImpl.handler(onUpdated);
        } else {
            protonDeliveryImpl.setAutoSettle(true);
        }
        version (HUNT_AMQP_DEBUG)
            logInfof("send");
        getSession().getConnectionImpl().flush();

        return protonDeliveryImpl;
    }

    /// Returns the auto-settle flag applied to deliveries that have an update handler.
    override bool isAutoSettle() {
        return autoSettle;
    }

    /**
     * Sets the auto-settle flag applied to deliveries that have an update handler.
     *
     * Params:
     *   autoSettle = the new flag value.
     * Returns: this sender.
     */
    override ProtonSender setAutoSettle(bool autoSettle) {
        this.autoSettle = autoSettle;
        return this;
    }

    /// Returns whether this sender requires every message to carry an address.
    bool isAnonymousSender() {
        return anonymousSender;
    }

    /**
     * Marks this sender as anonymous or not.
     *
     * Params:
     *   anonymousSender = true to require an address on every message sent.
     */
    void setAnonymousSender(bool anonymousSender) {
        this.anonymousSender = anonymousSender;
    }

    /// Returns this instance.
    override ProtonSenderImpl self() {
        return this;
    }

    /// Returns true when the link reports no remote credit (credit <= 0).
    override bool sendQueueFull() {
        return link.getRemoteCredit() <= 0;
    }

    /**
     * Registers the drain handler and immediately runs handleLinkFlow(), so the
     * handler may be invoked right away if remote credit is already available.
     *
     * Params:
     *   drainHandler = handler to invoke while remote credit is positive; may be null.
     * Returns: this sender.
     */
    override ProtonSender sendQueueDrainHandler(Handler!ProtonSender drainHandler) {
        this.drainHandler = drainHandler;
        handleLinkFlow();
        return this;
    }

    /**
     * Invokes the drain handler when remote credit is positive and a handler is
     * set, then calls drained() if auto-drain is enabled and getDrain() is true.
     */
    override void handleLinkFlow() {
        if (link.getRemoteCredit() > 0 && drainHandler !is null) {
            drainHandler.handle(this);
        }

        if (autoDrained && getDrain()) {
            drained();
        }
    }

    /// Returns whether handleLinkFlow() calls drained() automatically.
    override bool isAutoDrained() {
        return autoDrained;
    }

    /**
     * Enables or disables the automatic drained() call in handleLinkFlow().
     *
     * Params:
     *   autoDrained = the new flag value.
     * Returns: this sender.
     */
    override ProtonSender setAutoDrained(bool autoDrained) {
        this.autoDrained = autoDrained;
        return this;
    }

    /// Delegates to the base class drained() and returns its result.
    override int drained() {
        return super.drained();
    }

    /**
     * Returns the address of the remote target.
     *
     * Returns: the address as a string, or null when there is no remote target
     *          or the remote target has no address.
     */
    override string getRemoteAddress() {
        Target remoteTarget = getRemoteTarget();
        if (remoteTarget is null) {
            return null;
        }

        // The target may exist without an address; avoid dereferencing null.
        auto address = remoteTarget.getAddress();
        if (address is null) {
            return null;
        }

        return cast(string)(address.getBytes());
    }

}