1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.impl.ProtonSenderImpl;
12 
13 import hunt.proton.amqp.transport.SenderSettleMode;
14 import hunt.proton.amqp.transport.Target;
15 import hunt.proton.codec.ReadableBuffer;
16 import hunt.proton.engine.Delivery;
17 import hunt.proton.engine.Sender;
18 import hunt.proton.message.Message;
19 import hunt.proton.message.impl.MessageImpl;
20 
21 import hunt.amqp.ProtonDelivery;
22 import hunt.amqp.ProtonSender;
23 import hunt.amqp.impl.ProtonLinkImpl;
24 import hunt.amqp.Handler;
25 import hunt.Exceptions;
26 import hunt.Integer;
27 import hunt.amqp.impl.ProtonWritableBufferImpl;
28 import hunt.amqp.impl.ProtonReadableBufferImpl;
29 import hunt.amqp.impl.ProtonDeliveryImpl;
30 import hunt.logging;
31 
32 /**
33  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
34  */
35 class ProtonSenderImpl : ProtonLinkImpl!ProtonSender, ProtonSender {
36 
37     private Handler!ProtonSender drainHandler;
38     private bool anonymousSender;
39     private bool autoSettle = true;
40     private int tag = 1;
41     private bool autoDrained = true;
42 
43     this(Sender sender) {
44         super(sender);
45     }
46 
47     private Sender sender() {
48         return cast(Sender) link;
49     }
50 
51     override ProtonDelivery send(Message message) {
52         return send(message, null);
53     }
54 
55     override ProtonDelivery send(Message message, Handler!ProtonDelivery onUpdated) {
56         return send(generateTag(), message, onUpdated);
57     }
58 
59     private byte[] generateTag() {
60         int value = tag++;
61         byte[] binary = new byte[Integer.BYTES];
62         setInt(binary, value);
63         return binary;
64     }
65 
66     private static void setInt(byte[] binary, int value) {
67         binary[0] = cast(byte)(value >>> 24);
68         binary[1] = cast(byte)(value >>> 16);
69         binary[2] = cast(byte)(value >>> 8);
70         binary[3] = cast(byte) value;
71     }
72 
73     override ProtonDelivery send(byte[] tag, Message message) {
74         return send(tag, message, null);
75     }
76 
77     override ProtonDelivery send(byte[] tag, Message message, Handler!ProtonDelivery onUpdated) {
78         if (anonymousSender && message.getAddress() is null) {
79             throw new IllegalArgumentException(
80                     "Message must have an address when using anonymous sender.");
81         }
82         // TODO: prevent odd combination of onRecieved callback + SenderSettleMode.SETTLED, or just allow it?
83         version (HUNT_AMQP_DEBUG)
84             logInfof("tag: %s", tag);
85         Delivery delivery = sender().delivery(tag); // start a new delivery..
86         ProtonWritableBufferImpl buffer = new ProtonWritableBufferImpl();
87         MessageImpl msg = cast(MessageImpl) message;
88         msg.encode(buffer);
89         ReadableBuffer encoded = new ProtonReadableBufferImpl(buffer.getBuffer());
90 
91         int ll = sender().sendNoCopy(encoded); // 55
92         if (link.getSenderSettleMode() == SenderSettleMode.SETTLED) {
93             delivery.settle();
94         }
95         sender().advance(); // ends the delivery.
96 
97         ProtonDeliveryImpl protonDeliveryImpl = new ProtonDeliveryImpl(delivery);
98         if (onUpdated !is null) {
99             protonDeliveryImpl.setAutoSettle(autoSettle);
100             protonDeliveryImpl.handler(onUpdated);
101         } else {
102             protonDeliveryImpl.setAutoSettle(true);
103         }
104         version (HUNT_AMQP_DEBUG)
105             logInfof("send");
106         getSession().getConnectionImpl().flush();
107 
108         return protonDeliveryImpl;
109     }
110 
111     override bool isAutoSettle() {
112         return autoSettle;
113     }
114 
115     override ProtonSender setAutoSettle(bool autoSettle) {
116         this.autoSettle = autoSettle;
117         return this;
118     }
119 
120     bool isAnonymousSender() {
121         return anonymousSender;
122     }
123 
124     void setAnonymousSender(bool anonymousSender) {
125         this.anonymousSender = anonymousSender;
126     }
127 
128     override ProtonSenderImpl self() {
129         return this;
130     }
131 
132     override bool sendQueueFull() {
133         return link.getRemoteCredit() <= 0;
134     }
135 
136     override ProtonSender sendQueueDrainHandler(Handler!ProtonSender drainHandler) {
137         this.drainHandler = drainHandler;
138         handleLinkFlow();
139         return this;
140     }
141 
142     override void handleLinkFlow() {
143         if (link.getRemoteCredit() > 0 && drainHandler !is null) {
144             drainHandler.handle(this);
145         }
146 
147         if (autoDrained && getDrain()) {
148             drained();
149         }
150     }
151 
152     override bool isAutoDrained() {
153         return autoDrained;
154     }
155 
156     override ProtonSender setAutoDrained(bool autoDrained) {
157         this.autoDrained = autoDrained;
158         return this;
159     }
160 
161     override int drained() {
162         return super.drained();
163     }
164 
165     override string getRemoteAddress() {
166         Target remoteTarget = getRemoteTarget();
167 
168         return remoteTarget is null ? null : (cast(string)(remoteTarget.getAddress().getBytes()));
169     }
170 
171 }